The client-server model with multiple connections

If we are working with Python 3.4 or later, the standard library includes the selectors module, which provides a high-level API for I/O multiplexing built on the primitives of the select module. With it, a single-threaded server can monitor many sockets at once. The documentation and an example are available at https://docs.python.org/3.7/library/selectors.html.

In this example, we are going to implement a server that controls several connections using the selectors package.
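Before the full server, a minimal sketch of the selectors API may help. It is not part of the book's files: it uses socket.socketpair() as a stand-in for a real network connection, and the names left, right, and sel are chosen only for illustration. It shows the register/select cycle that the server relies on.

```python
import selectors
import socket

# socketpair() gives two connected sockets, so no network setup is needed.
left, right = socket.socketpair()
sel = selectors.DefaultSelector()

# Watch `right` for readability and attach arbitrary data to the registration.
sel.register(right, selectors.EVENT_READ, data='demo')

# Nothing has been sent yet, so a zero-timeout poll reports no ready sockets.
ready_before = sel.select(timeout=0)

left.send(b'ping')
# Now `right` has data waiting, so select() returns one (key, mask) pair.
ready_after = sel.select(timeout=1)
key, mask = ready_after[0]
received = key.fileobj.recv(1024)

sel.unregister(right)
sel.close()
left.close()
right.close()
```

The data argument is returned unchanged in key.data, which is how the server tells the listening socket apart from client sockets.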

You can find the following code in the tcp_server_selectors.py file:

#!/usr/bin/env python3

import selectors
import types
import socket

selector = selectors.DefaultSelector()

def accept_connection(sock):
    connection, address = sock.accept()
    print('Connection accepted in {}'.format(address))
    # We put the socket in non-blocking mode
    connection.setblocking(False)
    data = types.SimpleNamespace(addr=address, inb=b'', outb=b'')
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    selector.register(connection, events, data=data)

In the previous code block, we defined the accept_connection() function, which accepts a connection from a client, puts the new socket in non-blocking mode, and registers it with the selector for both read and write events. In the following code block, we define the service_connection() function, which checks the event mask to tell read events apart from write events:

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(BUFFER_SIZE)
        if recv_data:
            data.outb += recv_data
        else:
            print('Closing connection in {}'.format(data.addr))
            selector.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print('Echo from {} to {}'.format(repr(data.outb), data.addr))
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]
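The mask checks in service_connection() work because the selector events are bit flags that can be combined with | and tested with &. The following sketch illustrates this; the describe() helper is hypothetical and exists only for this example.

```python
import selectors

# Ask the selector to watch for both kinds of event.
events = selectors.EVENT_READ | selectors.EVENT_WRITE

def describe(mask):
    """Return the names of the events set in a selector mask."""
    names = []
    if mask & selectors.EVENT_READ:
        names.append('read')
    if mask & selectors.EVENT_WRITE:
        names.append('write')
    return names

print(describe(events))                 # ['read', 'write']
print(describe(selectors.EVENT_WRITE))  # ['write']
```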

In the following block of code, we can see our main program for establishing the host, port, and BUFFER_SIZE constants, and configuring our socket in non-blocking mode. We will also register the socket to be monitored by the selector functions:

if __name__ == '__main__':
    host = 'localhost'
    port = 12345
    BUFFER_SIZE = 1024
    # We create a TCP socket
    socket_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket_tcp.bind((host, port))
    socket_tcp.listen()
    print('Openned socket for listening connections on {} {}'.format(host, port))
    # We configure the socket in non-blocking mode
    socket_tcp.setblocking(False)
    # We register the socket to be monitored by the selector functions
    selector.register(socket_tcp, selectors.EVENT_READ, data=None)
    try:
        # The loop runs until the server is interrupted (for example, with Ctrl+C)
        while True:
            events = selector.select(timeout=None)
            for key, mask in events:
                if key.data is None:
                    accept_connection(key.fileobj)
                else:
                    service_connection(key, mask)
    except KeyboardInterrupt:
        pass
    finally:
        selector.close()
        socket_tcp.close()
        print('Connection finished.')

Let's explore our implementation a bit more:

  • Like before, we defined the variables that are necessary to bind the socket and receive data: host, port, and BUFFER_SIZE.
  • We configured the socket for non-blocking mode with socket_tcp.setblocking(False). By default, socket functions block: they wait for the underlying system call to complete before returning a value. In non-blocking mode, they return immediately, so our application does not stall while waiting for a response from the system.
  • We start a while loop in which the first line is events = selector.select(timeout=None). This call blocks until at least one registered socket is ready for reading or writing. It then returns a list of (key, mask) pairs, one for each ready socket. The key is a SelectorKey whose fileobj attribute is the socket object, and mask is an event mask of the operations that are ready.
  • If key.data is None, the event comes from the listening socket, so we need to accept a new connection. We call the accept_connection() function that we defined to handle this situation.
  • If key.data is not None, it is a client socket that has already been accepted and now needs servicing. So we call the service_connection() function with key and mask as arguments, which contain everything we need to operate on the socket.
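The difference between blocking and non-blocking mode can be seen in a small sketch. It assumes a socket.socketpair() in place of a real client and server, and the variable names are illustrative only. On a non-blocking socket, recv() raises BlockingIOError when no data is available instead of waiting.

```python
import socket

left, right = socket.socketpair()
right.setblocking(False)

try:
    # No data has been sent yet. In blocking mode this call would wait.
    right.recv(1024)
    outcome = 'data'
except BlockingIOError:
    # In non-blocking mode the call returns control immediately.
    outcome = 'would block'

print(outcome)

left.close()
right.close()
```

This is why the server only calls recv() and send() after select() has reported that the socket is ready.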

Now, let's look at an implementation of a client. It is quite similar to the implementation of the server, but instead of waiting for connections, the client initiates them with the start_connections() function.

You can find the following code in the tcp_client_selectors.py file:

#!/usr/bin/env python3

import socket
import selectors
import types

selector = selectors.DefaultSelector()
messages = ['This is the first message', 'This is the second message']
BUFFER_SIZE = 1024

def start_connections(host, port, num_conns):
    server_address = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print('Starting connection {} towards {}'.format(connid, server_address))
        socket_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # We connect using connect_ex() instead of connect()
        socket_tcp.connect_ex(server_address)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(connid=connid,
                                     msg_total=sum(len(m) for m in messages),
                                     recv_total=0,
                                     messages=list(messages),
                                     outb=b'')
        selector.register(socket_tcp, events, data=data)
    events = selector.select()
    for key, mask in events:
        service_connection(key, mask)

In the previous code block, we defined the start_connections() function, which connects to the server and registers each socket with the selector for both read and write events. In the following code block, we define the service_connection() function, which checks the event mask to tell read events apart from write events:

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(BUFFER_SIZE)
        if recv_data:
            print('Received {} from connection {}'.format(repr(recv_data), data.connid))
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print('Closing connection', data.connid)
            selector.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0).encode()
        if data.outb:
            print('Sending {} to connection {}'.format(repr(data.outb), data.connid))
            sent = sock.send(data.outb)
            sock.shutdown(socket.SHUT_WR)
            data.outb = data.outb[sent:]

if __name__ == '__main__':
    host = 'localhost'
    port = 12345
    BUFFER_SIZE = 1024
    start_connections(host, port, 2)
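The client calls connect_ex() rather than connect(). The difference is that connect_ex() reports failure through an error code instead of raising an exception, and returns 0 on success. The sketch below illustrates this under some assumptions: a throwaway listener on an OS-chosen port stands in for the server, and the names listener, client, and result are illustrative only.

```python
import socket

# A throwaway listener on an OS-chosen free port (port 0) stands in for the server.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen()
address = listener.getsockname()

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect_ex() returns 0 on success and an errno value on failure,
# where connect() would raise an exception instead.
result = client.connect_ex(address)
print('connect_ex returned', result)

client.close()
listener.close()
```

On a non-blocking socket, connect_ex() typically returns an "operation in progress" error code straight away, and the socket becomes writable once the connection is established.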

Now, we execute our new server and client implementation for multiple connections:

$ python tcp_server_selectors.py &
Openned socket for listening connections on localhost 12345

$ python tcp_client_selectors.py
Starting connection 1 towards ('localhost', 12345)
Starting connection 2 towards ('localhost', 12345)
Connection accepted in ('127.0.0.1', 7107)
Connection accepted in ('127.0.0.1', 7109)
Sending 'This is the first message' to connection 1
Sending 'This is the first message' to connection 2
Closing connection in ('127.0.0.1', 7107)
Closing connection in ('127.0.0.1', 7109)

As we can see, both clients connect to the server and send a message. The server echoes what it receives back to the client and closes each connection when the client finishes.
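To check the echo behavior in a single process, the following sketch runs a condensed selector-based echo server in a background thread and sends it two messages. It is an illustration under stated assumptions, not the book's code: run_echo_server() and echo_once() are hypothetical helpers, the port is chosen by the OS, and the client is a plain blocking socket for brevity.

```python
import selectors
import socket
import threading
import types

def run_echo_server(listener, stop):
    """Echo data back to each client until `stop` is set."""
    sel = selectors.DefaultSelector()
    sel.register(listener, selectors.EVENT_READ, data=None)
    while not stop.is_set():
        for key, mask in sel.select(timeout=0.1):
            if key.data is None:
                # The listening socket is readable: accept a new client.
                conn, addr = key.fileobj.accept()
                conn.setblocking(False)
                sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE,
                             data=types.SimpleNamespace(outb=b''))
                continue
            sock, data = key.fileobj, key.data
            if mask & selectors.EVENT_READ:
                chunk = sock.recv(1024)
                if chunk:
                    data.outb += chunk
                else:
                    # An empty read means the client closed the connection.
                    sel.unregister(sock)
                    sock.close()
                    continue
            if mask & selectors.EVENT_WRITE and data.outb:
                # send() may write only part of the buffer, so keep the rest.
                sent = sock.send(data.outb)
                data.outb = data.outb[sent:]
    # Close any client sockets that are still registered.
    for key in list(sel.get_map().values()):
        if key.data is not None:
            key.fileobj.close()
    sel.close()

def echo_once(address, message):
    """Send `message` and read until the full echo has come back."""
    with socket.create_connection(address, timeout=5) as sock:
        sock.sendall(message)
        received = b''
        while len(received) < len(message):
            chunk = sock.recv(1024)
            if not chunk:
                break
            received += chunk
    return received

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))  # port 0: let the OS choose a free port
listener.listen()
listener.setblocking(False)
stop = threading.Event()
server = threading.Thread(target=run_echo_server, args=(listener, stop))
server.start()

replies = [echo_once(listener.getsockname(), m)
           for m in (b'This is the first message', b'This is the second message')]

stop.set()
server.join()
listener.close()
print(replies)
```

Because client sockets stay registered for write events, select() returns immediately while a connection is open. That mirrors the server above and is acceptable for a short demonstration, but a long-running server would usually register for write events only while it has data to send.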

In this section, we used non-blocking I/O with the socket and selectors modules to build a server that handles multiple client connections.
