The dispatcher is the code responsible for actually performing the
zc.async jobs.

Typically, the dispatcher is a singleton for a given process.  Multiple
dispatchers can be created at once, primarily for ease of testing.

Dispatchers expect to get a "reactor" to power the timed calls.  This reactor
must have several methods:

class IReactor(zope.interface.Interface):
    
    def callFromThread(callable, *args, **kw):
        """have callable run in reactor's thread, by reactor, ASAP.
        
        Intended to be called from a thread other than the reactor's main
        loop.
        """
    
    def callInThread(callable, *args, **kw):
        """have callable run in a separate thread, ASAP.
        
        Must be called in same thread as reactor's main loop.
        """
    
    def callLater(seconds, callable, *args, **kw):
        """have callable run in reactor at least <seconds> from now
        
        Must be called in same thread as reactor's main loop.
        """

    def addSystemEventTrigger(phase, event, callable, *args, **kw):
        """Install a callable to be run in phase of event.
        
        must support phase 'before', and event 'shutdown'.
        """

    def callWhenRunning(self, _callable, *args, **kw):
        """run callable now if running, or when started.
        """

The twisted reactors provide these necessary methods.  You can also write
your own reactor.

For this example, we will use the twisted select reactor running in another
thread.  Other doctests in this package use a test reactor defined in the
zc.async.testing module, as an example of a write-your-own reactor.

To instantiate the dispatcher, we need a reactor and a db.  We'll create it,
then start it off in the thread.  We'll assume we already have the db and the
necessary adapter registrations [#setUp]_.

Here's the reactor.  The _handleSignals just lets the reactor handle signals.
In most real world usage you'll need to be more careful, hooking into
the signal handling of your larger app.  Look at the code in
``zc.async.subscribers.ThreadedDispatcherInstaller`` for an example that should
be sufficient for Zope.

    >>> import twisted.internet.selectreactor
    >>> reactor = twisted.internet.selectreactor.SelectReactor()
    >>> reactor._handleSignals()

Now here's our dispatcher.  The poll_interval is a number of seconds (float or
int).  It defaults to 5 but we're setting it to 0.5 in order to make this run
faster when this document is run as a test.

    >>> import zc.async.dispatcher
    >>> dispatcher = zc.async.dispatcher.Dispatcher(
    ...     db, reactor, poll_interval=0.5)
    >>> import zc.async.instanceuuid
    >>> dispatcher.UUID is zc.async.instanceuuid.UUID
    True
    >>> dispatcher.reactor is reactor
    True
    >>> dispatcher.db is db
    True
    >>> dispatcher.poll_interval
    0.5

Now we're ready to start up.  Notice the current size of the db's connection
pool.  When we activate the dispatcher we'll see that this automatically ups
the connection pool for the db by 1, to include the dispatcher's poll.

    >>> db.getPoolSize()
    7

    >>> import threading
    >>> def start():
    ...     dispatcher.activate()
    ...     reactor.run(installSignalHandlers=0)
    ...
    >>> thread = threading.Thread(target=start)
    >>> thread.setDaemon(True)
    >>> import datetime
    >>> initial = datetime.datetime.utcnow()
    >>> thread.start()

The dispatcher should be starting up now.  Let's wait for it to activate.
We're using a test convenience, get_poll, defined in the footnotes
[#get_poll]_.

    >>> poll = get_poll(0)
    >>> poll == {}
    True
    >>> initial <= poll.utc_timestamp <= datetime.datetime.utcnow()
    True

Now the pool size has increased, as we mentioned above.

    >>> db.getPoolSize()
    8

Now let's add a queues folder and a queue.  The queues folder is always
expected to be in the zc.async.interfaces.KEY key of the database root.

    >>> import zc.async.queue
    >>> import zc.async.interfaces
    >>> container = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
    >>> queue = container[''] = zc.async.queue.Queue()
    >>> import transaction
    >>> transaction.commit()

Now the next poll will register and activate the dispatcher in the queue.

    >>> poll = get_poll()

This accomplished the following things.

- The dispatcher registered and activated itself with the queue.

    >>> t = transaction.begin() # sync
    >>> list(queue.dispatchers) == [dispatcher.UUID]
    True
    >>> list(queue.dispatchers[dispatcher.UUID]) # these would be agent names
    []
    >>> queue.dispatchers[dispatcher.UUID].UUID == dispatcher.UUID
    True
    >>> bool(queue.dispatchers[dispatcher.UUID].activated)
    True

- The queue fired events to announce the dispatcher's registration and
  activation.  We could have registered subscribers for either or both
  of these events to create agents.
  
  Note that the dispatcher in queue.dispatchers is a persistent
  representative of the actual dispatcher: they are different objects.

    >>> from zope.component import eventtesting
    >>> import zc.async.interfaces
    >>> evs = eventtesting.getEvents(
    ...     zc.async.interfaces.IDispatcherRegistered)
    >>> evs # doctest: +ELLIPSIS
    [<zc.async.interfaces.DispatcherRegistered object at ...>]
    >>> evs[0].object._p_oid == queue.dispatchers[dispatcher.UUID]._p_oid
    True

    >>> evs = eventtesting.getEvents(
    ...     zc.async.interfaces.IDispatcherActivated)
    >>> evs # doctest: +ELLIPSIS
    [<zc.async.interfaces.DispatcherActivated object at ...>]
    >>> evs[0].object._p_oid == queue.dispatchers[dispatcher.UUID]._p_oid
    True

- The dispatcher made its first ping. A ping means that the dispatcher changes
  a datetime to record that it is alive. 

    >>> queue.dispatchers[dispatcher.UUID].last_ping is not None
    True

  The dispatcher needs to update its last_ping after every ``ping_interval``
  seconds.  If it has not updated the last_ping after ``ping_death_interval``
  then the dispatcher is considered to be dead, and active jobs in the
  dispatcher's agents are ended (and given a chance to respond to that status
  change, so they can put themselves back on the queue to be restarted if
  desired).

    >>> queue.dispatchers[dispatcher.UUID].ping_interval
    datetime.timedelta(0, 30)
    >>> queue.dispatchers[dispatcher.UUID].ping_death_interval
    datetime.timedelta(0, 60)

- We have some log entries.  (We're using some magic log handlers inserted by
  setup code in tests.py here.)
  
    >>> print event_logs # doctest: +ELLIPSIS
    zc.async.events INFO
      attempting to activate dispatcher ...

    >>> print trace_logs # doctest: +ELLIPSIS
    zc.async.trace DEBUG
      poll ...

So the dispatcher is running now.  It still won't do any jobs until we tell
it the kind of jobs it should perform.  Let's add in our default agent, with
the default configuration that it is willing to perform any job.

    >>> import zc.async.agent
    >>> agent = zc.async.agent.Agent()
    >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
    >>> transaction.commit()

The next poll will include the fact that it asked the 'main' agent for
a job.

    >>> poll = get_poll()
    >>> import pprint
    >>> pprint.pprint(dict(poll))
    {'': {'main': {'active jobs': [],
                   'error': None,
                   'len': 0,
                   'new jobs': [],
                   'size': 3}}}

The pool size for the db has increased again to account for the size of the
agent.

    >>> db.getPoolSize()
    11

We can actually get it to perform some jobs now.  Here's a silly simple
one.  We use a test convenience, wait_for_result, defined in the footnotes
[#wait_for_result]_.

    >>> import operator
    >>> job1 = queue.put(
    ...     zc.async.job.Job(operator.mul, 14, 3))
    >>> print job1.result
    None
    >>> transaction.commit()

    >>> wait_for_result(job1)
    42

That's cool.  We have a poll object that has a record of this too.

    >>> for poll in dispatcher.polls:
    ...     if (poll.get('') and poll[''].get('main') and
    ...         poll['']['main']['new jobs']):
    ...         break
    ... else:
    ...     assert False, 'poll not found'
    ...
    >>> pprint.pprint(dict(poll)) # doctest: +ELLIPSIS
    {'': {'main': {'active jobs': [],
                   'error': None,
                   'len': 0,
                   'new jobs': [('\x00...', 'unnamed')],
                   'size': 3}}}

We also have some log entries.

    >>> info = debug = None
    >>> for r in reversed(trace_logs.records):
    ...     if info is None and r.levelname == 'INFO':
    ...         info = r
    ...     elif debug is None and r.levelname == 'DEBUG':
    ...         debug = r
    ...     elif info is not None and debug is not None:
    ...         break
    ... else:
    ...     assert False, 'could not find'
    ...
    >>> print info.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    <zc.async.job.Job (oid ..., db 'unnamed')
     ``<built-in function mul>(14, 3)``> succeeded in thread ... with result:
    42

    >>> print debug.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    poll ...:
      {'':
        {'main':
          {'active jobs': [], 'error': None,
           'new jobs': [('\x...', 'unnamed')], 'len': 0, 'size': 3}}}
    

[#getPollInfo]_ Notice our ``new jobs`` from the poll and the log has a value
in it now. We can get some information about that job from the dispatcher.

    >>> info = dispatcher.getJobInfo(*poll['']['main']['new jobs'][0])
    >>> pprint.pprint(info)
    ... # doctest: +ELLIPSIS
    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
     'completed': datetime.datetime(...),
     'failed': False,
     'poll id': ...,
     'quota names': (),
     'result': '42',
     'started': datetime.datetime(...),
     'thread': ...}
     >>> info['thread'] is not None
     True
     >>> info['poll id'] is not None
     True

Notice that the result is a repr.  If this had been a failure, it would have
been a (very) verbose traceback [#show_error]_.

As seen in other documents in zc.async, the job can also be a method of a
persistent object, affecting a persistent object.

    >>> import BTrees.Length
    >>> length = root['length'] = BTrees.Length.Length()
    >>> length()
    0
    >>> job2 = queue.put(zc.async.job.Job(length.change, 4))
    >>> transaction.commit()
    >>> wait_for_result(job2)
    >>> length()
    4

``zc.async.local`` also allows some fun tricks.  Your method can access the
job--for instance, to access the queue and put another job in, or access
annotations on the job, as of the last database sync for the thread's
connection (at transaction boundaries).

    >>> import zc.async
    >>> def hand_off():
    ...     job = zc.async.local.getJob()
    ...     return job.queue.put(zc.async.job.Job(operator.mul, 21, 2))
    ...
    >>> job3 = queue.put(hand_off)
    >>> transaction.commit()

    >>> wait_for_result(job3)
    42

It can also get and set job annotations *live, in another connection*. 
This allows you to send messages about job progress, or get live
information about whether you should change or stop your work, for
instance.

An important caveat about this is that the annotations, whether a get or
a set, must not be persistent objects, or contain them directly or indirectly.
We use a new test convenience , wait_for_annotation, defined in the footnotes
[#wait_for_annotation]_.

    >>> def annotation_func():
    ...     zc.async.local.setLiveAnnotation('hello', 'from thread!')
    ...     reply = zc.async.local.getLiveAnnotation(
    ...         'reply', timeout=3, poll=0.1)
    ...     local = zc.async.local.getJob().annotations.get('reply', 'MISSING')
    ...     return 'reply is %r.  Locally it is %s.' % (reply, local)
    ...
    >>> job4 = queue.put(annotation_func)
    >>> transaction.commit()
    
    >>> wait_for_annotation(job4, 'hello')
    'from thread!'
    >>> job4.annotations['reply'] = 'HIYA'
    >>> transaction.commit()
    >>> wait_for_result(job4)
    "reply is 'HIYA'.  Locally it is MISSING."

We can analyze the work the dispatcher has done. The records for this generally
only go back about ten or twelve minutes--just enough to get a feel for the
current health of the dispatcher. Use the log if you want more long-term
analysis.

    >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
    {'failed': 1,
     'longest active': None,
     'longest failed': ('\x00...', 'unnamed'),
     'longest successful': ('\x00...', 'unnamed'),
     'shortest active': None,
     'shortest failed': ('\x00...', 'unnamed'),
     'shortest successful': ('\x00...', 'unnamed'),
     'started': 6,
     'statistics end': datetime.datetime(...),
     'statistics start': datetime.datetime(...),
     'successful': 5,
     'unknown': 0}

We can get a report on the reactor's status.

    >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
    {'poll interval': datetime.timedelta(0, 0, 500000),
     'status': 'RUNNING',
     'time since last poll': datetime.timedelta(...),
     'uptime': datetime.timedelta(...)}

When we stop the reactor, the dispatcher also deactivates.

    >>> reactor.callFromThread(reactor.stop)
    >>> thread.join(3)

    >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
    {'poll interval': datetime.timedelta(0, 0, 500000),
     'status': 'STOPPED',
     'time since last poll': None,
     'uptime': None,
     'uuid': UUID('...')}

The db's pool size has returned to the original value.

    >>> db.getPoolSize()
    7

.. ......... ..
.. Footnotes ..
.. ......... ..

.. [#setUp]

    >>> import ZODB.FileStorage
    >>> storage = ZODB.FileStorage.FileStorage(
    ...     'main.fs', create=True)
    >>> from ZODB.DB import DB 
    >>> db = DB(storage) 
    >>> conn = db.open()
    >>> root = conn.root()
    >>> import zc.async.configure
    >>> zc.async.configure.base()

.. [#get_poll]

    >>> import time
    >>> def get_poll(count = None):
    ...     if count is None:
    ...         count = len(dispatcher.polls)
    ...     for i in range(30):
    ...         if len(dispatcher.polls) > count:
    ...             return dispatcher.polls.first()
    ...         time.sleep(0.1)
    ...     else:
    ...         assert False, 'no poll!'
    ... 

.. [#wait_for_result]

    >>> import zc.async.interfaces
    >>> def wait_for_result(job):
    ...     for i in range(30):
    ...         t = transaction.begin()
    ...         if job.status == zc.async.interfaces.COMPLETED:
    ...             return job.result
    ...         time.sleep(0.1)
    ...     else:
    ...         assert False, 'job never completed'
    ...

.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
    find this poll information also.
    
    >>> dispatcher.getPollInfo(at=poll.key) is poll
    True
    >>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
    True
    >>> dispatcher.getPollInfo(before=poll.key) is not poll
    True
    >>> dispatcher.getPollInfo(before=poll.utc_timestamp) is not poll
    True
    >>> dispatcher.getPollInfo(before=poll.key-16) is poll
    True
    >>> dispatcher.getPollInfo(
    ...     before=poll.utc_timestamp + datetime.timedelta(seconds=0.4)
    ...     ) is poll
    True

.. [#show_error] OK, so you want to see a verbose traceback?  OK, you asked
    for it. We're eliding more than 90% of this, and this is a small one,
    believe it or not. Rotate your logs!
    
    Notice that all of the values in the logs are reprs.

    >>> bad_job = queue.put(
    ...     zc.async.job.Job(operator.mul, 14, None))
    >>> transaction.commit()

    >>> wait_for_result(bad_job)
    <zc.twist.Failure exceptions.TypeError>
    
    >>> for r in reversed(trace_logs.records):
    ...     if r.levelname == 'ERROR':
    ...         break
    ... else:
    ...     assert False, 'could not find log'
    ...
    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    <zc.async.job.Job (oid ..., db 'unnamed')
     ``<built-in function mul>(14, None)``> failed in thread ... with result:
    *--- Failure #... (pickled) ---
    .../zc/async/job.py:...: _call_with_retry(...)
     [ Locals ]...
     ( Globals )...
    .../zc/async/job.py:...: <lambda>(...)
     [ Locals ]...
     ( Globals )...
    exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
    *--- End of Failure #... ---
    <BLANKLINE>

.. [#wait_for_annotation]

    >>> def wait_for_annotation(job, name):
    ...     for i in range(30):
    ...         t = transaction.begin()
    ...         if name in job.annotations:
    ...             return job.annotations[name]
    ...         time.sleep(0.1)
    ...     else:
    ...         assert False, 'TIMEOUT'
    ...
