When creating services that handle simultaneous requests, per-request processing state must be managed. A common approach, threading, reuses the method familiar from single-user programs: processing state is kept in local variables and function arguments, and the operating environment handles variable allocation and cleanup. Unfortunately, threading requires a good deal of preemption and locking magic to keep each thread's execution independent while still allowing data to be shared between threads. This magic can lead to subtle programming errors and, in the case of Python, to slowness, as every change to a shared mutable object must be synchronized under the global interpreter lock.
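The threaded approach can be sketched as follows (a minimal illustration using Python's standard threading module and modern syntax; handle_request and the shared counter are made-up names, not part of any library discussed here):

```python
import threading

counter = 0                      # state shared between threads
counter_lock = threading.Lock()  # shared state must be guarded by a lock

def handle_request(name):
    # per-request state lives in local variables; the operating
    # environment decides when each thread runs
    global counter
    for step in ("Step One", "Step Two"):
        with counter_lock:       # the locking "magic" referred to above
            counter += 1
        print("%s : %s" % (name, step))

threads = [threading.Thread(target=handle_request, args=(name,))
           for name in ("James", "Wendy")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, the read-modify-write of the shared counter could interleave between threads; the lock is exactly the kind of synchronization the event-driven approach below avoids.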
An alternative is an event-driven approach, where each operation is
broken down into distinct steps, and each step is scheduled in a
delayed-execution queue. In this approach, an object is often used
to maintain state information between steps, allowing other
operations to be performed in the meantime. Often each step is a member
function, queued within Twisted using reactor.callLater.
from twisted.internet import reactor

class Request:
    def __init__(self, name):
        self.name = name
        reactor.callLater(0, self.step_one)
    def step_one(self):
        print self.name, ": Step One"
        reactor.callLater(0, self.step_two)
    def step_two(self):
        print self.name, ": Step Two"
        reactor.callLater(0, self.step_done)
    def step_done(self):
        # cleanup, and don't call reactor again.
        pass

Request("James")
Request("Wendy")

# start and shut down the event loop
reactor.callLater(1, reactor.stop)
reactor.run()
In the output of this example, one can see that execution alternates between the steps of James's and Wendy's requests. In effect, the requests are processed in a parallel, cooperative manner. While this is a good start, it isn't perfect, as the event loop is stopped with a hard-coded timeout. It would be better for each Request object to signal when it has completed, so that the event loop can be shut down as soon as all requests are done.
In Twisted, communication between operations is often accomplished with
defer.Deferred. The Deferred object decouples an asynchronous
operation from its caller by providing a temporary storage location for
its result. Instead of calling the operation and waiting for a return
value, an operation schedules itself to be executed with
reactor.callLater and then returns a defer.Deferred object. At this
point, the caller can register a callback function to be executed when
the operation has completed. In the example below, defer.DeferredList
is used to merge the result of each request into a single notification,
which is used to stop the main event loop.
from twisted.internet import reactor, defer

class Request:
    def __init__(self, name):
        self.name = name
        self.d = defer.Deferred()
        reactor.callLater(0, self.step_one)
    def step_one(self):
        print self.name, ": Step One"
        reactor.callLater(0, self.step_two)
    def step_two(self):
        print self.name, ": Step Two"
        reactor.callLater(0, self.step_done)
    def step_done(self):
        self.d.callback(None) # notify callback that we are done

james = Request("James")
wendy = Request("Wendy")

# shut down the event loop once both requests are done
d = defer.DeferredList([james.d, wendy.d])
d.addCallback(lambda _: reactor.stop())
reactor.run()
While this Deferred approach works well, it can quickly get
complicated, especially if the request is not a simple linear
sequence of steps, or when results must flow between steps incrementally.
The flow module addresses these shortcomings using Python generators.
An iterator is basically an object which produces a sequence of values.
Python's iterators are simply objects with an __iter__()
member function which returns an object (usually itself) that has a
next() member function. The next() method is
then invoked until it raises a StopIteration exception.
from twisted.python.compat import iter, StopIteration

class Counter:
    def __init__(self, count):
        self.count = count
    def __iter__(self):
        return self
    def next(self):
        ret = self.count
        self.count -= 1
        if ret:
            return ret
        raise StopIteration

import sys
if sys.version_info < (2, 2):
    def list(it):
        ret = []
        it = iter(it)
        try:
            while 1:
                ret.append(it.next())
        except StopIteration:
            pass
        return ret

print list(Counter(3))
# prints: [3, 2, 1]
Oftentimes it is useful for an iterator to change state as it produces values. This can be done nicely with the 'state' pattern.
class States:
    def __iter__(self):
        self.next = self.state_one
        return self
    def state_one(self):
        self.next = self.state_two
        return "one"
    def state_two(self):
        self.next = self.state_stop
        return "two"
    def state_stop(self):
        raise StopIteration

print list(States())
# prints: ['one', 'two']
Python 2.2 introduced wonderful syntactic sugar for creating
iterators: generators. When a generator function is called, an iterator
is returned, and from then on each invocation of next()
produces the subsequent value given by a yield statement.
With generators, the two iterators above become very easy to express.
from __future__ import generators

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def States():
    yield "one"
    yield "two"

print list(Counter(3))
print list(States())
# prints:
# [3, 2, 1]
# ['one', 'two']
One technical difference between iterators and generators is that raising
an exception from a generator permanently halts the generator, while raising
an exception from an iterator's next() method does not
invalidate the iterator; that is, one could call the next()
method again and possibly get further results. From here on, we use the
generator syntax for building iterators.
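This halting behaviour can be demonstrated directly (a small illustration in modern Python, where the iterator protocol method is spelled __next__ and the built-in next(obj) invokes it; halts and Resumes are made-up names):

```python
def halts():
    yield 1
    raise ValueError("boom")
    yield 2   # never reached: the exception ends the generator for good

class Resumes:
    """ an iterator that raises mid-stream yet remains usable """
    def __init__(self):
        self.count = 0
    def __iter__(self):
        return self
    def __next__(self):     # spelled next() in Python 2
        self.count += 1
        if self.count == 2:
            raise ValueError("boom")
        return self.count

gen = halts()
print(next(gen))            # 1
try:
    next(gen)               # raises ValueError
except ValueError:
    pass
halted = False
try:
    next(gen)               # the generator is permanently exhausted
except StopIteration:
    halted = True

it = Resumes()
print(next(it))             # 1
try:
    next(it)                # raises ValueError
except ValueError:
    pass
print(next(it))             # 3 -- the iterator simply continues
```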
It is often useful to view an operation as a flow of information between stages, where each stage may have several states or steps. This can be coded so that the output of one generator is consumed by another. In this view, the last generator in the chain 'pulls' data from the previous stages.
from __future__ import generators

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def Consumer():
    producer = Counter(3)
    for result in producer:
        if 2 != result:
            yield result

print list(Consumer())
# prints: [3, 1]
While this is a very clean syntax for creating a multi-stage operation, it would block all other operations while running. Therefore, some mechanism for pausing the generator and resuming it later is required.
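The core idea can be sketched without Twisted: a simple round-robin scheduler steps several generators, treating a sentinel value as a "pause here" signal. (Pause, run_all, and counter below are hypothetical names used for illustration; they are not part of the flow API.)

```python
class Pause:
    """ sentinel value: let other tasks run before resuming """
    pass

def run_all(tasks):
    # round-robin scheduler: step each generator in turn; a Pause
    # yields the floor to other tasks, anything else is a result
    results = []
    pending = list(tasks)
    while pending:
        task = pending.pop(0)
        try:
            value = next(task)
        except StopIteration:
            continue                # this task has finished
        if not isinstance(value, Pause):
            results.append(value)
        pending.append(task)        # resume the task later
    return results

def counter(name, count):
    for idx in range(1, count + 1):
        yield "%s %s" % (name, idx)
        yield Pause()               # cooperate with other tasks

print(run_all([counter("a", 2), counter("b", 2)]))
# prints: ['a 1', 'b 1', 'a 2', 'b 2']
```

Note how the two counters interleave, just as James's and Wendy's requests did earlier; the flow module provides this pausing machinery in a form that integrates with the reactor.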
The flow module provides this ability to cooperate with other tasks. This is accomplished by wrapping iterables in a flow stage object and following an alternating yield pattern: before each value is pulled from the stage, the operation must yield the wrapper object. During this yield, bookkeeping is done to prepare the next value or, if the next value is not yet available, to reschedule the operation for later execution.
from __future__ import generators
from twisted.flow import flow

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def Consumer():
    producer = flow.wrap(Counter(3))
    yield producer
    for result in producer:
        if 2 != result:
            yield result
        yield producer

print list(flow.Block(Consumer))
# prints: [3, 1]
In the above code, producer.next() is called
implicitly, and thus the generator above is equivalent to...
from __future__ import generators

def Consumer():
    producer = flow.wrap(Counter(3))
    while True:
        yield producer
        result = producer.next()
        if 2 != result:
            yield result
The next() method of the wrapper object does several
things. First, it checks to see if any results are ready; if so, it
returns the next one. If not, it looks for a failure and, if one
occurred, raises it. Finally, it checks whether the end of the input
has been reached. More concretely...
from __future__ import generators

def Consumer():
    producer = flow.wrap(Counter(3))
    while True:
        yield producer
        if producer.results:
            result = producer.results.pop(0)
            yield result
            continue
        if producer.failure:
            producer.stop = 1
            producer.failure.trap()
        if producer.stop:
            break
Another difference between plain iterables and ones wrapped with
the flow module is that exceptions caught are wrapped in a
twisted.python.failure.Failure object for later delivery.
There are two basic ways to recover from exceptions. One way is to list
expected exceptions in the call to flow.wrap; alternatively,
a try/except block can be used, catching flow.Failure objects.
from __future__ import generators
from twisted.flow import flow

def Producer(throw):
    yield 1
    yield 2
    raise throw
    yield 3

def Consumer(producer):
    producer = flow.wrap(producer, IOError)
    yield producer
    try:
        for result in producer:
            if result is IOError:
                # handle trapped error
                yield "trapped"
            else:
                yield result
            yield producer
    except AssertionError, err:
        # handle assertion error
        yield str(err)

print list(flow.Block(Consumer(Producer(IOError("trap")))))
print list(flow.Block(Consumer(Producer(AssertionError("notrap")))))
# prints: [1, 2, 'trapped']
# prints: [1, 2, 'notrap']
This may seem like quite a bit of effort, wrapping each iterator and
then altering the calling sequence. Why do it? The answer is that it
allows a flow.Cooperate object to be returned. When
this happens, the entire call chain can be paused so that other flows
can use the call stack. For flow.Block, the implementation of Cooperate
simply puts the call chain to sleep.
from __future__ import generators
from twisted.flow import flow

def gen():
    yield 'immediate'
    yield flow.Cooperate(2)
    yield 'delayed'

for x in flow.Block(gen):
    print x
# prints:
# immediate
# delayed
Cooperate can be demonstrated with the flow.Merge and
flow.Zip components. These two stages join two or more
generators into a single stream. The Merge operation does this by
rotating among whichever of its input streams have results
available. The Zip operation, on the other hand, waits for a result
from each stream before it produces a result.
from __future__ import generators
from twisted.flow import flow

def Right():
    yield "one"
    yield "two"
    yield flow.Cooperate()
    yield "three"

def Left():
    yield 1
    yield 2
    yield 3

print "Zip", list(flow.Block(flow.Zip(Right, Left)))
print "Merge", list(flow.Block(flow.Merge(Right, Left)))
# Zip [('one', 1), ('two', 2), ('three', 3)]
# Merge ['one', 1, 'two', 2, 3, 'three']
While flow.Block is useful for understanding how flow
works, it undermines the whole purpose by sleeping during Cooperate
and blocking other operations. What follows are several examples of how
to integrate flow with the Twisted framework in a non-blocking manner.
For starters, the long example of Wendy and James, with its numerous
calls to reactor.callLater to schedule each step of the
operation, can be rewritten using flow.Deferred.
from __future__ import generators
from twisted.internet import reactor, defer
from twisted.flow import flow

def request(name):
    print name, ": Step One"
    yield flow.Cooperate()
    print name, ": Step Two"

james = flow.Deferred(request("James"))
wendy = flow.Deferred(request("Wendy"))

# shut down the event loop once both requests are done
d = defer.DeferredList([wendy, james])
d.addCallback(lambda _: reactor.stop())
reactor.run()
Under the hood, when flow.Deferred encounters a
flow.Cooperate event, it reschedules itself to be resumed
at a later time, allowing other asynchronous operations to proceed. Once
again, defer.DeferredList is only used here to stop the
reactor after all operations have completed.
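This rescheduling can be modelled with a toy delayed-call loop (ToyReactor, call_later, and drive are invented names; this is an approximation of the mechanism, not Twisted's actual implementation):

```python
import heapq

class Cooperate:
    """ stand-in for flow.Cooperate: pause, resume after 'delay' """
    def __init__(self, delay=0):
        self.delay = delay

class ToyReactor:
    # minimal delayed-call scheduler standing in for Twisted's reactor
    def __init__(self):
        self.clock, self.calls, self.counter = 0.0, [], 0
    def call_later(self, delay, func):
        self.counter += 1   # tie-breaker keeps insertion order
        heapq.heappush(self.calls, (self.clock + delay, self.counter, func))
    def run(self):
        while self.calls:
            self.clock, _, func = heapq.heappop(self.calls)
            func()

def drive(reactor, gen, results):
    # step the generator; on Cooperate, reschedule ourselves for later
    try:
        value = next(gen)
    except StopIteration:
        return
    if isinstance(value, Cooperate):
        reactor.call_later(value.delay, lambda: drive(reactor, gen, results))
    else:
        results.append(value)
        reactor.call_later(0, lambda: drive(reactor, gen, results))

def task(name):
    yield name + ": step one"
    yield Cooperate(1)
    yield name + ": step two"

reactor = ToyReactor()
out = []
drive(reactor, task("James"), out)
drive(reactor, task("Wendy"), out)
reactor.run()
print(out)
```

While James's task is paused by Cooperate, Wendy's task gets the call stack, so the two interleave just as in the flow.Deferred example above.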
By using flow.Deferred it is easy to build a web
resource which is long-running yet can serve more than
one client at a time. Run the example below and view the
page with two browsers; notice that both web pages are
being created at the same time.
from __future__ import generators
from twisted.internet import reactor
from twisted.web import server, resource
from twisted.flow import flow

def cooperative(count):
    """ simulate a cooperative resource that does not block """
    from random import random
    idx = 0
    while idx < count:
        val = random()
        yield flow.Cooperate(val)
        yield str(val)[-5:]
        idx += 1

def flowRender(req):
    count = int(req.args.get("count", ["30"])[0])
    req.write("<html><body>")
    req.write(" %s Random numbers: <ol>\n" % count)
    source = flow.wrap(cooperative(count))
    yield source
    for itm in source:
        req.write("<li>%s</li>\n" % itm)
        yield source
    req.write("</ol></body></html>\n")

class FlowResource(resource.Resource):
    isLeaf = True
    def __init__(self, gen):
        resource.Resource.__init__(self)
        self.gen = gen
    def render(self, req):
        self.d = flow.Deferred(self.gen(req))
        self.d.addCallback(lambda _: req.finish())
        return server.NOT_DONE_YET

print "visit http://localhost:8081/ to view the example"
root = FlowResource(flowRender)
site = server.Site(root)
reactor.listenTCP(8081, site)
reactor.run()
The flow module can also be used to construct protocols easily; following is an echo client and server. In each protocol generator, one must yield the connection before reading from it. When the generator finishes, the connection is automatically closed.
from __future__ import generators
from twisted.flow import flow
from twisted.internet import protocol, reactor

PORT = 8392

def echoServer(conn):
    yield conn
    for data in conn:
        conn.write(data)
        yield conn
    reactor.callLater(0, reactor.stop)

server = protocol.ServerFactory()
server.protocol = flow.makeProtocol(echoServer)
reactor.listenTCP(PORT, server)

def echoClient(conn):
    conn.write("Hello World")
    yield conn
    print conn.next()
    conn.write("Another Line")
    yield conn
    print conn.next()

client = protocol.ClientFactory()
client.protocol = flow.makeProtocol(echoClient)
reactor.connectTCP("localhost", PORT, client)
reactor.run()
While the flow module allows multiple cooperative tasks
to work in a single thread, sometimes it is necessary for
the output of another thread to be consumed within a flow. This
can be done with flow.Threaded, which takes an
iterable object and executes it in another thread. Following
is a sample iterable, countSleep, which simulates a blocking
producer that must be put into a thread. To show that
flow.Threaded does not block other operations,
a similar, cooperative count is included.
from __future__ import generators
from twisted.internet import reactor, defer
from twisted.flow import flow

def countSleep(index):
    from time import sleep
    for index in range(index):
        sleep(.3)
        print "sleep", index
        yield index

def countCooperate(index):
    for index in range(index):
        yield flow.Cooperate(.1)
        print "cooperate", index
        yield "coop %s" % index

d = flow.Deferred( flow.Merge(
        flow.Threaded(countSleep(5)),
        countCooperate(5)))

# # alternatively
# d1 = flow.Deferred(flow.Threaded(countSleep(5)))
# d2 = flow.Deferred(countCooperate(10))
# d = defer.DeferredList([d1, d2])

def prn(x):
    print x
    reactor.stop()
d.addCallback(prn)
reactor.run()
Since most standard database drivers are thread-based,
the flow module builds on flow.Threaded by
providing a QueryIterator, which takes an SQL
query and a ConnectionPool.
from __future__ import generators
from twisted.enterprise import adbapi
from twisted.internet import reactor
from twisted.flow import flow

dbpool = adbapi.ConnectionPool("SomeDriver", host='localhost',
                               db='Database', user='User', passwd='Password')
sql = """
  (SELECT 'one')
UNION ALL
  (SELECT 'two')
UNION ALL
  (SELECT 'three')
"""

def consumer():
    query = flow.Threaded(flow.QueryIterator(dbpool, sql))
    yield query
    for row in query:
        print "Processed result : ", row

def finish(result):
    print "Deferred Complete : ", result

f = flow.Deferred(consumer())
f.addBoth(finish)
reactor.callLater(1, reactor.stop)
reactor.run()
# prints:
# Processed result :  ('one',)
# Processed result :  ('two',)
# Processed result :  ('three',)
# Deferred Complete :  []