=================
Persistent Queues
=================

Persistent queues are queues that are optimized for persistence in the
ZODB. They assume that the ZODB is using MVCC to avoid read conflicts. They
attempt to resolve write conflicts so that transactions that add and remove
objects concurrently are merged, unless the transactions are trying to
remove the same value from the queue.

An important characteristic of these queues is that they do not expect to
hold more than one reference to any given equivalent item at a time.  For
instance, some of the conflict resolution features will not behave as
desired if your application might hold two copies of the string "hello"
within the same queue at once.

The module provides two flavors: a simple persistent queue that keeps all
contained objects in one persistent object (`PersistentQueue`), and a
persistent queue that divides up its contents into multiple composite
elements (`CompositePersistentQueue`). They should be equivalent in terms of
API and so are mostly examined in the abstract in this document: we'll generate
instances with a representative `Queue` factory, which could be either class.
They differ only in one aspect of their write conflict resolution behavior,
which is discussed below.
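
The merge rules described in this document can be sketched in plain Python.
This is only an illustration under stated assumptions, not the code zc.queue
actually uses: `merge` and `MergeConflict` are hypothetical names, items are
assumed to be hashable and unique within the queue, and additions are ordered
the way this document describes for `PersistentQueue` (first commit first).

```python
class MergeConflict(Exception):
    """Hypothetical stand-in for ZODB's ConflictError."""


def merge(old, committed, new):
    """Merge two concurrent edits of the list ``old``.

    ``committed`` is the state saved by the transaction that committed
    first; ``new`` is the state the second transaction wants to save.
    """
    old_items = set(old)
    committed_removed = old_items - set(committed)
    new_removed = old_items - set(new)
    committed_added = [x for x in committed if x not in old_items]
    new_added = [x for x in new if x not in old_items]
    if committed_removed & new_removed:
        raise MergeConflict("both transactions removed the same item")
    if set(committed_added) & set(new_added):
        raise MergeConflict("both transactions added the same item")
    removed = committed_removed | new_removed
    kept = [x for x in old if x not in removed]
    # Additions from the transaction that committed first go first.
    return kept + committed_added + new_added


# One transaction pulls the front item while another puts two new ones.
merged = merge([1, 2, 3], [2, 3], [1, 2, 3, 6, 7])  # [2, 3, 6, 7]
```

Because the sketch compares states by membership, it cannot tell two equal
items apart, which is one way to see why the queues expect each item to
appear only once.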

Queues can be instantiated with no arguments.

    >>> q = Queue()
    >>> from zc.queue.interfaces import IQueue
    >>> from zope.interface.verify import verifyObject
    >>> verifyObject(IQueue, q)
    True

The basic API is simple: use `put` to add items to the back of the queue, and
`pull` to pull things off the queue, defaulting to the front of the queue.

    >>> q.put(1)
    >>> q.put(2)
    >>> q.pull()
    1
    >>> q.put(3)
    >>> q.pull()
    2
    >>> q.pull()
    3

The `pull` method takes an optional zero-based index argument, which may be
negative to count from the back of the queue.

    >>> q.put(4)
    >>> q.put(5)
    >>> q.put(6)
    >>> q.pull(-1)
    6
    >>> q.pull(1)
    5
    >>> q.pull(0)
    4

Requesting an item from an empty queue raises an IndexError.

    >>> q.pull() # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    IndexError: ...

Requesting an invalid index value does the same.

    >>> q.put(7)
    >>> q.put(8)
    >>> q.pull(2) # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    IndexError: ...

Beyond these core queue operations, queues support len...

    >>> len(q)
    2
    >>> q.pull()
    7
    >>> len(q)
    1
    >>> q.pull()
    8
    >>> len(q)
    0

...iter (which does *not* empty the queue)...

    >>> next(iter(q))
    Traceback (most recent call last):
    ...
    StopIteration
    >>> q.put(9)
    >>> q.put(10)
    >>> q.put(11)
    >>> next(iter(q))
    9
    >>> [i for i in q]
    [9, 10, 11]
    >>> q.pull()
    9
    >>> [i for i in q]
    [10, 11]

...bool...

    >>> bool(q)
    True
    >>> q.pull()
    10
    >>> q.pull()
    11
    >>> bool(q)
    False

...and list-like bracket access (which again does *not* empty the queue).

    >>> q.put(12)
    >>> q[0]
    12
    >>> q.pull()
    12
    >>> q[0] # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    IndexError: ...
    >>> for i in range(13, 23):
    ...     q.put(i)
    ...
    >>> q[0]
    13
    >>> q[1]
    14
    >>> q[2]
    15
    >>> q[-1]
    22
    >>> q[-10]
    13

That's it--there's no additional way to add anything beyond `put`, and no
additional way to remove anything beyond `pull`.
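
The API shown above can be summarized with a small in-memory model.  This is
a sketch for illustration only: `ListQueue` is a hypothetical name, it is not
persistent, and it has no conflict resolution.  It only mirrors the `put`,
`pull`, `len`, iteration, `bool`, and bracket-access behavior demonstrated in
the examples.

```python
class ListQueue(object):
    """Hypothetical, non-persistent stand-in mirroring the queue API."""

    def __init__(self):
        self._data = []

    def put(self, item):
        # New items always go to the back of the queue.
        self._data.append(item)

    def pull(self, index=0):
        # list.pop raises IndexError for an empty list or an invalid index.
        return self._data.pop(index)

    def __len__(self):
        # bool() falls back on __len__, so an empty queue is false.
        return len(self._data)

    def __iter__(self):
        # Iterating reads the items without removing them.
        return iter(self._data)

    def __getitem__(self, index):
        return self._data[index]


model = ListQueue()
for value in (4, 5, 6):
    model.put(value)
pulled = [model.pull(-1), model.pull(1), model.pull(0)]  # [6, 5, 4]
```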

The only other wrinkle is the conflict resolution code.  To show this, we
need two copies of the same queue, from two different connections.

NOTE: this testing approach has known weaknesses.  See discussion in tests.py.

    >>> import transaction
    >>> from zc.queue.tests import ConflictResolvingMappingStorage
    >>> from ZODB import DB
    >>> db = DB(ConflictResolvingMappingStorage('test'))
    >>> transactionmanager_1 = transaction.TransactionManager()
    >>> connection_1 = db.open(transaction_manager=transactionmanager_1)
    >>> root_1 = connection_1.root()

    >>> q = Queue()
    >>> q.__name__ = "queue"
    >>> root_1["queue"] = q
    >>> del q
    >>> transactionmanager_1.commit()
    >>> q_1 = root_1['queue']

    >>> transactionmanager_2 = transaction.TransactionManager()
    >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
    >>> root_2 = connection_2.root()
    >>> q_2 = root_2['queue']

Now we have two copies of the same queue, with separate transaction managers
and connections, much as two threads would have them. The '_1' suffix
identifies the objects for user 1, in thread 1; and the '_2' suffix
identifies the objects for user 2, in a concurrent thread 2.

First, let's have the two users add some items to the queue concurrently.
When two concurrent transactions each put a single new item, in both types
of queue the user who commits first gets the lower position in the
queue--that is, the position that will leave the queue sooner using default
`pull` calls.

In this example, even though q_1 is modified first, q_2's transaction is
committed first, so q_2's addition is first after the merge.

    >>> q_1.put(1001)
    >>> q_2.put(1000)
    >>> transactionmanager_2.commit()
    >>> transactionmanager_1.commit()
    >>> connection_1.sync()
    >>> connection_2.sync()
    >>> list(q_1)
    [1000, 1001]
    >>> list(q_2)
    [1000, 1001]

When a transaction commits more than one addition, or when more than two
transactions add items concurrently, the behavior is the same for the
PersistentQueue: the first commit's additions will go before the second
commit's.

    >>> from zc import queue
    >>> if isinstance(q_1, queue.PersistentQueue):
    ...     for i in range(5):
    ...         q_1.put(i)
    ...     for i in range(1002, 1005):
    ...         q_2.put(i)
    ...     transactionmanager_2.commit()
    ...     transactionmanager_1.commit()
    ...     connection_1.sync()
    ...     connection_2.sync()
    ...

As we'll see below, that will again reliably put all the values from the first
commit earlier in the queue, resulting in
[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4].

For the CompositePersistentQueue, the behavior is different.  The order
will be maintained within the set of additions made in one transaction, but
the two transactions' additions may be interleaved.  We will compensate
for that here to get a reliable queue state.

    >>> if isinstance(q_1, queue.CompositePersistentQueue):
    ...     for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)):
    ...         q_1.put(i1)
    ...         q_2.put(i2)
    ...         transactionmanager_1.commit()
    ...         transactionmanager_2.commit()
    ...         connection_1.sync()
    ...         connection_2.sync()
    ...

Whichever kind of queue we have, we now have the following values.

    >>> list(q_1)
    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
    >>> list(q_2)
    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]

If two users try to add the same item, then a conflict error is raised.

    >>> q_1.put(5)
    >>> q_2.put(5)
    >>> transactionmanager_1.commit()
    >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    ConflictError: ...
    >>> transactionmanager_2.abort()
    >>> connection_1.sync()
    >>> connection_2.sync()
    >>> list(q_1)
    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
    >>> list(q_2)
    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]

Users can also concurrently remove items from a queue...

    >>> q_1.pull()
    1000
    >>> q_1[0]
    1001

    >>> q_2.pull(5)
    0
    >>> q_2[5]
    1

    >>> q_2[0] # 1000 value still there in this connection
    1000

    >>> q_1[4] # 0 value still there in this connection.
    0

    >>> transactionmanager_1.commit()
    >>> transactionmanager_2.commit()
    >>> connection_1.sync()
    >>> connection_2.sync()
    >>> list(q_1)
    [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
    >>> list(q_2)
    [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]

...as long as they don't remove the same item.

    >>> q_1.pull()
    1001
    >>> q_2.pull()
    1001
    >>> transactionmanager_1.commit()
    >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    ConflictError: ...
    >>> transactionmanager_2.abort()
    >>> connection_1.sync()
    >>> connection_2.sync()
    >>> list(q_1)
    [1002, 1003, 1004, 1, 2, 3, 4, 5]
    >>> list(q_2)
    [1002, 1003, 1004, 1, 2, 3, 4, 5]

Also importantly, users can concurrently remove and add items to a queue.

    >>> q_1.pull()
    1002
    >>> q_1.pull()
    1003
    >>> q_1.pull()
    1004
    >>> q_2.put(6)
    >>> q_2.put(7)
    >>> transactionmanager_1.commit()
    >>> transactionmanager_2.commit()
    >>> connection_1.sync()
    >>> connection_2.sync()
    >>> list(q_1)
    [1, 2, 3, 4, 5, 6, 7]
    >>> list(q_2)
    [1, 2, 3, 4, 5, 6, 7]

As a final example, we'll show the conflict resolution code under extreme
duress, with multiple simultaneous puts and pulls.

    >>> res_1 = []
    >>> for i in range(6, -1, -2):
    ...     res_1.append(q_1.pull(i))
    ...
    >>> res_1
    [7, 5, 3, 1]
    >>> res_2 = []
    >>> for i in range(5, 0, -2):
    ...     res_2.append(q_2.pull(i))
    ...
    >>> res_2
    [6, 4, 2]
    >>> for i in range(8, 12):
    ...     q_1.put(i)
    ...
    >>> for i in range(12, 16):
    ...     q_2.put(i)
    ...
    >>> list(q_1)
    [2, 4, 6, 8, 9, 10, 11]
    >>> list(q_2)
    [1, 3, 5, 7, 12, 13, 14, 15]
    >>> transactionmanager_1.commit()
    >>> transactionmanager_2.commit()
    >>> connection_1.sync()
    >>> connection_2.sync()

After these commits, if the queue is a PersistentQueue, the new values are
in the order of their commit.  However, as discussed above, if the queue is
a CompositePersistentQueue the behavior is different. While the relative
order will be maintained--so if object `A` is ahead of object `B` in the queue
on commit then `A` will still be ahead of `B` after a merge of the conflicting
transactions--values may be interspersed between the two transactions.

For instance, if our example queue were a PersistentQueue, the values would
be [8, 9, 10, 11, 12, 13, 14, 15].  However, if it were a
CompositePersistentQueue, the values might be the same, or might be any
combination in which [8, 9, 10, 11] and [12, 13, 14, 15], from the two
transactions, are still in order.  One ordering might be
[8, 9, 12, 13, 10, 11, 14, 15], for instance.
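
The ordering guarantee just described can be stated as a small check: each
transaction's additions must appear in the merged queue as a subsequence, in
their original order.  The helper below is a hypothetical illustration and is
not part of zc.queue.

```python
def is_subsequence(seq, merged):
    """Return True if the items of ``seq`` occur in ``merged`` in order."""
    remaining = iter(merged)
    # ``item in remaining`` consumes the iterator up to the match, so each
    # later item has to be found after the previous one.
    return all(item in remaining for item in seq)


interleaved = [8, 9, 12, 13, 10, 11, 14, 15]
first_ok = is_subsequence([8, 9, 10, 11], interleaved)     # True
second_ok = is_subsequence([12, 13, 14, 15], interleaved)  # True
```

An ordering such as [9, 8, 10, 11] would fail this check for the first
transaction, because 9 would have moved ahead of 8.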

    >>> if isinstance(q_1, queue.PersistentQueue):
    ...     res_1 = list(q_1)
    ...     res_2 = list(q_2)
    ... elif isinstance(q_1, queue.CompositePersistentQueue):
    ...     firstsrc_1 = list(q_1)
    ...     firstsrc_2 = list(q_2)
    ...     secondsrc_1 = firstsrc_1[:]
    ...     secondsrc_2 = firstsrc_2[:]
    ...     for val in [12, 13, 14, 15]:
    ...         firstsrc_1.remove(val)
    ...         firstsrc_2.remove(val)
    ...     for val in [8, 9, 10, 11]:
    ...         secondsrc_1.remove(val)
    ...         secondsrc_2.remove(val)
    ...     res_1 = firstsrc_1 + secondsrc_1
    ...     res_2 = firstsrc_2 + secondsrc_2
    ...
    >>> res_1
    [8, 9, 10, 11, 12, 13, 14, 15]
    >>> res_2
    [8, 9, 10, 11, 12, 13, 14, 15]

    >>> db.close() # cleanup
