Metadata-Version: 1.0
Name: zc.async
Version: 1.1.1
Summary: Perform durable tasks asynchronously
Home-page: UNKNOWN
Author: Gary Poster
Author-email: gary@zope.com
License: ZPL
Description: ~~~~~~~~
        zc.async
        ~~~~~~~~
        
        .. contents::
        
        ============
        Introduction
        ============
        
        Goals
        =====
        
        The zc.async package provides a way to schedule jobs to be performed
        out-of-band from your current thread.  The job might be done in another thread
        or another process, possibly on another machine.  Here are some example core
        use cases.
        
        - You want to let users do something that requires a lot of system
          resources from your application, such as creating a large PDF.  Naively
          done, six or seven simultaneous PDF requests will consume your
          application thread pool and could make your application unresponsive to
          any other users.
        
        - You want to let users spider a web site; communicate with a credit card
          company; query a large, slow LDAP database on another machine; or do
          some other action that generates network requests from the server.
          System resources might not be a problem, but, again, if something goes
          wrong, several requests could make your application unresponsive.
        
        - Perhaps because of resource contention, you want to serialize work
          that can be done asynchronously, such as updating a single data structure
          like a catalog index.
        
        - You want to decompose and parallelize a single job across many machines so
          it can be finished faster.
        
        - You have an application job that you discover is taking longer than users can
          handle, even after you optimize it.  You want a quick fix to move the work
          out-of-band.
        
        Many of these core use cases involve end-users being able to start potentially
        expensive processes, on demand.  Basic scheduled tasks are also provided by this
        package, though recurrence must be something you arrange.
        
        History
        =======
        
        This is a second-generation design.  The first generation was `zasync`,
        a mission-critical and successful Zope 2 product in use for a number of
        high-volume Zope 2 installations.  [#async_history]_ Note that zc.async
        has no backwards compatibility with zasync, and that zc.async does not
        require Zope (although it can be used in conjunction with it; details
        below).
        
        Design Overview
        ===============
        
        ---------------
        Overview: Usage
        ---------------
        
        Looking at the design from the perspective of regular usage, your code obtains
        a ``queue``, which is a place to register jobs to be performed asynchronously.
        
        Your application calls ``put`` on the queue to register a job.  The job must be
        a pickleable, callable object; a global function, a callable persistent object,
        a method of a persistent object, or a special zc.async.job.Job object
        (discussed later) are all examples of suitable objects.  The job by default is
        registered to be performed as soon as possible, but can be registered to be
        called at a certain time.
        
        The ``put`` call will return a zc.async.job.Job object.  This object represents
        both the callable and its deferred result.  It has information about the job
        requested, the current state of the job, and the result of performing the job.
        
        An example spelling for registering a job might be ``self.pending_result =
        queue.put(self.performSpider)``.  The returned object can be stored and polled
        to see when the job is complete; or the job can be configured to do additional
        work when it completes (such as storing the result in a data structure).
        
        -------------------
        Overview: Mechanism
        -------------------
        
        Multiple processes, typically spread across multiple machines, can
        connect to the queue and claim and perform work.  As with other
        collections of processes that share pickled objects, these processes
        generally should share the same software (though some variations on this
        constraint should be possible).
        
        A process that should claim and perform work, in addition to a database
        connection and the necessary software, needs a ``dispatcher`` with a
        ``reactor`` to provide a heartbeat.  The dispatcher will rely on one or more
        persistent ``agents`` in the queue (in the database) to determine which jobs
        it should perform.
        
        A ``dispatcher`` is in charge of dispatching queued work for a given
        process to worker threads.  It works with one or more queues and a
        single reactor.  It has a universally unique identifier (UUID), which is
        usually an identifier of the application instance in which it is
        running.  The dispatcher starts jobs in dedicated threads.
        
        A ``reactor`` is something that can provide an eternal loop, or heartbeat,
        to power the dispatcher.  It can be the main twisted reactor (in the
        main thread); another instance of a twisted reactor (in a child thread);
        or any object that implements a small subset of the twisted reactor
        interface (see discussion in dispatcher.txt, and example testing reactor in
        testing.py, used below).
        
        An ``agent`` is a persistent object in a queue that is associated with a
        dispatcher and is responsible for picking jobs and keeping track of
        them. Zero or more agents within a queue can be associated with a
        dispatcher.  Each agent for a given dispatcher in a given queue is
        identified uniquely with a name [#identifying_agent]_.
        
        Generally, these work together as follows.  The reactor calls the
        dispatcher. The dispatcher tries to find the mapping of queues in the
        database root under a key of ``zc.async`` (see constant
        zc.async.interfaces.KEY).  If it finds the mapping, it iterates
        over the queues (the mapping's values) and asks each queue for the
        agents associated with the dispatcher's UUID.  The dispatcher then is
        responsible for seeing what jobs its agents want to do from the queue,
        and providing threads and connections for the work to be done.  The
        dispatcher then asks the reactor to call itself again in a few seconds.
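        
        The polling cycle described above can be modeled in a few lines of plain
        Python.  This is only a toy sketch: it uses nested dictionaries in place of
        the ZODB root, queues, and agents, and the ``jobs``, ``agents``, ``size``,
        and ``active`` keys are hypothetical names chosen for illustration, not the
        package's data structures.
        
        ```python
        KEY = 'zc.async'  # same value as zc.async.interfaces.KEY
        
        
        def poll(root, uuid):
            """Run one heartbeat and return the jobs claimed for this dispatcher.
        
            ``root`` is a plain dict standing in for the database root.
            """
            claimed = []
            queues = root.get(KEY)
            if queues is None:
                # No queues mapping installed yet; try again on the next heartbeat.
                return claimed
            for queue in queues.values():
                # Only agents registered under this dispatcher's UUID are consulted.
                agents = queue['agents'].get(uuid, {})
                for agent in agents.values():
                    # Hypothetical rule: an agent claims jobs until it is "full".
                    while queue['jobs'] and len(agent['active']) < agent['size']:
                        job = queue['jobs'].pop(0)
                        agent['active'].append(job)
                        claimed.append(job)
            return claimed
        ```
        
        In the package itself, the dispatcher would then provide a thread and a
        connection for each claimed job and ask the reactor to schedule the next
        poll.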
        
        Reading More
        ============
        
        This document continues on with three other main sections: `Usage`_,
        `Configuration without Zope 3`_, and `Configuration with Zope 3`_.
        
        Other documents in the package are geared primarily toward maintainers,
        though the author has tried to make them readable and
        understandable.
        
        =====
        Usage
        =====
        
        Overview and Basics
        ===================
        
        The basic usage of zc.async does not depend on a particular configuration
        of the back-end mechanism for getting the jobs done.  Moreover, on some
        teams, it will be the responsibility of one person or group to configure
        zc.async, but a service available to the code of all team members.  Therefore,
        we begin our detailed discussion with regular usage, assuming configuration
        has already happened.  Subsequent sections discuss configuring zc.async
        with and without Zope 3.
        
        So, let's assume we have a queue with dispatchers, reactors and agents all
        waiting to fulfill jobs placed into the queue.  We start with a connection
        object, ``conn``, and some convenience functions introduced along the way that
        help us simulate time passing and work being done [#usageSetUp]_.
        
        -------------------
        Obtaining the queue
        -------------------
        
        First, how do we get the queue?  Your installation may have some
        conveniences.  For instance, the Zope 3 configuration described below
        makes it possible to get the primary queue with an adaptation call like
        ``zc.async.interfaces.IQueue(a_persistent_object_with_db_connection)``.
        
        But failing that, queues are always expected to be in a zc.async.queue.Queues
        mapping found off the ZODB root in a key defined by the constant
        zc.async.interfaces.KEY.
        
        >>> import zc.async.interfaces
        >>> zc.async.interfaces.KEY
        'zc.async'
        >>> root = conn.root()
        >>> queues = root[zc.async.interfaces.KEY]
        >>> import zc.async.queue
        >>> isinstance(queues, zc.async.queue.Queues)
        True
        
        As the name implies, ``queues`` is a collection of queues. As discussed later,
        it's possible to have multiple queues, as a tool to distribute and control
        work. We will assume the convention that a queue is available under the ''
        (empty string) key.
        
        >>> queues.keys()
        ['']
        >>> queue = queues['']
        
        -------------
        ``queue.put``
        -------------
        
        Now we want to actually get some work done.  The simplest case is easy
        to perform: pass a picklable callable to the queue's ``put`` method and
        commit the transaction.
        
        >>> def send_message():
        ...     print "imagine this sent a message to another machine"
        >>> job = queue.put(send_message)
        >>> import transaction
        >>> transaction.commit()
        
        Note that this won't really work in an interactive session: the callable needs
        to be picklable, as discussed above, so ``send_message`` would need to be
        a module global, for instance.
        
        The ``put`` returned a job.  Now we need to wait for the job to be
        performed.  We would normally do this by really waiting.  For our
        examples, we will use a helper method on the testing reactor to ``wait_for``
        the job to be completed.
        
        >>> reactor.wait_for(job)
        imagine this sent a message to another machine
        
        We also could have used the method of a persistent object.  Here's another
        quick example.
        
        First we define a simple persistent.Persistent subclass and put an instance of
        it in the database [#commit_for_multidatabase]_.
        
        >>> import persistent
        >>> class Demo(persistent.Persistent):
        ...     counter = 0
        ...     def increase(self, value=1):
        ...         self.counter += value
        ...
        >>> root['demo'] = Demo()
        >>> transaction.commit()
        
        Now we can put the ``demo.increase`` method in the queue.
        
        >>> root['demo'].counter
        0
        >>> job = queue.put(root['demo'].increase)
        >>> transaction.commit()
        
        >>> reactor.wait_for(job)
        >>> root['demo'].counter
        1
        
        The method was called, and the persistent object modified!
        
        To reiterate, only pickleable callables such as global functions and the
        methods of persistent objects can be used. This rules out, for instance,
        lambdas and other functions created dynamically. As we'll see below, the job
        instance can help us out there somewhat by offering closure-like features.
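        
        A quick way to see which callables qualify is to try pickling them with the
        standard library.  The helper ``is_picklable`` below is a hypothetical name
        introduced for this sketch; it is not part of zc.async, and it checks only
        pickling, not whether the callable makes sense as a job.
        
        ```python
        import functools
        import math
        import pickle
        
        
        def is_picklable(obj):
            """Return True if ``pickle`` can serialize ``obj``."""
            try:
                pickle.dumps(obj)
            except Exception:
                # pickle raises PicklingError, AttributeError, or TypeError
                # depending on the object and the Python version.
                return False
            return True
        
        
        # A function that can be imported by name is pickled by reference.
        importable = math.sqrt
        # A lambda has no importable name, so it cannot be pickled.
        anonymous = lambda: math.sqrt(4)
        # A partial of an importable function carries its arguments along.
        bound = functools.partial(math.sqrt, 4)
        ```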
        
        --------------
        ``queue.pull``
        --------------
        
        If you put a job into a queue and it hasn't been claimed yet and you want to
        cancel the job, ``pull`` it from the queue.
        
        >>> len(queue)
        0
        >>> job = queue.put(send_message)
        >>> len(queue)
        1
        >>> job is queue.pull()
        True
        >>> len(queue)
        0
        
        ---------------
        Scheduled Calls
        ---------------
        
        When using ``put``, you can also pass a datetime.datetime to schedule a call. A
        datetime without a timezone is considered to be in the UTC timezone.
        
        >>> t = transaction.begin()
        >>> import datetime
        >>> import pytz
        >>> datetime.datetime.now(pytz.UTC)
        datetime.datetime(2006, 8, 10, 15, 44, 33, 211, tzinfo=<UTC>)
        >>> job = queue.put(
        ...     send_message, begin_after=datetime.datetime(
        ...         2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
        >>> job.begin_after
        datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
        >>> transaction.commit()
        >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
        TIME OUT
        >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
        TIME OUT
        >>> datetime.datetime.now(pytz.UTC)
        datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
        
        >>> zc.async.testing.set_now(datetime.datetime(
        ...     2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
        >>> reactor.wait_for(job)
        imagine this sent a message to another machine
        >>> datetime.datetime.now(pytz.UTC) >= job.begin_after
        True
        
        If you set a time that has already passed, it will be run as if it had
        been set to run as soon as possible [#already_passed]_...unless the job
        has already timed out, in which case the job fails with an
        abort [#already_passed_timed_out]_.
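        
        The two scheduling rules above (a naive datetime is read as UTC, and a time
        in the past means "as soon as possible") can be sketched with the Python 3
        standard library.  The function names are hypothetical, the code touches
        neither zc.async nor pytz, and timed-out jobs are not modeled.
        
        ```python
        import datetime
        
        UTC = datetime.timezone.utc
        
        
        def as_utc(value):
            """Interpret a naive datetime as UTC; leave aware datetimes alone."""
            if value.tzinfo is None:
                return value.replace(tzinfo=UTC)
            return value
        
        
        def is_due(begin_after, now):
            """A job may start once ``now`` has reached ``begin_after``."""
            return as_utc(begin_after) <= as_utc(now)
        
        
        now = datetime.datetime(2006, 8, 10, 15, 44, 33, tzinfo=UTC)
        later = datetime.datetime(2006, 8, 10, 15, 56)    # naive, read as UTC
        earlier = datetime.datetime(2006, 8, 10, 15, 0)   # already in the past
        ```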
        
        The queue's ``put`` method is the essential API. ``pull`` is used rarely. Other
        methods are used to introspect, but are not needed for basic usage.
        
        But what is that result of the ``put`` call in the examples above?  A
        job?  What do you do with that?
        
        Jobs
        ====
        
        --------
        Overview
        --------
        
        A call to ``put`` returns an IJob.  The
        job represents the pending result.  This object has a lot of
        functionality that's explored in other documents in this package, and
        demonstrated a bit below, but here's a summary.
        
        - You can introspect, and even modify, the call and its
          arguments.
        
        - You can specify that the job should be run serially with others
          of a given identifier.
        
        - You can specify other calls that should be made on the basis of the
          result of this call.
        
        - You can persist a reference to it, and periodically (after syncing
          your connection with the database, which happens whenever you begin or
          commit a transaction) check its ``status`` to see if it is equal to
          zc.async.interfaces.COMPLETED.  When it is, the call has run to
          completion, ending in either success or an exception.
        
        - You can look at the result of the call (once COMPLETED).  It might be
          the result you expect, or a zc.twist.Failure, which is a
          subclass of twisted.python.failure.Failure and a way to safely
          communicate exceptions across connections, machines, and processes.
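        
        The polling approach in the fourth point might look like the loop below.  It
        is a sketch that runs without zc.async: ``get_status`` and ``sync`` are
        hypothetical callables standing in for reading ``job.status`` and for
        beginning a new transaction, and ``COMPLETED`` is a placeholder value rather
        than the real constant.
        
        ```python
        import time
        
        COMPLETED = 'completed'  # placeholder for zc.async.interfaces.COMPLETED
        
        
        def poll_until_completed(get_status, sync, attempts=10, delay=0.0):
            """Sync, check the status, and sleep, at most ``attempts`` times.
        
            Returns True as soon as the status is COMPLETED, else False.
            """
            for _ in range(attempts):
                sync()  # e.g. transaction.begin(), so that fresh state is visible
                if get_status() == COMPLETED:
                    return True
                time.sleep(delay)
            return False
        ```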
        
        -------
        Results
        -------
        
        So here's a simple story.  What if you want to get a result back from a
        call?  Look at ``job.result`` after the call is COMPLETED.
        
        >>> def imaginaryNetworkCall():
        ...     # let's imagine this makes a network call...
        ...     return "200 OK"
        ...
        >>> job = queue.put(imaginaryNetworkCall)
        >>> print job.result
        None
        >>> job.status == zc.async.interfaces.PENDING
        True
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> t = transaction.begin()
        >>> job.result
        '200 OK'
        >>> job.status == zc.async.interfaces.COMPLETED
        True
        
        --------
        Closures
        --------
        
        What's more, you can pass a Job to the ``put`` call.  This means that you
        aren't limited to having simple argument-free calls performed
        asynchronously: you can pass a job with a call, arguments, and
        keyword arguments--effectively, a kind of closure.  Here's a quick example.
        We'll use the demo object, and its increase method, that we introduced
        above, but this time we'll include some arguments [#job]_.
        
        With positional arguments:
        
        >>> t = transaction.begin()
        >>> job = queue.put(
        ...     zc.async.job.Job(root['demo'].increase, 5))
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> t = transaction.begin()
        >>> root['demo'].counter
        6
        
        With keyword arguments (``value``):
        
        >>> job = queue.put(
        ...     zc.async.job.Job(root['demo'].increase, value=10))
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> t = transaction.begin()
        >>> root['demo'].counter
        16
        
        Note that arguments to these jobs can be any persistable object.
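        
        The closure-like behavior can be pictured as an object that stores a
        callable together with its arguments and applies them when called.  The
        ``SimpleJob`` and ``Demo`` classes below are hypothetical stand-ins written
        for this sketch; they are not persistent and are not the zc.async.job.Job
        implementation.
        
        ```python
        class Demo(object):
            counter = 0
        
            def increase(self, value=1):
                self.counter += value
        
        
        class SimpleJob(object):
            """Bundle a callable with positional and keyword arguments."""
        
            def __init__(self, func, *args, **kwargs):
                self.callable = func
                self.args = list(args)   # a list, so arguments can be added later
                self.kwargs = dict(kwargs)
        
            def __call__(self):
                return self.callable(*self.args, **self.kwargs)
        
        
        demo = Demo()
        SimpleJob(demo.increase, 5)()          # positional argument
        SimpleJob(demo.increase, value=10)()   # keyword argument
        late = SimpleJob(demo.increase)
        late.args.append(100)                  # argument supplied after creation
        late()
        ```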
        
        --------
        Failures
        --------
        
        What happens if a call raises an exception?  The job's result is a Failure.
        
        >>> def I_am_a_bad_bad_function():
        ...     return foo + bar
        ...
        >>> job = queue.put(I_am_a_bad_bad_function)
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> t = transaction.begin()
        >>> job.result
        <zc.twist.Failure exceptions.NameError>
        
        Failures can provide useful information such as tracebacks.
        
        >>> print job.result.getTraceback()
        ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        ...
        exceptions.NameError: global name 'foo' is not defined
        <BLANKLINE>
        
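        The idea of returning a failure as a value, instead of letting the exception
        propagate, can be illustrated with the Python 3 standard library alone.
        ``CapturedFailure`` and ``run`` are hypothetical names for this sketch;
        zc.twist.Failure is considerably more capable.
        
        ```python
        import traceback
        
        
        class CapturedFailure(object):
            """Hold an exception and its formatted traceback as plain data."""
        
            def __init__(self, exc):
                self.type = type(exc)
                self.value = exc
                self.traceback_text = ''.join(traceback.format_exception(
                    type(exc), exc, exc.__traceback__))
        
            def getTraceback(self):
                return self.traceback_text
        
        
        def run(func):
            """Call ``func``; return its result, or a CapturedFailure on error."""
            try:
                return func()
            except Exception as exc:
                return CapturedFailure(exc)
        
        
        def bad_function():
            return undefined_name + 1  # raises NameError when called
        
        
        result = run(bad_function)
        ```
        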
        ---------
        Callbacks
        ---------
        
        You can register callbacks to handle the result of a job, whether a
        Failure or another result.
        
        Note that, unlike callbacks on a Twisted deferred, these callbacks do not
        change the result of the original job. Since callbacks are jobs, you can chain
        results, but generally callbacks for the same job all get the same result as
        input.
        
        Also note that there is no guarantee that a callback will be processed
        on the same machine as the main call, and that some of the ``local``
        functions, discussed below, will not work as desired during a callback.
        
        Here's a simple example of reacting to a success.
        
        >>> def I_scribble_on_strings(string):
        ...     return string + ": SCRIBBLED"
        ...
        >>> job = queue.put(imaginaryNetworkCall)
        >>> callback = job.addCallback(I_scribble_on_strings)
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> job.result
        '200 OK'
        >>> callback.result
        '200 OK: SCRIBBLED'
        
        Here's a more complex example of handling a Failure, and then chaining
        a subsequent callback.
        
        >>> def I_handle_NameErrors(failure):
        ...     failure.trap(NameError) # see twisted.python.failure.Failure docs
        ...     return 'I handled a name error'
        ...
        >>> job = queue.put(I_am_a_bad_bad_function)
        >>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
        >>> callback2 = callback1.addCallback(I_scribble_on_strings)
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> job.result
        <zc.twist.Failure exceptions.NameError>
        >>> callback1.result
        'I handled a name error'
        >>> callback2.result
        'I handled a name error: SCRIBBLED'
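        
        The rule that callbacks receive the job's result without changing it can be
        modeled in plain Python.  ``MiniJob`` below is a hypothetical, synchronous
        stand-in: it runs callbacks inline and handles only the success path,
        whereas real callbacks are jobs that may run elsewhere.
        
        ```python
        class MiniJob(object):
            """A callable whose result is passed on to registered callbacks."""
        
            def __init__(self, func):
                self.func = func
                self.callbacks = []
                self.result = None
        
            def addCallback(self, func):
                # The callback is itself a job, so it can be chained further.
                callback = MiniJob(func)
                self.callbacks.append(callback)
                return callback
        
            def __call__(self, *args):
                self.result = self.func(*args)
                for callback in self.callbacks:
                    callback(self.result)  # every callback sees the same input
                return self.result
        
        
        def network_call():
            return '200 OK'
        
        
        def scribble(string):
            return string + ': SCRIBBLED'
        
        
        job = MiniJob(network_call)
        first = job.addCallback(scribble)
        second = job.addCallback(len)            # sibling of ``first``
        chained = first.addCallback(str.lower)   # receives the result of ``first``
        job()
        ```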
        
        Advanced Techniques and Tools
        =============================
        
        **Important**
        
        The job and its functionality described above are the core zc.async tools.
        
        The following are advanced techniques and tools of various complexities. You
        can use zc.async very productively without ever understanding or using them. If
        the following do not make sense to you now, feel free to move on.
        
        --------------
        zc.async.local
        --------------
        
        Jobs always run their callables in a thread, within the context of a
        connection to the ZODB. The callables have access to six special
        thread-local functions if they need them for special uses.  These are
        available off of zc.async.local.
        
        ``zc.async.local.getJob()``
            The ``getJob`` function can be used to examine the job, to get
            a connection off of ``_p_jar``, to get the queue into which the job
            was put, or other uses.
        
        ``zc.async.local.getQueue()``
            The ``getQueue`` function can be used to examine the queue, to put another
            task into the queue, or other uses. It is sugar for
            ``zc.async.local.getJob().queue``.
        
        ``zc.async.local.setLiveAnnotation(name, value, job=None)``
            The ``setLiveAnnotation`` tells the agent to set an annotation on a job,
            by default the current job, *in another connection*.  This makes it
            possible to send messages about progress or for coordination while in the
            middle of other work.
        
            As a simple rule, only send immutable objects like strings or
            numbers as values [#setLiveAnnotation]_.
        
        ``zc.async.local.getLiveAnnotation(name, default=None, timeout=0, poll=1, job=None)``
            The ``getLiveAnnotation`` tells the agent to get an annotation for a job,
            by default the current job, *from another connection*.  This makes it
            possible to receive messages about progress or for coordination while in
            the middle of other work.
        
            As a simple rule, only ask for annotation values that will be
            immutable objects like strings or numbers [#getLiveAnnotation]_.
        
            If the ``timeout`` argument is set to a positive float or int, the function
            will wait at least that number of seconds until an annotation of the
            given name is available. Otherwise, it will return the ``default`` if the
            name is not present in the annotations. The ``poll`` argument specifies
            approximately how often to poll for the annotation, in seconds (to be more
            precise, a subsequent poll will be min(poll, remaining seconds until
            timeout) seconds away).
        
        ``zc.async.local.getReactor()``
            The ``getReactor`` function returns the job's dispatcher's reactor.  The
            ``getLiveAnnotation`` and ``setLiveAnnotation`` functions use this,
            along with the zc.twist package, to work their magic; if you are feeling
            adventurous, you can do the same.
        
        ``zc.async.local.getDispatcher()``
            The ``getDispatcher`` function returns the job's dispatcher.  This might
            be used to analyze its non-persistent poll data structure, for instance
            (described later in configuration discussions).
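        
        The ``timeout`` and ``poll`` semantics described for ``getLiveAnnotation``
        can be sketched as a loop over a plain dictionary.  This is an illustration
        under assumptions, not the package's code: ``wait_for_key`` and the
        injectable ``clock`` and ``sleep`` parameters are hypothetical, added so the
        timing can be checked without really waiting.
        
        ```python
        import time
        
        
        def wait_for_key(mapping, name, default=None, timeout=0, poll=1,
                         clock=time.time, sleep=time.sleep):
            """Return ``mapping[name]``, polling until ``timeout`` has elapsed."""
            deadline = clock() + timeout
            while True:
                if name in mapping:
                    return mapping[name]
                remaining = deadline - clock()
                if remaining <= 0:
                    return default
                # The next poll is min(poll, remaining) seconds away.
                sleep(min(poll, remaining))
        
        
        class FakeTime(object):
            """A manually advanced clock that records every sleep."""
        
            def __init__(self):
                self.now = 0.0
                self.sleeps = []
        
            def clock(self):
                return self.now
        
            def sleep(self, seconds):
                self.sleeps.append(seconds)
                self.now += seconds
        
        
        fake = FakeTime()
        value = wait_for_key({}, 'flag', default='missing', timeout=2.5, poll=1,
                             clock=fake.clock, sleep=fake.sleep)
        ```
        
        With a 2.5 second timeout and a 1 second poll, the recorded sleeps show the
        final wait being shortened so that the loop does not overshoot the deadline.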
        
        Let's give three of those a whirl. We will write a function that examines the
        job's state while it is being called, and sets the state in an annotation, then
        waits for our flag to finish.
        
        >>> def annotateStatus():
        ...     zc.async.local.setLiveAnnotation(
        ...         'zc.async.test.status',
        ...         zc.async.local.getJob().status)
        ...     zc.async.local.getLiveAnnotation(
        ...         'zc.async.test.flag', timeout=5)
        ...     return 42
        ...
        >>> job = queue.put(annotateStatus)
        >>> transaction.commit()
        >>> import time
        >>> def wait_for_annotation(job, key):
        ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
        ...     for i in range(10):
        ...         while reactor.time_passes():
        ...             pass
        ...         transaction.begin()
        ...         if key in job.annotations:
        ...             break
        ...         time.sleep(0.1)
        ...     else:
        ...         print 'Timed out' + repr(dict(job.annotations))
        ...
        >>> wait_for_annotation(job, 'zc.async.test.status')
        >>> job.annotations['zc.async.test.status'] == (
        ...     zc.async.interfaces.ACTIVE)
        True
        >>> job.status == zc.async.interfaces.ACTIVE
        True
        
        [#stats_1]_
        
        >>> job.annotations['zc.async.test.flag'] = True
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> job.result
        42
        
        [#stats_2]_ ``getReactor`` and ``getDispatcher`` are for advanced use
        cases and are not explored further here.
        
        ----------
        Job Quotas
        ----------
        
        One class of asynchronous jobs is ideally serialized.  For instance,
        you may want to reduce or eliminate the chance of conflict errors when
        updating a text index.  One way to do this kind of serialization is to
        use the ``quota_names`` attribute of the job.
        
        For example, let's first show two non-serialized jobs running at the
        same time, and then two serialized jobs created at the same time.
        The first part of the example does not use ``quota_names``, to show a contrast.
        
        For our parallel jobs, we'll do something that would create a deadlock
        if they were serial.  Notice that we are mutating the job arguments after
        creation to accomplish this, which is supported.
        
        >>> def waitForParallel(other):
        ...     zc.async.local.setLiveAnnotation(
        ...         'zc.async.test.flag', True)
        ...     zc.async.local.getLiveAnnotation(
        ...         'zc.async.test.flag', job=other, timeout=0.4, poll=0)
        ...
        >>> job1 = queue.put(waitForParallel)
        >>> job2 = queue.put(waitForParallel)
        >>> job1.args.append(job2)
        >>> job2.args.append(job1)
        >>> transaction.commit()
        >>> reactor.wait_for(job1, job2)
        >>> job1.status == zc.async.interfaces.COMPLETED
        True
        >>> job2.status == zc.async.interfaces.COMPLETED
        True
        >>> job1.result is job2.result is None
        True
        
        On the other hand, for our serial jobs, we'll do something that would fail
        if it were parallel.  We'll rely on ``quota_names``.
        
        Quotas verge on configuration, which is not what this section is about,
        because they must be configured on the queue.  However, they also affect
        usage, so we show them here.
        
        >>> def pause(other):
        ...     zc.async.local.setLiveAnnotation(
        ...         'zc.async.test.flag', True)
        ...     res = zc.async.local.getLiveAnnotation(
        ...         'zc.async.test.flag', timeout=0.4, poll=0.1, job=other)
        ...
        >>> job1 = queue.put(pause)
        >>> job2 = queue.put(imaginaryNetworkCall)
        
        You can't put a name in ``quota_names`` unless the quota has been created
        in the queue.
        
        >>> job1.quota_names = ('test',)
        Traceback (most recent call last):
        ...
        ValueError: ('unknown quota name', 'test')
        >>> queue.quotas.create('test')
        >>> job1.quota_names = ('test',)
        >>> job2.quota_names = ('test',)
        
        Now we can see the two jobs being performed serially.
        
        >>> job1.args.append(job2)
        >>> transaction.commit()
        >>> reactor.time_flies(dispatcher.poll_interval)
        1
        >>> for i in range(10):
        ...     t = transaction.begin()
        ...     if job1.status == zc.async.interfaces.ACTIVE:
        ...         break
        ...     time.sleep(0.1)
        ... else:
        ...     print 'TIME OUT'
        ...
        >>> job2.status == zc.async.interfaces.PENDING
        True
        >>> job2.annotations['zc.async.test.flag'] = False
        >>> transaction.commit()
        >>> reactor.wait_for(job1)
        >>> reactor.wait_for(job2)
        >>> print job1.result
        None
        >>> print job2.result
        200 OK
        
        Quotas can be configured for limits greater than one at a time, if desired.
        This may be valuable when a needed resource is only available in limited
        numbers at a time.
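        
        The effect of a quota on which jobs may start can be sketched as a counting
        rule.  The code below is a simplified model, not the queue's implementation:
        jobs are plain dictionaries, ``quotas`` maps each quota name to a
        hypothetical size limit, and ``claimable`` is a name invented here.
        
        ```python
        def claimable(pending, active, quotas):
            """Return the pending jobs that can start without exceeding a quota."""
            # Count how many active jobs currently hold each quota name.
            counts = {}
            for job in active:
                for name in job['quota_names']:
                    counts[name] = counts.get(name, 0) + 1
            started = []
            for job in pending:
                names = job['quota_names']
                # A job starts only if every one of its quotas has room.
                if all(counts.get(name, 0) < quotas[name] for name in names):
                    for name in names:
                        counts[name] = counts.get(name, 0) + 1
                    started.append(job)
            return started
        
        
        quotas = {'test': 1, 'pool': 2}
        job1 = {'name': 'job1', 'quota_names': ('test',)}
        job2 = {'name': 'job2', 'quota_names': ('test',)}
        job3 = {'name': 'job3', 'quota_names': ()}
        pooled = [{'name': 'p%d' % i, 'quota_names': ('pool',)} for i in range(3)]
        ```
        
        With a limit of one, ``job2`` has to wait until ``job1`` is no longer
        active, while ``job3``, which names no quota, is unaffected.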
        
        Note that, while quotas are valuable tools for doing serialized work such as
        updating a text index, other optimization features sometimes useful for this
        sort of task, such as collapsing similar jobs, are not provided directly by
        this package. This functionality could be trivially built on top of zc.async,
        however [#idea_for_collapsing_jobs]_.
        
        --------------
        Returning Jobs
        --------------
        
        Our examples so far have done work directly.  What if the job wants to
        orchestrate other work?  One way this can be done is to return another
        job.  The result of the inner job will be the result of the first
        job once the inner job is finished.  This approach can be used to
        break up the work of long running processes; to be more cooperative to
        other jobs; and to make parts of a job that can be parallelized available
        to more workers.
        
        Serialized Work
        ---------------
        
        First, consider a serialized example.  This simple pattern is one approach.
        
        >>> def second_job(value):
        ...     # imagine a lot of work goes on...
        ...     return value * 2
        ...
        >>> def first_job():
        ...     # imagine a lot of work goes on...
        ...     intermediate_value = 21
        ...     queue = zc.async.local.getJob().queue
        ...     return queue.put(zc.async.job.Job(
        ...         second_job, intermediate_value))
        ...
        >>> job = queue.put(first_job)
        >>> transaction.commit()
        >>> reactor.wait_for(job, attempts=3)
        TIME OUT
        >>> reactor.wait_for(job, attempts=3)
        >>> job.result
        42
        
        The second_job could also have returned a job, allowing for additional
        legs.  Once the last job returns a real result, it will cascade through the
        past jobs back up to the original one.
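        
        The cascading of results can be pictured with a small, synchronous model.
        Here a returned ``functools.partial`` plays the role of a returned job, and
        ``run_to_result`` (a hypothetical helper) keeps calling until it gets a
        value that is not another partial; in zc.async the inner job would instead
        be queued and performed by a worker.
        
        ```python
        import functools
        
        
        def second_job(value):
            return value * 2
        
        
        def first_job():
            intermediate_value = 21
            # Hand back the next leg of the work instead of a final value.
            return functools.partial(second_job, intermediate_value)
        
        
        def run_to_result(job):
            """Call ``job`` and follow any returned partials to the final value."""
            result = job()
            while isinstance(result, functools.partial):
                result = result()
            return result
        ```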
        
        A different approach could have used callbacks.  Using callbacks can be
        somewhat more complicated to follow, but can allow for a cleaner
        separation of code: dividing code that does work from code that
        orchestrates the jobs.  We'll see an example of the idea below.
        
        Parallelized Work
        -----------------
        
        Now how can we set up parallel jobs?  There are other good ways, but we
        can describe one way that avoids potential problems with the
        current-as-of-this-writing (ZODB 3.8 and trunk) default optimistic MVCC
        serialization behavior in the ZODB.  The solution uses callbacks, which
        also allows us to cleanly divide the "work" code from the synchronization
        code, as described in the previous paragraph.
        
        First, we'll define the jobs that do work.  ``job_A``, ``job_B``, and
        ``job_C`` will be jobs that can be done in parallel, and
        ``post_process`` will be a function that assembles the job results for a
        final result.
        
        >>> def job_A():
        ...     # imaginary work...
        ...     return 7
        ...
        >>> def job_B():
        ...     # imaginary work...
        ...     return 14
        ...
        >>> def job_C():
        ...     # imaginary work...
        ...     return 21
        ...
        >>> def post_process(*args):
        ...     # this callable represents one that needs to wait for the
        ...     # parallel jobs to be done before it can process them and return
        ...     # the final result
        ...     return sum(args)
        ...
        
        Now this code works with jobs to get everything done.  Note, in the
        callback function, that mutating the same object we are checking
        (job.args) is the way we are enforcing necessary serializability
        with MVCC turned on.
        
        >>> def callback(job, result):
        ...     job.args.append(result)
        ...     if len(job.args) == 3: # all results are in
        ...         zc.async.local.getJob().queue.put(job)
        ...
        >>> def main_job():
        ...     job = zc.async.job.Job(post_process)
        ...     queue = zc.async.local.getJob().queue
        ...     for j in (job_A, job_B, job_C):
        ...         queue.put(j).addCallback(
        ...             zc.async.job.Job(callback, job))
        ...     return job
        ...
        
        That may be a bit mind-blowing at first.  The trick to catch here is that,
        because the main_job returns a job, the result of that job will become the
        result of the main_job once the returned (``post_process``) job is done.
        
        Now we'll put this in and let it cook.
        
        >>> job = queue.put(main_job)
        >>> transaction.commit()
        >>> reactor.wait_for(job, attempts=3)
        TIME OUT
        >>> reactor.wait_for(job, attempts=3)
        TIME OUT
        >>> reactor.wait_for(job, attempts=3)
        TIME OUT
        >>> reactor.wait_for(job, attempts=3)
        >>> job.result
        42
        
        Ta-da!
        
        For real-world usage, you'd also probably want to deal with the possibility of
        one or more of the jobs generating a Failure, among other edge cases.
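        
        One way to think about the failure case is sketched below in plain
        Python.  It is a toy, not zc.async code: ``ToyJoin`` and ``run_piece`` are
        hypothetical names, ordinary exceptions stand in for the Failure objects
        that zc.async stores as a failed job's result, and the policy of failing
        the whole join on the first error is just one possible choice.
        
        ```python
        class ToyJoin:
            """Hypothetical collector: gathers outcomes, then post-processes them."""
        
            def __init__(self, expected, post_process):
                self.expected = expected      # number of parallel pieces to wait for
                self.post_process = post_process
                self.results = []
                self.errors = []
                self.outcome = None
                self.done = False
        
            def report(self, value):
                """Record one outcome; finish once every piece has reported."""
                if isinstance(value, Exception):
                    self.errors.append(value)
                else:
                    self.results.append(value)
                if len(self.results) + len(self.errors) == self.expected:
                    self.done = True
                    if self.errors:
                        # Policy for this sketch: any failure fails the join.
                        self.outcome = self.errors[0]
                    else:
                        self.outcome = self.post_process(*self.results)
        
        
        def run_piece(func, join):
            """Run one piece of work and report its value or its exception."""
            try:
                value = func()
            except Exception as exc:
                value = exc
            join.report(value)
        
        
        def job_A():
            return 7
        
        
        def job_B():
            raise ValueError("job_B could not finish")
        
        
        def job_C():
            return 21
        
        
        join = ToyJoin(3, lambda *args: sum(args))
        for piece in (job_A, job_B, job_C):
            run_piece(piece, join)
        print(join.done, repr(join.outcome))
        ```
        
        Counting errors together with results matters: if only successes were
        counted, a single failed piece would leave the join waiting forever.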
        
        -------------------
        Returning Deferreds
        -------------------
        
        What if you want to do work that doesn't require a ZODB connection?  You
        can also return a Twisted deferred (twisted.internet.defer.Deferred).
        When you then ``callback`` the deferred with the eventual result, the
        agent will be responsible for setting that value on the original
        deferred and calling its callbacks.  This can be a useful trick for
        making network calls using Twisted or zc.ngi, for instance.
        
        >>> def imaginaryNetworkCall2(deferred):
        ...     # make a network call...
        ...     deferred.callback('200 OK')
        ...
        >>> import twisted.internet.defer
        >>> import threading
        >>> def delegator():
        ...     deferred = twisted.internet.defer.Deferred()
        ...     t = threading.Thread(
        ...         target=imaginaryNetworkCall2, args=(deferred,))
        ...     t.run()
        ...     return deferred
        ...
        >>> job = queue.put(delegator)
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> job.result
        '200 OK'
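        
        The hand-off itself can be sketched without Twisted.  The example below
        swaps in the standard library's ``concurrent.futures.Future`` for the
        deferred, purely as an analogy: no agent or zc.async job is involved, and
        ``imaginary_network_call``, ``delegator`` and ``store_result`` are
        illustrative names.  The helper thread is started with ``start()``, so the
        result really does arrive from another thread.
        
        ```python
        import threading
        from concurrent.futures import Future
        
        
        def imaginary_network_call(future):
            # Pretend to talk to a remote service, then hand back the response.
            future.set_result('200 OK')
        
        
        def delegator():
            """Start the work in another thread and return a placeholder at once."""
            future = Future()
            worker = threading.Thread(
                target=imaginary_network_call, args=(future,))
            worker.start()
            return future
        
        
        stored = threading.Event()
        received = []
        
        
        def store_result(done_future):
            # Plays the part of the agent recording the eventual result.
            received.append(done_future.result())
            stored.set()
        
        
        future = delegator()
        future.add_done_callback(store_result)
        stored.wait(timeout=5)
        print(received)
        ```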
        
        Conclusion
        ==========
        
        This concludes our discussion of zc.async usage. The `next section`_ shows how
        to configure zc.async without Zope 3 [#stop_usage_reactor]_.
        
        .. _next section: `Configuration without Zope 3`_
        
        .. ......... ..
        .. Footnotes ..
        .. ......... ..
        
        .. [#async_history] The first generation, zasync, had the following goals:
        
        - be scalable, so that another process or machine could do the
        asynchronous work;
        
        - support lengthy jobs outside of the ZODB;
        
        - support lengthy jobs inside the ZODB;
        
        - be recoverable, so that crashes would not lose work;
        
        - be discoverable, so that logs and web interfaces give a view into
        the work being done asynchronously;
        
        - be easily extendible, to do new jobs; and
        
        - support graceful job expiration and cancellation.
        
        It met its goals well in some areas and adequately in others.
        
        Based on experience with the first generation, this second
        generation identifies several areas of improvement from the first
        design, and adds several goals.
        
        - Improvements
        
        * More carefully delineate the roles of the comprising components.
        
        The zasync design has three main components, as divided by their
        roles: persistent deferreds, now called jobs; job queues (the
        original zasync's "asynchronous call manager"); and dispatchers
        (the original zasync ZEO client).  The zasync 1.x design
        blurred the lines between the three components such that the
        component parts could only be replaced with difficulty, if at
        all. A goal for the 2.x design is to clearly define the role for
        each of three components such that, for instance, a user of a
        queue does not need to know about the dispatcher or the agents.
        
        * Improve scalability of asynchronous workers.
        
        The 1.x line was initially designed for a single asynchronous
        worker, which could be put on another machine thanks to ZEO.
        Tarek Ziadé of Nuxeo wrote zasyncdispatcher, which allowed
        multiple asynchronous workers to accept work, allowing multiple
        processes and multiple machines to divide and conquer. It worked
        around the limitations of the original zasync design to provide
        even more scalability. However, it was forced to divide up work
        well before a given worker looks at the queue.
        
        While dividing work earlier allows guesses and heuristics a
        chance to predict what worker might be more free in the future,
        a more reliable approach is to let the worker gauge whether it
        should take a job at the time the job is taken. Perhaps the
        worker will choose based on the worker's load, or other
        concurrent jobs in the process, or other details. A goal for the
        2.x line is to more directly support this type of scalability.
        
        * Improve scalability of registering jobs.
        
        The 1.x line initially wasn't concerned about very many
        concurrent asynchronous requests.  When this situation was
        encountered, it caused ConflictErrors between the worker process
        reading the deferred queue and the code that was adding the
        deferreds.  Thanks to Nuxeo, this problem was addressed in the
        1.x line.  A goal for the new version is to include and improve
        upon the 1.x solution.
        
        * Make it even simpler to provide new jobs.
        
        In the first version, `plugins` performed jobs.  They had a
        specific API and they had to be configured.  A goal for the new
        version is to require no specific API for jobs, and to not
        require any configuration.
        
        * Improve report information, especially through the web.
        
        The component that the first version of zasync provided to do
        the asynchronous work, the zasync client, provided very verbose
        logs of the jobs done, but they were hard to read and also did
        not have a through-the-web parallel.  Two goals for the new
        version are to improve the usefulness of the filesystem logs and
        to include more complete through-the-web visibility of the
        status of the provided asynchronous clients.
        
        * Make it easier to configure and start, especially for small
        deployments.
        
        A significant barrier to experimentation and deployment of the
        1.x line was the difficulty in configuration.  The 1.x line
        relied on ZConfig for zasync client configuration, demanding
        non-extensible similar-yet-subtly-different .conf files like the
        Zope conf files. The 2.x line plans to provide code that Zope 3
        can configure to run in the same process as a standard Zope 3
        application.  This means that development instances can start a
        zasync quickly and easily.  It also means that processes can be
        reallocated on the fly during production use, so that a machine
        being used as a zasync process can quickly be converted to a web
        server, if needed, and vice versa.  It further means that the
        Zope web server can be used for through-the-web reports of the
        current zasync process state.
        
        - New goals
        
        * Support intermediate return calls so that jobs can report back
        how they are doing.
        
        A frequent request from users of zasync 1.x was the ability for
        a long-running asynchronous process to report back progress to
        the original requester.  The 2.x line addresses this with three
        changes:
        
        + jobs are annotatable;
        
        + jobs should not be modified in an asynchronous
        worker that does work (though they may be read);
        
        + jobs can request another job in a synchronous process
        that annotates the job with progress status or other
        information.
        
        Because of relatively recent changes in ZODB--multi version
        concurrency control--this simple pattern should not generate
        conflict errors.
        
        * Support time-delayed calls.
        
        Retries and other use cases make time-delayed deferred calls
        desirable. The new design supports this sort of call.
        
        .. [#identifying_agent] The combination of a queue name plus a
        dispatcher UUID plus an agent name uniquely identifies an agent.
        
        .. [#usageSetUp] We set up the configuration for our usage examples here.
        
        You must have two adapter registrations: IConnection to
        ITransactionManager, and IPersistent to IConnection.  We will also
        register IPersistent to ITransactionManager because the adapter is
        designed for it.
        
        We also need to be able to get data manager partials for functions and
        methods; normal partials for functions and methods; and a data manager for
        a partial. Here are the necessary registrations.
        
        The dispatcher will look for a UUID utility, so we also need one of these.
        
        The ``zc.async.configure.base`` function performs all of these
        registrations. If you are working with zc.async without ZCML you might want
        to use it or ``zc.async.configure.minimal`` as a convenience.
        
        >>> import zc.async.configure
        >>> zc.async.configure.base()
        
        Now we'll set up the database, and make some policy decisions.  As
        the subsequent ``configuration`` sections discuss, some helpers are
        available for you to set this stuff up if you'd like, though it's not too
        onerous to do it by hand.
        
        We'll use a test reactor that we can control.
        
        >>> import zc.async.testing
        >>> reactor = zc.async.testing.Reactor()
        >>> reactor.start() # this monkeypatches datetime.datetime.now
        
        We need to instantiate the dispatcher with a reactor and a DB.  We
        have the reactor, so here is the DB.  We use a FileStorage rather
        than a MappingStorage variant typical in tests and examples because
        we want MVCC.
        
        >>> import ZODB.FileStorage
        >>> storage = ZODB.FileStorage.FileStorage(
        ...     'zc_async.fs', create=True)
        >>> from ZODB.DB import DB
        >>> db = DB(storage)
        >>> conn = db.open()
        >>> root = conn.root()
        
        Now let's create the mapping of queues, and a single queue.
        
        >>> import zc.async.queue
        >>> import zc.async.interfaces
        >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
        >>> queue = mapping[''] = zc.async.queue.Queue()
        >>> import transaction
        >>> transaction.commit()
        
        Now we can instantiate, activate, and perform some reactor work in order
        to let the dispatcher register with the queue.
        
        >>> import zc.async.dispatcher
        >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
        >>> dispatcher.activate()
        >>> reactor.time_flies(1)
        1
        
        The UUID is set on the dispatcher.
        
        >>> import zope.component
        >>> import zc.async.interfaces
        >>> UUID = zope.component.getUtility(zc.async.interfaces.IUUID)
        >>> dispatcher.UUID == UUID
        True
        
        Here's an agent named 'main'.
        
        >>> import zc.async.agent
        >>> agent = zc.async.agent.Agent()
        >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
        >>> agent.chooser is zc.async.agent.chooseFirst
        True
        >>> agent.size
        3
        >>> transaction.commit()
        
        .. [#commit_for_multidatabase] We commit before we do the next step as a
        good practice, in case the queue is from a different database than
        the root.  See the configuration sections for a discussion about
        why putting the queue in another database might be a good idea.
        
        Rather than committing the transaction,
        ``root._p_jar.add(root['demo'])`` would also accomplish the same
        thing from a multi-database perspective, without a commit.  It was
        not used in the example because the author judged
        ``transaction.commit()`` to be less jarring to the reader.  If you are
        down here reading this footnote, maybe the author was wrong. :-)
        
        .. [#already_passed]
        
        >>> t = transaction.begin()
        >>> job = queue.put(
        ...     send_message, datetime.datetime(2006, 8, 10, 15, tzinfo=pytz.UTC))
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        imagine this sent a message to another machine
        
        It's worth noting that this situation constitutes a small exception
        in the handling of scheduled calls.  Scheduled calls usually get
        preference when jobs are handed out over normal non-scheduled "as soon as
        possible" jobs.  However, setting the begin_after date to an earlier
        time puts the job at the end of the (usually) FIFO queue of non-scheduled
        tasks: it is treated exactly as if the date had not been specified.
        
        .. [#already_passed_timed_out]
        
        >>> t = transaction.begin()
        >>> job = queue.put(
        ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
        >>> transaction.commit()
        >>> reactor.wait_for(job)
        >>> job.result
        <zc.twist.Failure zc.async.interfaces.AbortedError>
        >>> import sys
        >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        Failure: zc.async.interfaces.AbortedError:
        
        .. [#job] The Job class can take arguments and keyword arguments
        for the wrapped callable at call time as well, similar to Python
        2.5's `partial`.  This will be important when we use the Job as
        a callback.  For this use case, though, realize that the job
        will be called with no arguments, so you must supply all necessary
        arguments for the callable at creation time.
        
        .. [#setLiveAnnotation]  Here's the real rule, which is more complex.
        *Do not send non-persistent mutables or a persistent.Persistent
        object without a connection, unless you do not refer to it again in
        the current job.*
        
        .. [#getLiveAnnotation] Here's the real rule. *To prevent surprising
        errors, do not request an annotation that might be a persistent
        object.*
        
        .. [#stats_1] The dispatcher has a getStatistics method.  It also shows the
        fact that there is an active task.
        
        >>> import pprint
        >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
        {'failed': 2,
        'longest active': (..., 'unnamed'),
        'longest failed': (..., 'unnamed'),
        'longest successful': (..., 'unnamed'),
        'shortest active': (..., 'unnamed'),
        'shortest failed': (..., 'unnamed'),
        'shortest successful': (..., 'unnamed'),
        'started': 12,
        'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
        'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 47, 211),
        'successful': 9,
        'unknown': 0}
        
        We can also see the active job with ``getActiveJobIds``.
        
        >>> job_ids = dispatcher.getActiveJobIds()
        >>> len(job_ids)
        1
        >>> info = dispatcher.getJobInfo(*job_ids[0])
        >>> pprint.pprint(info) # doctest: +ELLIPSIS
        {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
        'completed': None,
        'failed': False,
        'poll id': ...,
        'quota names': (),
        'result': None,
        'started': datetime.datetime(...),
        'thread': ...}
        >>> info['thread'] is not None
        True
        >>> info['poll id'] is not None
        True
        
        
        .. [#stats_2] Now the task is done, as the stats reflect.
        
        >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
        {'failed': 2,
        'longest active': None,
        'longest failed': (..., 'unnamed'),
        'longest successful': (..., 'unnamed'),
        'shortest active': None,
        'shortest failed': (..., 'unnamed'),
        'shortest successful': (..., 'unnamed'),
        'started': 10,
        'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
        'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 52, 211),
        'successful': 8,
        'unknown': 0}
        
        Although, wait a second--the 'statistics end', the 'started', and the
        'successful' values have changed!  Why?
        
        To keep memory from rolling out of control, the dispatcher by default
        only keeps 10 to 12.5 minutes worth of poll information in memory.  For
        the rest, keep logs and look at them (...and rotate them!).
        
        The ``getActiveJobIds`` list is empty now.
        
        >>> dispatcher.getActiveJobIds()
        []
        >>> info = dispatcher.getJobInfo(*job_ids[0])
        >>> pprint.pprint(info) # doctest: +ELLIPSIS
        {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
        'completed': datetime.datetime(...),
        'failed': False,
        'poll id': ...,
        'quota names': (),
        'result': '42',
        'started': datetime.datetime(...),
        'thread': ...}
        >>> info['thread'] is not None
        True
        >>> info['poll id'] is not None
        True
        
        .. [#idea_for_collapsing_jobs] For instance, here is one approach.  Imagine
        you are queueing the job of indexing documents. If the same document has a
        request to index, the job could simply walk the queue and remove (``pull``)
        similar tasks, perhaps aggregating any necessary data. Since the jobs are
        serial because of a quota, no other worker should be trying to work on
        those jobs.
        
        .. [#stop_usage_reactor]
        
        >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
        {'failed': 2,
        'longest active': None,
        'longest failed': (..., 'unnamed'),
        'longest successful': (..., 'unnamed'),
        'shortest active': None,
        'shortest failed': (..., 'unnamed'),
        'shortest successful': (..., 'unnamed'),
        'started': 22,
        'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
        'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
        'successful': 20,
        'unknown': 0}
        >>> reactor.stop()
        
        
        ============================
        Configuration without Zope 3
        ============================
        
        This section discusses setting up zc.async without Zope 3. Since Zope 3 is
        ill-defined, we will be more specific: this describes setting up zc.async
        without ZCML, without any zope.app packages, and with as few dependencies as
        possible. A casual way of describing the dependencies is "ZODB, Twisted and
        zope.component," though we directly depend on some smaller packages and
        indirectly on others [#specific_dependencies]_.
        
        You may have one of two kinds of configurations for your software using
        zc.async. The simplest approach is to have all processes able both to put items
        in queues, and to perform them with a dispatcher. You can then use on-the-fly
        ZODB configuration to determine what jobs, if any, each process' dispatcher
        performs. If a dispatcher has no agents in a given queue, as we'll discuss
        below, the dispatcher will not perform any job for that queue.
        
        However, if you want to create some processes that can only put items in a
        queue, and do not have a dispatcher at all, that is easy to do. We'll call this
        a "client" process, and the full configuration a "client/server process". As
        you might expect, the configuration of a client process is a subset of the
        configuration of the client/server process.
        
        We will first describe setting up a client, non-dispatcher process, in which
        you can only put items in a zc.async queue; and then describe setting up a
        dispatcher client/server process that can be used both to request and to
        perform jobs.
        
        Configuring a Client Process
        ============================
        
        Generally, zc.async configuration has four basic parts: component
        registrations, ZODB setup, ZODB configuration, and process configuration.  For
        a client process, we'll discuss required component registrations; ZODB
        setup;  minimal ZODB configuration; process configuration; and then circle
        back around for some optional component registrations.
        
        --------------------------------
        Required Component Registrations
        --------------------------------
        
        The required registrations can be installed for you by the
        ``zc.async.configure.base`` function. Most other examples in this package,
        such as those in the `Usage`_ section, use this in their
        test setup.
        
        Again, for a quick start, you might just want to use the helper
        ``zc.async.configure.base`` function, and move on to the `Required ZODB Set
        Up`_ section below.
        
        Here, though, we will go over each required registration and briefly explain
        what it is.
        
        You must have three adapter registrations: IConnection to
        ITransactionManager, IPersistent to IConnection, and IPersistent to
        ITransactionManager.
        
        The ``zc.twist`` package provides all of these adapters.  However,
        zope.app.keyreference also provides a version of the ``connection`` adapter
        that is identical or very similar, and that should work fine if you are
        already using that package in your application.
        
        >>> import zc.twist
        >>> import zope.component
        >>> zope.component.provideAdapter(zc.twist.transactionManager)
        >>> zope.component.provideAdapter(zc.twist.connection)
        >>> import ZODB.interfaces
        >>> zope.component.provideAdapter(
        ...     zc.twist.transactionManager, adapts=(ZODB.interfaces.IConnection,))
        
        We also need to be able to adapt functions and methods to jobs.  The
        zc.async.job.Job class is the expected implementation.
        
        >>> import types
        >>> import zc.async.interfaces
        >>> import zc.async.job
        >>> zope.component.provideAdapter(
        ...     zc.async.job.Job,
        ...     adapts=(types.FunctionType,),
        ...     provides=zc.async.interfaces.IJob)
        >>> zope.component.provideAdapter(
        ...     zc.async.job.Job,
        ...     adapts=(types.MethodType,),
        ...     provides=zc.async.interfaces.IJob)
        >>> zope.component.provideAdapter( # optional, rarely used
        ...     zc.async.job.Job,
        ...     adapts=(zc.twist.METHOD_WRAPPER_TYPE,),
        ...     provides=zc.async.interfaces.IJob)
        
        The queue looks for the UUID utility to set the ``assignerUUID`` job attribute,
        and may want to use it to optionally filter jobs during ``claim`` in the
        future. Also, the dispatcher will look for a UUID utility if a UUID is not
        specifically provided to its constructor.
        
        >>> from zc.async.instanceuuid import UUID
        >>> zope.component.provideUtility(
        ...     UUID, zc.async.interfaces.IUUID, '')
        
        The UUID we register here is a UUID of the instance, which is expected
        to uniquely identify the process when in production. It is stored in
        the file specified by the ``ZC_ASYNC_UUID`` environment variable (or in
        ``os.path.join(os.getcwd(), 'uuid.txt')`` if this is not specified, for easy
        initial experimentation with the package).
        
        >>> import uuid
        >>> import os
        >>> f = open(os.environ["ZC_ASYNC_UUID"])
        >>> uuid_hex = f.readline().strip()
        >>> f.close()
        >>> uuid = uuid.UUID(uuid_hex)
        >>> UUID == uuid
        True
        
        The uuid.txt file is intended to stay in the instance home as a persistent
        identifier.
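        
        The lookup just described can be sketched with the standard library alone.
        This is not the code in ``zc.async.instanceuuid``: the function name
        ``load_instance_uuid``, its ``environ`` parameter, and the decision to
        create the file when it is missing are assumptions made for illustration.
        
        ```python
        import os
        import tempfile
        import uuid
        
        
        def load_instance_uuid(environ=os.environ):
            """Return the UUID stored in the uuid file, creating it if absent."""
            path = environ.get('ZC_ASYNC_UUID')
            if path is None:
                # Fallback intended for easy experimentation.
                path = os.path.join(os.getcwd(), 'uuid.txt')
            if os.path.exists(path):
                with open(path) as f:
                    return uuid.UUID(f.readline().strip())
            new_uuid = uuid.uuid1()
            with open(path, 'w') as f:
                f.write(str(new_uuid) + '\n')
            return new_uuid
        
        
        # Demonstrate with a throwaway directory instead of a real instance home.
        demo_dir = tempfile.mkdtemp()
        demo_env = {'ZC_ASYNC_UUID': os.path.join(demo_dir, 'uuid.txt')}
        first = load_instance_uuid(demo_env)
        second = load_instance_uuid(demo_env)
        print(first == second)
        ```
        
        Because the identifier is read back from the file on later calls, a
        process restarted in the same instance home keeps the same identity.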
        
        Again, all of the required registrations above can be accomplished quickly with
        ``zc.async.configure.base``.
        
        --------------------
        Required ZODB Set Up
        --------------------
        
        On a basic level, zc.async needs a setup that supports good conflict
        resolution.  Most or all production ZODB storages now have the necessary
        APIs to support MVCC.
        
        Of course, if you want to run multiple processes, you need ZEO. You should also
        then make sure that your ZEO server installation has all the code that includes
        conflict resolution, such as zc.queue, because, as of this writing, conflict
        resolution happens in the ZEO server, not in clients.
        
        A more subtle decision is whether to use multiple databases.  The zc.async
        dispatcher can generate a lot of database churn.  It may be wise to put the
        queue in a separate database from your content database(s).
        
        The downsides to this option include the fact that you must be careful to
        specify to which database objects belong; and that broken cross-database
        references are not handled gracefully in the ZODB as of this writing.
        
        We will use multiple databases for our example here, because we are trying to
        demonstrate production-quality examples. We will show this with a pure-Python
        approach, rather than the ZConfig approach usually used by Zope. If you know
        ZConfig, that will be a reasonable approach as well; see zope.app.appsetup
        for how Zope uses ZConfig to set up multidatabases.
        
        In our example, we create two file storages. In production, you would likely
        use ZEO; hooking up ClientStorage instead of FileStorage should be
        straightforward.
        
        >>> databases = {}
        >>> import ZODB.FileStorage
        >>> storage = ZODB.FileStorage.FileStorage(
        ...     'main.fs', create=True)
        
        >>> async_storage = ZODB.FileStorage.FileStorage(
        ...     'async.fs', create=True)
        
        >>> from ZODB.DB import DB
        >>> databases[''] = db = DB(storage)
        >>> databases['async'] = async_db = DB(async_storage)
        >>> async_db.databases = db.databases = databases
        >>> db.database_name = ''
        >>> async_db.database_name = 'async'
        >>> conn = db.open()
        >>> root = conn.root()
        
        ------------------
        ZODB Configuration
        ------------------
        
        A Queue
        -------
        
        All we must have for a client to be able to put jobs in a queue is...a queue.
        
        For a quick start, the ``zc.async.subscribers`` module provides a subscriber to
        a DatabaseOpened event that does the right dance. See
        ``multidb_queue_installer`` and ``queue_installer`` in that module, and you can
        see that in use in `Configuration with Zope 3`_. For now, though, we're taking
        things step by step and explaining what's going on.
        
        Dispatchers look for queues in a mapping off the root of the database in
        a key defined as a constant: zc.async.interfaces.KEY.  This mapping should
        generally be a zc.async.queue.Queues object.
        
        If we were not using a multi-database for our example, we could simply install
        the queues mapping with this line:
        ``root[zc.async.interfaces.KEY] = zc.async.queue.Queues()``.  We will need
        something a bit more baroque.  We will add the queues mapping to the 'async'
        database, and then make it available in the main database ('') with the proper
        key.
        
        >>> conn2 = conn.get_connection('async')
        >>> import zc.async.queue
        >>> queues = conn2.root()['mounted_queues'] = zc.async.queue.Queues()
        
        Note that the 'mounted_queues' key in the async database is arbitrary:
        what we care about is the key in the database that the dispatcher will
        see.
        
        Now we add the object explicitly to conn2, so that the ZODB will know the
        "real" database in which the object lives, even though it will be also
        accessible from the main database.
        
        >>> conn2.add(queues)
        >>> root[zc.async.interfaces.KEY] = queues
        >>> import transaction
        >>> transaction.commit()
        
        Now we need to put a queue in the queues collection.  We can have more than
        one, as discussed below, but we suggest a convention of the primary queue
        being available in a key of '' (empty string).
        
        >>> queue = queues[''] = zc.async.queue.Queue()
        >>> transaction.commit()
        
        Quotas
        ------
        
        We touched on quotas in the usage section.  Some jobs will need to
        access resources that are shared across processes.  A central data
        structure such as an index in the ZODB is a prime example, but other
        examples might include a network service that only allows a certain
        number of concurrent connections.  These scenarios can be helped by
        quotas.
        
        Quotas are demonstrated in the usage section.  For configuration, you
        should know these characteristics:
        
        - you cannot add a job with a quota name that is not defined in the
        queue [#undefined_quota_name]_;
        
        - you cannot add a quota name to a job in a queue if the quota name is not
        defined in the queue [#no_mutation_to_undefined]_;
        
        - you can create and remove quotas on the queue [#create_remove_quotas]_;
        
        - you can remove a quota even if pending jobs still carry its name--the
        quota name is then ignored [#remove_quotas]_;
        
        - quotas default to a size of 1 [#default_size]_;
        
        - this can be changed at creation or later [#change_size]_; and
        
        - decreasing the size of a quota while the old quota size is filled will
        not affect the currently running jobs [#decreasing_affects_future]_.
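        
        The last point is the easiest to get wrong, so here is a small stand-alone
        model of it.  ``ToyQuota`` is a hypothetical class written only for this
        illustration, not zc.async's implementation; it assumes nothing more than
        "a quota of size N lets at most N jobs run at once."
        
        ```python
        class ToyQuota(object):
            """Stand-alone model of a quota; not zc.async's implementation."""
        
            def __init__(self, size=1):
                self.size = size      # maximum number of concurrently running jobs
                self.running = []     # jobs currently holding a slot
        
            @property
            def filled(self):
                return len(self.running) >= self.size
        
            def claim(self, job):
                """Give ``job`` a slot and return True, or return False if filled."""
                if self.filled:
                    return False
                self.running.append(job)
                return True
        
            def release(self, job):
                self.running.remove(job)
        
        
        quota = ToyQuota(size=2)
        assert quota.claim('job1') and quota.claim('job2')
        quota.size = 1                             # shrink while two jobs are running
        assert quota.running == ['job1', 'job2']   # the running jobs are untouched
        assert not quota.claim('job3')             # but nothing new starts...
        quota.release('job1')
        assert not quota.claim('job3')             # ...until the count drops below 1
        quota.release('job2')
        assert quota.claim('job3')
        ```
        
        The real behavior, with the real API, is shown in the footnote referenced
        by the last bullet above.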
        
        Multiple Queues
        ---------------
        
        Since our queues are stored in a mapping, we can also create multiple
        queues.  This can make some scenarios more convenient and simpler to reason
        about.  For instance, while you might have agents filtering jobs as we
        describe above, it might be simpler to say that you have a queue for one kind
        of job--say, processing a video file or an audio file--and a queue for other
        kinds of jobs.  Then it is easy and obvious to set up simple FIFO agents
        as desired for different dispatchers.  The same kind of logic could be
        accomplished with agents, but it is easier to picture the multiple queues.
        
        Another use case for multiple queues might be for specialized queues, like ones
        that broadcast jobs. You could write a queue subclass that broadcasts copies of
        the jobs it gets to all dispatchers, aggregating results.  This could be used to
        send "events" to all processes, or to gather statistics on certain processes,
        and so on.
        
        Generally, any time the application wants to be able to assert a kind of job
        rather than letting the agents decide what to do, having separate queues is
        a reasonable tool.
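        
        As a rough picture of "one queue per kind of job," here is a toy model that
        uses plain deques in a dict.  The queue name ``'media'`` and the ``put``
        helper are made up for the example; only the ``''`` key for the primary
        queue follows the convention suggested earlier.
        
        ```python
        import collections
        
        # Keys are queue names; '' is the conventional primary queue.
        queues = {'': collections.deque(), 'media': collections.deque()}
        
        
        def put(job, kind):
            """Route media jobs to their own queue and everything else to ''."""
            name = 'media' if kind in ('video', 'audio') else ''
            queues[name].append(job)
            return name
        
        
        put('encode intro.mov', 'video')
        put('reindex catalog', 'catalog')
        ```
        
        With the application deciding the queue up front, each dispatcher's agents
        can stay simple FIFO consumers of whichever queue they serve.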
        
        ---------------------
        Process Configuration
        ---------------------
        
        Daemonization
        -------------
        
        You often want to daemonize your software, so that you can restart it if
        there's a problem, keep track of it and monitor it, and so on.  ZDaemon
        (http://pypi.python.org/pypi/zdaemon) and Supervisor (http://supervisord.org/)
        are two fairly simple-to-use ways of doing this for both client and
        client/server processes. If your main application can be packaged as a
        setuptools distribution (egg or source release or even development egg) then
        you can have your main application as a zc.async client and your dispatchers
        running a separate zc.async-only main loop that simply includes your main
        application as a dependency, so the necessary software is around. You may have
        to do a bit more configuration on the client/server side to mimic global
        registries such as zope.component registrations and so on between the client
        and the client/servers, but this shouldn't be too bad.
        
        UUID File Location
        ------------------
        
        As discussed above, the instanceuuid module will look for an environment
        variable ``ZC_ASYNC_UUID`` to find the file name to use, and failing that will
        use ``os.path.join(os.getcwd(), 'uuid.txt')``.  It's worth noting that daemonization
        tools such as ZDaemon and Supervisor (3 or greater) make setting environment
        values for child processes an easy (and repeatable) configuration file setting.
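        
        The lookup order just described can be sketched in a few lines.  The
        ``uuid_file_path`` helper is hypothetical, written only to illustrate the
        rule; it is not a function provided by zc.async.
        
        ```python
        import os
        
        
        def uuid_file_path(environ=None):
            """Return the UUID file path using the lookup order described above.
        
            Illustrative helper only; it is not part of zc.async.
            """
            if environ is None:
                environ = os.environ
            # Prefer the explicit environment setting; fall back to ./uuid.txt.
            return (environ.get('ZC_ASYNC_UUID')
                    or os.path.join(os.getcwd(), 'uuid.txt'))
        ```
        
        Because the fallback depends on the current working directory, setting
        ``ZC_ASYNC_UUID`` explicitly is the more predictable choice for daemons.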
        
        -----------------------------------------------------
        Optional Component Registrations for a Client Process
        -----------------------------------------------------
        
        The only optional component registration potentially valuable for client
        instances that only put jobs in the queue is registering an adapter from
        persistent objects to a queue.  The ``zc.async.queue.getDefaultQueue`` adapter
        adapts to the queue named '' (empty string).  Since that's
        what we have from the `ZODB Configuration`_ section above, we'll register it.
        Writing your own adapter is trivial, as you can see if you look at the
        implementation of this function.
        
        >>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
        >>> zc.async.interfaces.IQueue(root) is queue
        True
        
        Configuring a Client/Server Process
        ===================================
        
        Configuring a client/server process--something that includes a running
        dispatcher--means doing everything described above, plus a bit more.  You
        need to set up and start a reactor and dispatcher; configure agents as desired
        to get the dispatcher to do some work; and optionally configure logging.
        
        For a quick start, the ``zc.async.subscribers`` module has some conveniences
        to start a threaded reactor and dispatcher, and to install agents.  You might
        want to look at those to get started.  They are also used in the Zope 3
        configuration (README_3).  Meanwhile, this document continues to go
        step-by-step instead, to try and explain the components and configuration.
        
        Even though it seems reasonable to first start a dispatcher and then set up its
        agents, we'll first define a subscriber to create an agent. As we'll see below,
        the dispatcher fires an event when it registers with a queue, and another when
        it activates the queue. These events give you the opportunity to register
        subscribers to add one or more agents to a queue, to tell the dispatcher what
        jobs to perform. zc.async.agent.addMainAgentActivationHandler is a reasonable
        starter: it adds a single agent named 'main' if one does not exist. The agent
        has a simple indiscriminate FIFO policy for the queue. If you want to write
        your own subscriber, look at this, or at the more generic subscriber in the
        ``zc.async.subscribers`` module.
        
        Agents are an important part of the ZODB configuration, and so are described
        in more depth below.
        
        >>> import zc.async.agent
        >>> zope.component.provideHandler(
        ...     zc.async.agent.addMainAgentActivationHandler)
        
        This subscriber is registered for the IDispatcherActivated event; another
        approach might use the IDispatcherRegistered event.
        
        -----------------------
        Starting the Dispatcher
        -----------------------
        
        Now we can start the reactor, and start the dispatcher.
        In some applications this may be done with an event subscriber to
        DatabaseOpened, as is done in ``zc.async.subscribers``. Here, we will do it
        inline.
        
        Any object that conforms to the specification of zc.async.interfaces.IReactor
        will be usable by the dispatcher.  For our example, we will use our own instance
        of the Twisted select-based reactor running in a separate thread.  This is
        separate from the Twisted reactor installed in twisted.internet.reactor, and
        so this approach can be used with an application that does not otherwise use
        Twisted (for instance, a Zope application using the "classic" zope publisher).
        
        The testing module also has a reactor on which the `Usage` section relies, if
        you would like to see a minimal contract.
        
        Configuring the basics is fairly simple, as we'll see in a moment.  The
        trickiest part is to handle signals cleanly. It is also optional! The
        dispatcher will eventually figure out that the previous shutdown was not
        clean and take care of it. Here, though, essentially as an optimization, we
        install signal handlers in the main thread using ``reactor._handleSignals``.
        ``reactor._handleSignals`` may work in some real-world applications, but if
        your application already needs to handle signals you may need a more careful
        approach. Again, see ``zc.async.subscribers`` for some options you can explore.
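        
        If your application already installs its own signal handlers, one generic
        "more careful approach" is to chain to whatever handler was there before.
        The sketch below uses only the standard ``signal`` module;
        ``install_stop_handler`` is a hypothetical helper, not something zc.async
        or Twisted provides.
        
        ```python
        import signal
        
        
        def install_stop_handler(stop, signum=signal.SIGINT):
            """Call ``stop()`` on ``signum``, then defer to the previous handler."""
            previous = signal.getsignal(signum)
        
            def handler(received, frame):
                stop()
                # SIG_DFL and SIG_IGN are not callable, so they are skipped here.
                if callable(previous):
                    previous(received, frame)
        
            signal.signal(signum, handler)   # must be called from the main thread
            return handler
        ```
        
        In a set up like this one, ``stop`` might be something along the lines of
        ``lambda: reactor.callFromThread(reactor.stop)``.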
        
        >>> import twisted.internet.selectreactor
        >>> reactor = twisted.internet.selectreactor.SelectReactor()
        >>> reactor._handleSignals()
        
        Now we are ready to instantiate our dispatcher.
        
        >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
        
        Notice it has the uuid defined in instanceuuid.
        
        >>> dispatcher.UUID == UUID
        True
        
        Now we can start the reactor and the dispatcher in a thread.
        
        >>> import threading
        >>> def start():
        ...     dispatcher.activate()
        ...     reactor.run(installSignalHandlers=0)
        ...
        >>> thread = threading.Thread(target=start)
        >>> thread.setDaemon(True)
        
        >>> thread.start()
        
        The dispatcher should be starting up now.  Let's wait for it to activate.
        We're using a test convenience, get_poll, defined in the testing module.
        
        >>> from zc.async.testing import get_poll
        >>> poll = get_poll(dispatcher, 0)
        
        We're off!  The events have been fired for registering and activating the
        dispatcher.  Therefore, our subscriber to add our agent has fired.
        
        We need to begin our transaction to synchronize our view of the database.
        
        >>> t = transaction.begin()
        
        We get the collection of dispatcher agents from the queue, using the UUID.
        
        >>> dispatcher_agents = queue.dispatchers[UUID]
        
        It has one agent--the one placed by our subscriber.
        
        >>> dispatcher_agents.keys()
        ['main']
        >>> agent = dispatcher_agents['main']
        
        Now we have our agent!  But...what is it [#stop_config_reactor]_?
        
        ------
        Agents
        ------
        
        Agents are the way you control what a dispatcher's worker threads do.  They
        pick the jobs and assign them to their dispatcher when the dispatcher asks.
        
        *If a dispatcher does not have any agents in a given queue, it will not perform
        any tasks for that queue.*
        
        We currently have an agent that simply asks for the next available FIFO job.
        We are using an agent implementation that allows you to specify a callable to
        choose the job.  That callable is now zc.async.agent.chooseFirst.
        
        >>> agent.chooser is zc.async.agent.chooseFirst
        True
        
        Here's the entire implementation of that function::
        
            def chooseFirst(agent):
                return agent.queue.claim()
        
        What would another agent do?  Well, it might pass a filter function to
        ``claim``.  This function takes a job and returns a value evaluated as a
        boolean.  For instance, let's say we always wanted a certain number of
        threads available for working on a particular call; for the purpose of
        example, we'll use ``operator.mul``, though a more real-world example
        might be a network call or a particular call in your application.
        
        >>> import operator
        >>> def chooseMul(agent):
        ...     return agent.queue.claim(lambda job: job.callable is operator.mul)
        ...
        
        Another variant would prefer operator.mul, but if one is not in the queue,
        it will take any.
        
        >>> def preferMul(agent):
        ...     res = agent.queue.claim(lambda job: job.callable is operator.mul)
        ...     if res is None:
        ...         res = agent.queue.claim()
        ...     return res
        ...
        
        Other approaches might look at the current jobs in the agent, or the agent's
        dispatcher, and decide what jobs to prefer on that basis.  The agent design
        is meant to accommodate many such policies.
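        
        To make the "look at the current jobs" idea concrete, here is a toy model
        of a chooser that never lets more than one ``operator.mul`` job run at a
        time.  ``ToyJob``, ``ToyQueue``, ``ToyAgent`` and the ``current`` attribute
        are all invented for this sketch; the only thing assumed from zc.async is
        that ``claim`` accepts an optional filter function, as shown above.
        
        ```python
        import operator
        
        
        class ToyJob(object):
            def __init__(self, callable):
                self.callable = callable
        
        
        class ToyQueue(object):
            """FIFO queue whose ``claim`` accepts an optional filter function."""
        
            def __init__(self, jobs):
                self.jobs = list(jobs)
        
            def claim(self, filter=None):
                # Remove and return the first job the filter accepts, else None.
                for index, job in enumerate(self.jobs):
                    if filter is None or filter(job):
                        return self.jobs.pop(index)
                return None
        
        
        class ToyAgent(object):
            def __init__(self, queue):
                self.queue = queue
                self.current = []   # jobs this agent is working on right now
        
        
        def choose_at_most_one_mul(agent):
            """Claim FIFO, but never run more than one operator.mul job at a time."""
            busy = any(job.callable is operator.mul for job in agent.current)
            if busy:
                job = agent.queue.claim(
                    lambda job: job.callable is not operator.mul)
            else:
                job = agent.queue.claim()
            if job is not None:
                agent.current.append(job)
            return job
        
        
        agent = ToyAgent(ToyQueue([ToyJob(operator.mul), ToyJob(operator.mul),
                                   ToyJob(operator.add)]))
        picked = [choose_at_most_one_mul(agent) for _ in range(3)]
        ```
        
        The first call takes a ``mul`` job, the second skips the waiting ``mul`` job
        in favor of the ``add`` job, and the third finds nothing it may claim.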
        
        Let's set up another agent, in addition to the ``chooseFirst`` one, that has
        the ``preferMul`` policy.
        
        >>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(preferMul)
        
        Another characteristic of agents is that they specify how many jobs they
        should pick at a time.  The dispatcher actually adjusts the size of the
        ZODB connection pool to accommodate its agents' size.  The default is 3.
        
        >>> agent.size
        3
        >>> agent2.size
        3
        
        We can change that at creation or later.
        
        Finally, it's worth noting that agents contain the jobs that are currently
        worked on by the dispatcher, on their behalf; and have a ``completed``
        collection of the more recent completed jobs, beginning with the most recently
        completed job.
        
        ----------------------
        Logging and Monitoring
        ----------------------
        
        Logs are sent to the ``zc.async.events`` log for big events, like startup and
        shutdown, and errors.  Poll and job logs are sent to ``zc.async.trace``.
        Configure the standard Python logging module as usual to send these logs where
        you need.  Be sure to auto-rotate the trace logs.
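        
        A minimal sketch of such a configuration with the standard library follows.
        The logger names come from the paragraph above; the temporary directory,
        file names, log levels and rotation limits are placeholder choices for the
        example, not recommendations from the package.
        
        ```python
        import logging
        import logging.handlers
        import os
        import tempfile
        
        log_dir = tempfile.mkdtemp()   # stand-in for your real log directory
        
        # Big events (startup, shutdown, errors) are low-volume.
        events = logging.getLogger('zc.async.events')
        events.setLevel(logging.INFO)
        events.addHandler(logging.FileHandler(os.path.join(log_dir, 'async.log')))
        
        # Poll and job logs are chatty, so rotate them by size.
        trace = logging.getLogger('zc.async.trace')
        trace.setLevel(logging.DEBUG)
        trace.propagate = False        # keep trace noise out of parent loggers
        trace.addHandler(logging.handlers.RotatingFileHandler(
            os.path.join(log_dir, 'async_trace.log'),
            maxBytes=10 * 1024 * 1024, backupCount=5))
        ```
        
        ``TimedRotatingFileHandler`` is an equally reasonable choice if you would
        rather rotate by time than by size.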
        
        The package supports monitoring using zc.z3monitor, but using that package
        brings in more Zope 3 dependencies, so it is not included here. If you would
        like to use it, see monitor.txt in the package and our next section:
        `Configuration with Zope 3`_. Otherwise, if you want to roll your own
        monitoring, glance at monitor.py--you'll see that most of the heavy lifting for
        the monitor support is done in the dispatcher, so it should be pretty easy to
        hook up the basic data another way.
        
        >>> reactor.stop()
        
        .. ......... ..
        .. Footnotes ..
        .. ......... ..
        
        .. [#specific_dependencies]  More specifically, as of this writing,
        these are the minimal egg dependencies (including indirect
        dependencies):
        
        - pytz
        A Python time zone library
        
        - rwproperty
        A small package of descriptor conveniences
        
        - uuid
        The uuid module included in Python 2.5
        
        - zc.dict
        A ZODB-aware dict implementation based on BTrees.
        
        - zc.queue
        A ZODB-aware queue
        
        - zc.twist
        Conveniences for working with Twisted and the ZODB
        
        - twisted
        The Twisted internet library.
        
        - ZConfig
        A general configuration package coming from the Zope project with which
        the ZODB tests.
        
        - zdaemon
        A general daemon tool coming from the Zope project.
        
        - ZODB3
        The Zope Object Database.
        
        - zope.bforest
        Aggregations of multiple BTrees into a single dict-like structure,
        reasonable for rotating data structures, among other purposes.
        
        - zope.component
        A way to hook together code by contract.
        
        - zope.deferredimport
        A way to defer imports in Python packages, often to prevent circular
        import problems.
        
        - zope.deprecation
        A small framework for deprecating features.
        
        - zope.event
        An exceedingly small event framework that derives its power from
        zope.component.
        
        - zope.i18nmessageid
        A way to specify strings to be translated.
        
        - zope.interface
        A way to specify code contracts and other data structures.
        
        - zope.proxy
        A way to proxy other Python objects.
        
        - zope.testing
        Testing extensions and helpers.
        
        The next section, `Configuration With Zope 3`_, still tries to limit
        dependencies--we only rely on additional packages zc.z3monitor, simplejson,
        and zope.app.appsetup ourselves--but as of this writing zope.app.appsetup
        ends up dragging in a large chunk of zope.app.* packages. Hopefully that
        will be refactored in Zope itself, and our full Zope 3 configuration can
        benefit from the reduced indirect dependencies.
        
        .. [#undefined_quota_name]
        
        >>> import operator
        >>> import zc.async.job
        >>> job = zc.async.job.Job(operator.mul, 5, 2)
        >>> job.quota_names = ['content catalog']
        >>> job.quota_names
        ('content catalog',)
        >>> queue.put(job)
        Traceback (most recent call last):
        ...
        ValueError: ('unknown quota name', 'content catalog')
        >>> len(queue)
        0
        
        .. [#no_mutation_to_undefined]
        
        >>> job.quota_names = ()
        >>> job is queue.put(job)
        True
        >>> job.quota_names = ('content catalog',)
        Traceback (most recent call last):
        ...
        ValueError: ('unknown quota name', 'content catalog')
        >>> job.quota_names
        ()
        
        .. [#create_remove_quotas]
        
        >>> list(queue.quotas)
        []
        >>> queue.quotas.create('testing')
        >>> list(queue.quotas)
        ['testing']
        >>> queue.quotas.remove('testing')
        >>> list(queue.quotas)
        []
        
        .. [#remove_quotas]
        
        >>> queue.quotas.create('content catalog')
        >>> job.quota_names = ('content catalog',)
        >>> queue.quotas.remove('content catalog')
        >>> job.quota_names
        ('content catalog',)
        >>> job is queue.claim()
        True
        >>> len(queue)
        0
        
        .. [#default_size]
        
        >>> queue.quotas.create('content catalog')
        >>> queue.quotas['content catalog'].size
        1
        
        .. [#change_size]
        
        >>> queue.quotas['content catalog'].size = 2
        >>> queue.quotas['content catalog'].size
        2
        >>> queue.quotas.create('frobnitz account', size=3)
        >>> queue.quotas['frobnitz account'].size
        3
        
        .. [#decreasing_affects_future]
        
        >>> job1 = zc.async.job.Job(operator.mul, 5, 2)
        >>> job2 = zc.async.job.Job(operator.mul, 5, 2)
        >>> job3 = zc.async.job.Job(operator.mul, 5, 2)
        >>> job1.quota_names = job2.quota_names = job3.quota_names = (
        ...     'content catalog',)
        >>> job1 is queue.put(job1)
        True
        >>> job2 is queue.put(job2)
        True
        >>> job3 is queue.put(job3)
        True
        >>> job1 is queue.claim()
        True
        >>> job2 is queue.claim()
        True
        >>> print queue.claim()
        None
        >>> quota = queue.quotas['content catalog']
        >>> len(quota)
        2
        >>> list(quota) == [job1, job2]
        True
        >>> quota.filled
        True
        >>> quota.size = 1
        >>> quota.filled
        True
        >>> print queue.claim()
        None
        >>> job1()
        10
        >>> print queue.claim()
        None
        >>> len(quota)
        1
        >>> list(quota) == [job2]
        True
        >>> job2()
        10
        >>> job3 is queue.claim()
        True
        >>> list(quota) == [job3]
        True
        >>> len(quota)
        1
        >>> job3()
        10
        >>> print queue.claim()
        None
        >>> len(queue)
        0
        >>> quota.clean()
        >>> len(quota)
        0
        >>> quota.filled
        False
        
        .. [#stop_config_reactor] We don't want the live dispatcher for our demos,
        actually.  See dispatcher.txt to see the live dispatcher actually in use.
        So, here we'll stop the "real" reactor and switch to a testing one.
        
        >>> reactor.callFromThread(reactor.stop)
        >>> thread.join(3)
        >>> assert not dispatcher.activated, 'dispatcher did not deactivate'
        
        >>> import zc.async.testing
        >>> reactor = zc.async.testing.Reactor()
        >>> dispatcher.reactor = reactor
        >>> dispatcher.activate()
        >>> reactor.start()
        
        
        =========================
        Configuration with Zope 3
        =========================
        
        Our last main section can be the shortest yet, both because we've already
        introduced all of the main concepts, and because we will be leveraging
        conveniences to automate much of the configuration shown in the section
        discussing configuration without Zope 3.
        
        Client Set Up
        =============
        
        If you want to set up a client alone, without a dispatcher, include the egg in
        your setup.py, include the configure.zcml in your application's zcml, make sure
        you share the database in which the queues will be held, and make sure that
        either the zope.app.keyreference.persistent.connectionOfPersistent adapter is
        registered, or zc.twist.connection.
        
        That should be it.
        
        Client/Server Set Up
        ====================
        
        For a client/server combination, use zcml that is something like the
        basic_dispatcher_policy.zcml, make sure you have access to the database with
        the queues, configure logging and monitoring as desired, configure the
        ``ZC_ASYNC_UUID`` environment variable in zdaemon.conf if you are in
        production, and start up! Getting started is really pretty easy. You can even
        start a dispatcher-only version by not starting any servers in zcml.
        
        In comparison to the non-Zope 3 usage, an important difference in your setup.py
        is that, if you want the full set up described below, including zc.z3monitor,
        you'll need to specify "zc.async [z3]" as the desired package in your
        ``install_requires``, as opposed to just "zc.async" [#extras_require]_.
        
        We'll look at this by making a zope.conf-alike and a site.zcml-alike.  We'll
        need a place to put some files, so we'll use a temporary directory.  This, and
        the comments in the files that we set up, are the primary differences between
        our examples and a real set up.
        
        We'll do this in two versions.  The first version uses a single database, as
        you might do to get started quickly, or for a small site.  The second version
        has one database for the main application, and one database for the async data,
        as will be more appropriate for typical production usage.
        
        -----------------------------
        Shared Single Database Set Up
        -----------------------------
        
        As described above, using a shared single database will probably be the
        quickest way to get started.  Large-scale production usage will probably prefer
        to use the `Two Database Set Up`_ described later.
        
        So, without further ado, here is the text of our zope.conf-alike, and of our
        site.zcml-alike [#get_vals]_.
        
        >>> zope_conf = """
        ... site-definition %(site_zcml_file)s
        ...
        ... <zodb main>
        ...   <filestorage>
        ...     create true
        ...     path %(main_storage_path)s
        ...   </filestorage>
        ... </zodb>
        ...
        ... <product-config zc.z3monitor>
        ...   port %(monitor_port)s
        ... </product-config>
        ...
        ... <logger>
        ...   level debug
        ...   name zc.async
        ...   propagate no
        ...
        ...   <logfile>
        ...     path %(async_event_log)s
        ...   </logfile>
        ... </logger>
        ...
        ... <logger>
        ...   level debug
        ...   name zc.async.trace
        ...   propagate no
        ...
        ...   <logfile>
        ...     path %(async_trace_log)s
        ...   </logfile>
        ... </logger>
        ...
        ... <eventlog>
        ...   <logfile>
        ...     formatter zope.exceptions.log.Formatter
        ...     path STDOUT
        ...   </logfile>
        ...   <logfile>
        ...     formatter zope.exceptions.log.Formatter
        ...     path %(event_log)s
        ...   </logfile>
        ... </eventlog>
        ... """ % {'site_zcml_file': site_zcml_file,
        ...        'main_storage_path': os.path.join(dir, 'main.fs'),
        ...        'async_storage_path': os.path.join(dir, 'async.fs'),
        ...        'monitor_port': monitor_port,
        ...        'event_log': os.path.join(dir, 'z3.log'),
        ...        'async_event_log': os.path.join(dir, 'async.log'),
        ...        'async_trace_log': os.path.join(dir, 'async_trace.log'),}
        ...
        
        In a non-trivial production system you will also probably want to replace
        the file storage with a <zeoclient> stanza.
        
        Also note that an open monitor port should be behind a firewall, of course.
        
        We'll assume that zdaemon.conf has been set up to put ZC_ASYNC_UUID in the
        proper place too.  It would have looked something like this in the
        zdaemon.conf::
        
            <environment>
              ZC_ASYNC_UUID /path/to/uuid.txt
            </environment>
        
        (Other tools, such as supervisor, also can work, of course; their spellings are
        different and are "left as an exercise to the reader" at the moment.)
        
        We'll do that by hand:
        
        >>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')
        
        Now let's define our site-zcml-alike.
        
        >>> site_zcml = """
        ... <configure xmlns='http://namespaces.zope.org/zope'
        ...            xmlns:meta="http://namespaces.zope.org/meta"
        ...            >
        ... <include package="zope.component" file="meta.zcml" />
        ... <include package="zope.component" />
        ... <include package="zc.z3monitor" />
        ... <include package="zc.async" file="basic_dispatcher_policy.zcml" />
        ...
        ... <!-- this is usually handled in Zope applications by the
        ...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
        ... <adapter factory="zc.twist.connection" />
        ... </configure>
        ... """
        
        Now we're done.
        
        If we process these files, and wait for a poll, we've got a working
        set up [#process]_.
        
        >>> import zc.async.dispatcher
        >>> dispatcher = zc.async.dispatcher.get()
        >>> import pprint
        >>> pprint.pprint(get_poll(dispatcher, 0))
        {'': {'main': {'active jobs': [],
                       'error': None,
                       'len': 0,
                       'new jobs': [],
                       'size': 3}}}
        >>> bool(dispatcher.activated)
        True
        
        We can ask for a job to be performed, and get the result.
        
        >>> conn = db.open()
        >>> root = conn.root()
        >>> import zc.async.interfaces
        >>> queue = zc.async.interfaces.IQueue(root)
        >>> import operator
        >>> import zc.async.job
        >>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
        >>> import transaction
        >>> transaction.commit()
        >>> wait_for_result(job)
        42
        
        We can connect to the monitor server with telnet.
        
        >>> import telnetlib
        >>> tn = telnetlib.Telnet('127.0.0.1', monitor_port)
        >>> tn.write('async status\n') # immediately disconnects
        >>> print tn.read_all() # doctest: +ELLIPSIS
        {
        "poll interval": {
        "seconds": ...
        },
        "status": "RUNNING",
        "time since last poll": {
        "seconds": ...
        },
        "uptime": {
        "seconds": ...
        },
        "uuid": "..."
        }
        <BLANKLINE>
        
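        Since the reply is JSON, a monitoring script can parse it rather than
        scrape it.  The following is only a sketch: ``dispatcher_is_running`` is a
        hypothetical helper, and the sample reply uses placeholder numbers and a
        placeholder UUID in the shape shown above, not real output.
        
        ```python
        import json
        
        
        def dispatcher_is_running(status_text):
            """Return True if an ``async status`` reply reports RUNNING."""
            status = json.loads(status_text)
            return status.get('status') == 'RUNNING'
        
        
        # Hypothetical reply; the numbers and UUID are placeholders.
        sample = '''{
            "poll interval": {"seconds": 5.0},
            "status": "RUNNING",
            "time since last poll": {"seconds": 1.2},
            "uptime": {"seconds": 30.5},
            "uuid": "00000000-0000-0000-0000-000000000000"
        }'''
        ```
        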
        Now we'll "shut down" with a CTRL-C, or SIGINT, and clean up.
        
        >>> import signal
        >>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
        ...     pid = os.getpid()
        ...     try:
        ...         os.kill(pid, signal.SIGINT)
        ...     except KeyboardInterrupt:
        ...         if dispatcher.activated:
        ...             assert False, 'dispatcher did not deactivate'
        ...     else:
        ...         print "failed to send SIGINT, or something"
        ... else:
        ...     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
        ...     for i in range(30):
        ...         if not dispatcher.activated:
        ...             break
        ...         time.sleep(0.1)
        ...     else:
        ...         assert False, 'dispatcher did not deactivate'
        ...
        >>> import transaction
        >>> t = transaction.begin() # sync
        >>> import zope.component
        >>> import zc.async.interfaces
        >>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
        >>> da = queue.dispatchers[uuid]
        >>> bool(da.activated)
        False
        
        >>> db.close()
        >>> import shutil
        >>> shutil.rmtree(dir)
        
        These instructions are very similar to the `Two Database Set Up`_.
        
        .. ......... ..
        .. Footnotes ..
        .. ......... ..
        
        .. [#extras_require] The "[z3]" is an "extra", defined in zc.async's setup.py
        in ``extras_require``. It pulls along zc.z3monitor and simplejson in
        addition to the packages described in the `Configuration without Zope 3`_
        section. Unfortunately, zc.z3monitor depends on zope.app.appsetup, which as
        of this writing ends up depending indirectly on many, many packages, some
        as far flung as zope.app.rotterdam.
        
        .. [#get_vals]
        
        >>> import errno, os, random, socket, tempfile
        >>> dir = tempfile.mkdtemp()
        >>> site_zcml_file = os.path.join(dir, 'site.zcml')
        
        >>> s = socket.socket()
        >>> for i in range(20):
        ...     monitor_port = random.randint(20000, 49151)
        ...     try:
        ...         s.bind(('127.0.0.1', monitor_port))
        ...     except socket.error, e:
        ...         if e.args[0] == errno.EADDRINUSE:
        ...             pass
        ...         else:
        ...             raise
        ...     else:
        ...         s.close()
        ...         break
        ... else:
        ...     assert False, 'could not find available port'
        ...     monitor_port = None
        ...
        
        .. [#process]
        
        >>> zope_conf_file = os.path.join(dir, 'zope.conf')
        >>> f = open(zope_conf_file, 'w')
        >>> f.write(zope_conf)
        >>> f.close()
        >>> f = open(site_zcml_file, 'w')
        >>> f.write(site_zcml)
        >>> f.close()
        
        >>> import zdaemon.zdoptions
        >>> import zope.app.appsetup
        >>> options = zdaemon.zdoptions.ZDOptions()
        >>> options.schemadir = os.path.join(
        ...     os.path.dirname(os.path.abspath(zope.app.appsetup.__file__)),
        ...     'schema')
        >>> options.realize(['-C', zope_conf_file])
        >>> config = options.configroot
        
        >>> import zope.app.appsetup.product
        >>> zope.app.appsetup.product.setProductConfigurations(
        ...     config.product_config)
        >>> ignore = zope.app.appsetup.config(config.site_definition)
        >>> import zope.app.appsetup.appsetup
        >>> db = zope.app.appsetup.appsetup.multi_database(config.databases)[0][0]
        
        >>> import zope.event
        >>> import zc.async.interfaces
        >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
        
        >>> from zc.async.testing import get_poll, wait_for_result
        
        
        -------------------
        Two Database Set Up
        -------------------
        
        Although it takes a bit more trouble to set up, large-scale production
        deployments will probably prefer this approach to the shared single database
        described above.
        
        For our zope.conf, we only need one additional stanza to the one seen above::
        
            <zodb async>
              <filestorage>
                create true
                path REPLACE_THIS_WITH_PATH_TO_STORAGE
              </filestorage>
            </zodb>
        
        (You would replace "REPLACE_THIS_WITH_PATH_TO_STORAGE" with the path to the
        storage file.)
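        
        The placeholder substitution can also be scripted.  The following is a
        minimal sketch, not part of zc.async: the ``render_async_stanza`` helper, the
        ``/var/zodb`` directory, and the ``async.fs`` file name are illustrative
        assumptions.  It only fills the stanza shown above with a concrete path using
        standard string formatting.
        
        ```python
        import os
        
        # Template for the extra stanza; only the storage path varies.
        ASYNC_STANZA = """\
        <zodb async>
          <filestorage>
            create true
            path %(async_storage_path)s
          </filestorage>
        </zodb>
        """
        
        
        def render_async_stanza(storage_dir, filename='async.fs'):
            """Return the <zodb async> stanza pointing at storage_dir/filename.
        
            Both arguments are hypothetical; use whatever location suits your
            deployment.
            """
            path = os.path.join(storage_dir, filename)
            return ASYNC_STANZA % {'async_storage_path': path}
        
        
        # Hypothetical storage directory, chosen only for illustration.
        storage_dir = '/var/zodb'
        stanza = render_async_stanza(storage_dir)
        print(stanza)
        ```
        
        The rendered text can then be appended to the rest of your zope.conf.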
        
        As before, you will probably prefer to use ZEO rather than FileStorage in
        production.
        
        The zdaemon.conf instructions are the same: set the ZC_ASYNC_UUID environment
        variable properly in the zdaemon.conf file.
        
        For our site.zcml, the only difference is that we use the
        multidb_dispatcher_policy.zcml file rather than the
        basic_dispatcher_policy.zcml file.
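        
        As a rough illustration of that single difference, the sketch below swaps the
        policy include in a site.zcml held as a string.  The ``BASIC_SITE_ZCML`` text
        is an assumed, abbreviated stand-in for a real site.zcml, and
        ``use_multidb_policy`` is a hypothetical helper, not something zc.async
        provides.
        
        ```python
        # Abbreviated, assumed site.zcml that uses the single-database policy.
        BASIC_SITE_ZCML = """\
        <configure xmlns="http://namespaces.zope.org/zope">
          <include package="zc.async" file="basic_dispatcher_policy.zcml" />
        </configure>
        """
        
        
        def use_multidb_policy(site_zcml):
            """Return site_zcml with the basic policy include replaced."""
            return site_zcml.replace(
                'basic_dispatcher_policy.zcml',
                'multidb_dispatcher_policy.zcml')
        
        
        multidb_site_zcml = use_multidb_policy(BASIC_SITE_ZCML)
        print(multidb_site_zcml)
        ```
        
        In practice you would simply edit the include line by hand; the point is that
        nothing else in the file needs to change.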
        
        If you want to change the policy, change "multidb_dispatcher_policy.zcml" to
        "dispatcher.zcml" in the example above and register your own replacements for
        the policy components found in "multidb_dispatcher_policy.zcml".  You'll see
        that most of them come from code in subscribers.py, which can be adjusted
        easily.
        
        If we process the files described above and wait for a poll, we have a
        working setup [#process_multi]_.
        
        >>> import zc.async.dispatcher
        >>> dispatcher = zc.async.dispatcher.get()
        >>> import pprint
        >>> pprint.pprint(get_poll(dispatcher, 0))
        {'': {'main': {'active jobs': [],
                       'error': None,
                       'len': 0,
                       'new jobs': [],
                       'size': 3}}}
        >>> bool(dispatcher.activated)
        True
        
        As before, we can ask for a job to be performed, and get the result.
        
        >>> conn = db.open()
        >>> root = conn.root()
        >>> import zc.async.interfaces
        >>> queue = zc.async.interfaces.IQueue(root)
        >>> import operator
        >>> import zc.async.job
        >>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
        >>> import transaction
        >>> transaction.commit()
        >>> wait_for_result(job)
        42
        
        Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool
        for you! Good luck! [#shutdown]_
        
        .. ......... ..
        .. Footnotes ..
        .. ......... ..
        
        .. [#process_multi]
        
        >>> import errno, os, random, socket, tempfile
        >>> dir = tempfile.mkdtemp()
        >>> site_zcml_file = os.path.join(dir, 'site.zcml')
        
        >>> s = socket.socket()
        >>> for i in range(20):
        ...     monitor_port = random.randint(20000, 49151)
        ...     try:
        ...         s.bind(('127.0.0.1', monitor_port))
        ...     except socket.error, e:
        ...         if e.args[0] == errno.EADDRINUSE:
        ...             pass
        ...         else:
        ...             raise
        ...     else:
        ...         s.close()
        ...         break
        ... else:
        ...     assert False, 'could not find available port'
        ...     monitor_port = None
        ...
        
        >>> zope_conf = """
        ... site-definition %(site_zcml_file)s
        ...
        ... <zodb main>
        ...   <filestorage>
        ...     create true
        ...     path %(main_storage_path)s
        ...   </filestorage>
        ... </zodb>
        ...
        ... <zodb async>
        ...   <filestorage>
        ...     create true
        ...     path %(async_storage_path)s
        ...   </filestorage>
        ... </zodb>
        ...
        ... <product-config zc.z3monitor>
        ...   port %(monitor_port)s
        ... </product-config>
        ...
        ... <logger>
        ...   level debug
        ...   name zc.async
        ...   propagate no
        ...
        ...   <logfile>
        ...     path %(async_event_log)s
        ...   </logfile>
        ... </logger>
        ...
        ... <logger>
        ...   level debug
        ...   name zc.async.trace
        ...   propagate no
        ...
        ...   <logfile>
        ...     path %(async_trace_log)s
        ...   </logfile>
        ... </logger>
        ...
        ... <eventlog>
        ...   <logfile>
        ...     formatter zope.exceptions.log.Formatter
        ...     path STDOUT
        ...   </logfile>
        ...   <logfile>
        ...     formatter zope.exceptions.log.Formatter
        ...     path %(event_log)s
        ...   </logfile>
        ... </eventlog>
        ... """ % {'site_zcml_file': site_zcml_file,
        ...        'main_storage_path': os.path.join(dir, 'main.fs'),
        ...        'async_storage_path': os.path.join(dir, 'async.fs'),
        ...        'monitor_port': monitor_port,
        ...        'event_log': os.path.join(dir, 'z3.log'),
        ...        'async_event_log': os.path.join(dir, 'async.log'),
        ...        'async_trace_log': os.path.join(dir, 'async_trace.log'),}
        ...
        
        >>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')
        
        >>> site_zcml = """
        ... <configure xmlns='http://namespaces.zope.org/zope'
        ...            xmlns:meta="http://namespaces.zope.org/meta"
        ...            >
        ... <include package="zope.component" file="meta.zcml" />
        ... <include package="zope.component" />
        ... <include package="zc.z3monitor" />
        ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
        ...
        ... <!-- this is usually handled in Zope applications by the
        ...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
        ... <adapter factory="zc.twist.connection" />
        ... </configure>
        ... """
        
        >>> zope_conf_file = os.path.join(dir, 'zope.conf')
        >>> f = open(zope_conf_file, 'w')
        >>> f.write(zope_conf)
        >>> f.close()
        >>> f = open(site_zcml_file, 'w')
        >>> f.write(site_zcml)
        >>> f.close()
        
        >>> import zdaemon.zdoptions
        >>> import zope.app.appsetup
        >>> options = zdaemon.zdoptions.ZDOptions()
        >>> options.schemadir = os.path.join(
        ...     os.path.dirname(os.path.abspath(zope.app.appsetup.__file__)),
        ...     'schema')
        >>> options.realize(['-C', zope_conf_file])
        >>> config = options.configroot
        
        >>> import zope.app.appsetup.product
        >>> zope.app.appsetup.product.setProductConfigurations(
        ...     config.product_config)
        >>> ignore = zope.app.appsetup.config(config.site_definition)
        >>> import zope.app.appsetup.appsetup
        >>> db = zope.app.appsetup.appsetup.multi_database(config.databases)[0][0]
        
        >>> import zope.event
        >>> import zc.async.interfaces
        >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
        
        >>> from zc.async.testing import get_poll, wait_for_result
        
        .. [#shutdown]
        
        >>> import zc.async.dispatcher
        >>> dispatcher = zc.async.dispatcher.get()
        >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
        >>> dispatcher.thread.join(3)
        
        >>> db.close()
        >>> db.databases['async'].close()
        >>> import shutil
        >>> shutil.rmtree(dir)
        
        =======
        Changes
        =======
        
        1.1.1 (2008-05-14)
        ==================
        
        - more README tweaks.
        
        - converted all reports from the dispatcher, including the monitor output,
        to use "unpacked" integer oids.  This addresses a problem that simplejson
        was having in trying to interpret the packed string blobs as unicode, and
        then making zc.ngi fall over.
        
        - added several more tests for the monitor code.
        
        - made the ``async jobs`` monitor command be "up to the minute".  Before, it
        included all of the new and active jobs from the previous poll; now, it
        also filters out those that have since completed.
        
        - The ``async job`` command was broken, as revealed by a new monitor test.
        It is now fixed; the fix relies on a new feature in zope.bforest, so version
        1.2 of that package is now required.
        
        1.1 (2008-04-24)
        ================
        
        - Fired events when the IQueues and IQueue objects are installed by the
        QueueInstaller (thanks to Fred Drake).
        
        - Dispatchers make agent threads keep their connections, so each connection's
        object cache use is optimized if the agent regularly requests jobs with
        the same objects.
        
        - README improved (thanks to Benji York and Sebastian Ware).
        
        - Callbacks are logged at start in the trace log.
        
        - All job results (including callbacks) are logged, including verbose
        tracebacks if the callback generated a failure.
        
        - Had the ThreadedDispatcherInstaller subscriber stash the thread on the
        dispatcher, so you can shut down tests like this:
        
        >>> import zc.async.dispatcher
        >>> dispatcher = zc.async.dispatcher.get()
        >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
        >>> dispatcher.thread.join(3)
        
        - Added ``getQueue`` to zc.async.local as a convenience (it does what you
        could already do: ``zc.async.local.getJob().queue``).
        
        - Clarified that ``IQueue.pull`` is the approved way of removing scheduled jobs
        from a queue in interfaces and README.
        
        - reports in the logs of a job's success or failure come before callbacks are
        started.
        
        - Added a section showing how the basic_dispatcher_policy.zcml worked, which
        then pushed the former README_3 examples into README_3b.
        
        - Put ZPL everywhere I was supposed to.
        
        - Moved a number of helpful testing functions out of footnotes and into
        zc.async.testing, both so that zc.async tests don't have to redefine them
        and client packages can reuse them.
        
        1.0 (2008-04-09)
        ================
        
        Initial release.
        
        
        
Platform: UNKNOWN
