Socket programs, particularly servers, must often be ready to perform many tasks at once. Example 19-1 accepts a connection request, then serves a single client until that client has finishedother connection requests must wait. This is not acceptable for servers in production use. Clients cannot wait too long: the server must be able to service multiple clients at once.
One approach that lets your program perform several tasks at once is threading, covered in Chapter 14. Module SocketServer optionally supports threading, as covered earlier in this chapter. An alternative to threading that can offer better performance and scalability is event-driven (also known as asynchronous) programming.
An event-driven program sits in an event loop, where it waits for events. In networking, typical events are "a client requests connection," "data arrived on a socket," and "a socket is available for writing." The program responds to each event by executing a small slice of work to service that event, then goes back to the event loop to wait for the next event. The Python library supports event-driven network programming with low-level select module and higher-level asyncore and asynchat modules. Even more complete support for event-driven programming is in the Twisted package (available at http://www.twistedmatrix.com), particularly in subpackage twisted.internet.
The select module exposes a cross-platform low-level function that lets you implement high-performance asynchronous network servers and clients. Module select offers additional platform-dependent functionality on Unix-like platforms, but I cover only cross-platform functionality in this book.
select |
select(inputs,outputs,excepts,timeout=None) |
inputs, outputs, and excepts are lists of socket objects waiting for input events, output events, and exceptional conditions, respectively. timeout is a float, the maximum time to wait in seconds. When timeout is None, there is no maximum wait: select waits until one or more objects receive events. When timeout is 0, select returns at once, without waiting.
select returns a tuple with three items (i,o,e). i is a list of zero or more of the items of inputs, those that received input events. o is a list of zero or more of the items of outputs, those that received output events. e is a list of zero or more of the items of excepts, those that received exceptional conditions (i.e., out-of-band data). Any or all of i, o, and e can be empty, but at least one of them is non-empty if timeout is None.
In addition to sockets, you can have in lists inputs, outputs, and excepts other objects that supply a method fileno, callable without arguments, returning a socket's file descriptor. For example, the server classes of module SocketServer, covered earlier in this chapter, follow this protocol. Therefore, you can have instances of those classes in the lists. On Unix-like platforms, select.select has wider applicability, since it can also accept file descriptors that do not refer to sockets. On Windows, however, select.select can accept only file descriptors that do refer to sockets.
Example 19-6 uses module select to reimplement the server of Example 19-1 with the added ability to serve any number of clients simultaneously.
import socket import select sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('', 8881)) sock.listen(5) # lists of sockets to watch for input and output events ins = [sock] ous = [] # mapping socket -> data to send on that socket when feasible data = {} # mapping socket -> (host, port) on which the client is running adrs = {} try: while True: i, o, e = select.select(ins, ous, []) # no excepts nor timeout for x in i: if x is sock: # input event on sock means client trying to connect newSocket, address = sock.accept( ) print "Connected from", address ins.append(newSocket) adrs[newSocket] = address else: # other input events mean data arrived, or disconnections newdata = x.recv(8192) if newdata: # data arrived, prepare and queue the response to it print "%d bytes from %s" % (len(newdata), adrs[x]) data[x] = data.get(x, '') + newdata if x not in ous: ous.append(x) else: # a disconnect, give a message and clean up print "disconnected from", adrs[x] del adrs[x] try: ous.remove(x) except ValueError: pass x.close( ) for x in o: # output events always mean we can send some data tosend = data.get(x) if tosend: nsent = x.send(tosend) print "%d bytes to %s" % (nsent, adrs[x]) # remember data still to be sent, if any tosend = tosend[nsent:] if tosend: print "%d bytes remain for %s" % (len(tosend), adrs[x]) data[x] = tosend else: try: del data[x] except KeyError: pass ous.remove(x) print "No data currently remain for", adrs[x] finally: sock.close( )
Programming at such a low level incurs substantial complications, as shown by the complexity of Example 19-6 and its data structures. Run the server of Example 19-6 on a terminal window and try a few runs of Example 19-2 while the server is running. You should also try telnet localhost 8881 on other terminal windows (or other platform-dependent Telnet-like programs) to verify the behavior of longer-term connections.
The asyncore and asynchat modules help you implement high-performance asynchronous network servers and clients at a higher, more productive level than module select affords.
Module asyncore supplies one function.
loop |
loop( ) |
Implements the asynchronous event loop, dispatching all network events to previously instantiated dispatcher objects. loop terminates when all dispatcher objects (i.e., all communication channels) are closed.
Module asyncore also supplies class dispatcher, which supplies all methods of socket objects, plus specific methods for event-driven programming, with names starting with 'handle_'. Your class X subclasses dispatcher and overrides the handle_ methods for all events you need to handle. To initialize an instance d of dispatcher, you can pass an argument s, an already connected socket object. Otherwise, you must call:
d.create_socket(socket.AF_INET,socket.SOCK_STREAM)
and then call on d either connect, to connect to a server, or bind and listen, to have d itself be a server. The most frequently used methods of an instance d of a subclass X of dispatcher are the following.
create_socket |
d.create_socket(family,type) |
Creates d's socket with the given family and type. family is generally socket.AF_INET. type is generally socket.SOCK_STREAM, since class dispatcher normally uses a TCP (i.e., connection-based) socket.
handle_accept |
d.handle_accept( ) |
Called when a new client has connected. Your class X normally responds by calling self.accept, then instantiating another subclass Y of dispatcher with the resulting new socket, in order to handle the new client connection.
Your implementation of handle_accept need not return the resulting instance of Y: all instances of subclasses of dispatcher register themselves with the asyncore framework in method dispatcher._ _init_ _, so that asyncore calls back to their methods as appropriate.
handle_close |
d.handle_close( ) |
Called when the connection is closing.
handle_connect |
d.handle_connect( ) |
Called when the connection is starting.
handle_read |
d.handle_read( ) |
Called when the socket has new data that you can read without blocking.
handle_write |
d.handle_write( ) |
Called when the socket has buffer space, so you can write without blocking.
Module asyncore also supplies class dispatcher_with_send, a subclass of dispatcher that overrides one method.
send |
d.send(data) |
In class dispatcher_with_send, method d.send is equivalent to a socket object's method send_all in that it sends all the data. However, d.send does not send all the data at once and does not block; rather, d sends the data in small packets of 512 bytes each in response to handle_write events (callbacks). This strategy ensures good performance in simple cases.
Example 19-7 uses module asyncore to reimplement the server of Example 19-1, with the added ability to serve any number of clients simultaneously.
import asyncore import socket class MainServerSocket(asyncore.dispatcher): def __init_ _(self, port): asyncore.dispatcher.__init_ _(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind(('',port)) self.listen(5) def handle_accept(self): newSocket, address = self.accept( ) print "Connected from", address SecondaryServerSocket(newSocket) class SecondaryServerSocket(asyncore.dispatcher_with_send): def handle_read(self): receivedData = self.recv(8192) if receivedData: self.send(receivedData) else: self.close( ) def handle_close(self): print "Disconnected from", self.getpeername( ) MainServerSocket(8881) asyncore.loop( )
The complexity of Example 19-7 is modest, comparable with that of Example 19-1. The additional functionality of serving multiple clients simultaneously, with the high performance and scalability of asynchronous event-driven programming, comes quite cheaply thanks to asyncore's power.
Note that method handle_read of SecondaryServerSocket can freely use self.send without precautions because SecondaryServerSocket subclasses dispatcher_with_send, which overrides method send to ensure that it sends all data passed to it. We could not do that if we had instead chosen to subclass asyncore.dispatcher directly.
The asynchat module supplies class async_chat, which subclasses asyncore.dispatcher and adds methods to support data buffering and line-oriented protocols. You subclass async_chat with your class X and override some methods. The most frequently used additional methods of an instance x of a subclass of async_chat are the following.
collect_incoming_data |
x.collect_incoming_data(data) |
Called whenever a byte string data of data arrives. Normally, x adds data to some buffer that x keeps, most often a list using the list's append method.
found_terminator |
x.found_terminator( ) |
Called whenever the terminator, set by method set_terminator, is found. Normally, x processes the buffer it keeps, then clears the buffer.
push |
x.push(data) |
Your class X normally doesn't override this method. The implementation in base class async_chat adds string data to an output buffer that it sends as appropriate. Method push is therefore quite similar to method send of class asyncore.dispatcher_with_send, but method push has a more sophisticated implementation to ensure good performance in more cases.
set_terminator |
x.set_terminator(terminator) |
Your class X normally doesn't override this method. terminator is normally '\r\n', the line terminator specified by most Internet protocols. terminator can also be None, to disable calls to found_terminator.
Example 19-8 uses module asynchat to reimplement the server of Example 19-7, with small differences due to using class asynchat.async_chat instead of class asyncore.dispatcher_with_send. To highlight async_chat's typical use, Example 19-8 responds (by echoing the received data back to the client, like all other server examples in this chapter) only when it has received a complete line (i.e., one ending with \n).
import asyncore, asynchat, socket class MainServerSocket(asyncore.dispatcher): def __init_ _(self, port): print 'initing MSS' asyncore.dispatcher.__init_ _(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind(('',port)) self.listen(5) def handle_accept(self): newSocket, address = self.accept( ) print "Connected from", address SecondaryServerSocket(newSocket) class SecondaryServerSocket(asynchat.async_chat): def __init_ _(self, *args): print 'initing SSS' asynchat.async_chat.__init_ _(self, *args) self.set_terminator('\n') self.data = [] def collect_incoming_data(self, data): self.data.append(data) def found_terminator(self): self.push(''.join(self.data)) self.data = [] def handle_close(self): print "Disconnected from", self.getpeername( ) self.close( ) MainServerSocket(8881) asyncore.loop( )
To try out Example 19-8, we cannot use Example 19-2 as it stands because it does not ensure that it sends only entire lines terminated with \n. It doesn't take much to fix that, however. The following client program, for example, is quite suitable for testing Example 19-8, as well as any of the other server examples in this chapter:
import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', 8881)) print "Connected to server" data = """A few lines of data to test the operation of both server and client.""" for line in data.splitlines( ): sock.sendall(line+'\n') print "Sent:", line response = sock.recv(8192) print "Received:", response sock.close( )
The only difference in this code with respect to Example 19-2 is the change to the argument in the call to sock.sendall, in the first line of the loop body. This code simply adds a line terminator '\n', to ensure it interoperates with Example 19-8.
The Twisted package (available at http://www.twistedmatrix.com) is a freely available framework for network clients and servers. Twisted includes powerful, high-level components such as a web server, a user authentication system, a mail server, instant messaging, and so on. Each is highly scalable and easily customizable, and all are integrated to interoperate smoothly. It's a tribute to the power of Python and to the ingenuity of Twisted's developers that so much can be accomplished within the small compass of half a megabyte's worth of download.
The twisted.internet package is the low-level, highly stable part of Twisted that supports event-driven clients and servers. twisted.internet supplies module protocol, supporting protocol handlers and factories, and object reactor, embodying the concept of an event loop. Note that to make fully productive use of twisted.internet, you need a good understanding of the design patterns used in distributed computing. Douglas Schmidt, of the Center for Distributed Object Computing of Washington University, documents such design patterns at http://www.cs.wustl.edu/~schmidt/patterns-ace.html.
twisted.protocols implements many protocols that use twisted.internet's infrastructure, including SSH, DNS, FTP, HTTP, IRC, NNTP, POP3, SMTP, SocksV4, and Telnet.
A reactor object allows you to establish protocol factories as listeners (servers) on given TCP/IP ports (or other transports, such as SSL), and to connect protocol handlers as clients. You can choose different reactor implementations. The default reactor uses the select module covered earlier in this chapter. Other specialized reactors integrate with GUI toolkits' event loops, or use platform-specific techniques such as the Windows event loop or the poll system call support available in the select module on some Unix-like systems. The default reactor is often sufficient, but the extra flexibility of being able to use other implementations can help you to integrate GUIs or other platform-specific capabilities, or to achieve even higher performance and scalability.
A reactor object r supplies many methods. Client TCP APIs should be finalized by the time you read this book, but they're not definitive yet, so I do not cover them. The reactor methods most frequently used for programs that implement TCP/IP servers with twisted.internet are the following.
callLater |
r.callLater(delay,callable,*args,**kwds) |
Schedules a call to callable(*args,**kwds) to happen delay seconds from now. delay is a float, so it can also express fractions of a second. Returns an ID that you may pass to method cancelCallLater.
cancelCallLater |
r.cancelCallLater(ID) |
Cancels a call scheduled by method callLater. ID must be the result of a previous call to r.callLater.
listenTCP |
r.listenTCP(port,factory,backlog=5) |
Establishes factory, which must be an instance of class Factory (or any subclass of Factory), as the protocol handler for a TCP server on the given port. No more than backlog clients can be kept waiting for connection at any given time.
run |
r.run( ) |
Runs the event loop until r.stop( ) is called.
stop |
r.stop( ) |
Stops the event loop started by calling r.run( ).
A transport object embodies a network connection. Each protocol object calls methods on self.transport to write data to its counterpart and to disconnect. A transport object t supplies the following methods.
getHost |
t.getHost( ) |
Returns a tuple identifying this side of the connection. The first item indicates the kind of connection, while other items depend on the kind of connection. For a TCP connection, returns ('INET', host, port).
getPeer |
t.getPeer( ) |
Returns a tuple identifying the other side of the connection (easily confused by proxies, masquerading, firewalls, and so on), just like getHost's result.
loseConnection |
t.loseConnection( ) |
Tells t to disconnect as soon as t has finished writing all pending data.
write |
t.write(data) |
Transmits string data to the counterpart, or queues it up for transmission. t tries its best to ensure that all data you pass to write is eventually sent.
The reactor instantiates protocol handlers using a factory, and calls methods on protocol handler instances when events occur. A protocol handler subclasses class Protocol and overrides some methods. A protocol handler may use its factory, available as self.factory, as a repository for state that needs to be shared among handlers or persist across multiple instantiations. A protocol factory may subclass class Factory, but this subclassing is not always necessary since in many cases the stock Factory supplies all you need. Just set the protocol attribute of a Factory instance f to a class object that is an appropriate subclass of Protocol, then pass f to the reactor.
An instance p of a subclass of Protocol supplies the following methods.
connectionLost |
p.connectionLost(reason) |
Called when the connection to the counterpart has been closed. Argument reason is an object explaining why the connection has been closed. reason is not an instance of a Python exception, but has an attribute reason.value that normally is such an instance. You can use str(reason) to get an explanation string, including a brief traceback, or str(reason.value) to get just the explanation string without any traceback.
connectionMade |
p.connectionMade( ) |
Called when the connection to the counterpart has just succeeded.
dataReceived |
p.dataReceived(data) |
Called when string data has just been received from the counterpart.
Example 19-9 uses twisted.internet to implement an echo server with the ability to serve any number of clients simultaneously.
import twisted.internet.protocol import twisted.internet.reactor class EchoProtocol(twisted.internet.protocol.Protocol): def connectionMade(self): self.peer = self.transport.getPeer( )[1:] print "Connected from", self.peer def dataReceived(self, data): self.transport.write(data) def connectionLost(self, reason): print "Disconnected from", self.peer, reason.value factory = twisted.internet.protocol.Factory( ) factory.protocol = EchoProtocol twisted.internet.reactor.listenTCP(8881, factory) twisted.internet.reactor.run( )
Example 19-9 exhibits scalability at least as good as Example 19-7, yet it's easily the simplest of the echo server examples in this chaptera good indication of Twisted's power and simplicity. Note the statement:
factory.protocol = EchoProtocol
This binds the class object EchoProtocol as the attribute protocol of object factory. The right-hand side of the assignment must not be EchoProtocol( ), with parentheses after the class name. Such a right-hand side would call, and therefore instantiate, class EchoProtocol, and therefore the statement would bind to factory.protocol a protocol instance object rather than a protocol class object. Such a mistake would make the server fail pretty quickly.