Metadata-Version: 2.1
Name: esdbclient
Version: 1.0a3
Summary: Python gRPC Client for EventStoreDB
Home-page: https://github.com/pyeventsourcing/esdbclient
License: BSD 3-Clause
Author: John Bywater
Author-email: john.bywater@appropriatesoftware.net
Requires-Python: >=3.7,<4.0
Classifier: Development Status :: 3 - Alpha
Classifier: License :: OSI Approved :: BSD License
Classifier: License :: Other/Proprietary License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Dist: dnspython (>=2.3.0,<3.0.0)
Requires-Dist: grpcio (>=1.51.0,!=1.52.*)
Requires-Dist: protobuf (>=3.11.0)
Requires-Dist: typing_extensions
Project-URL: Repository, https://github.com/pyeventsourcing/esdbclient
Description-Content-Type: text/markdown

# Python gRPC Client for EventStoreDB

This [Python package](https://pypi.org/project/esdbclient/) provides a Python
gRPC client for [EventStoreDB](https://www.eventstore.com/).

This client has been developed in collaboration with the EventStoreDB
team. Although not all the features of EventStoreDB are supported
by this client, many of the most useful ones are presented
in an easy-to-use interface.

This client has been tested to work with EventStoreDB LTS versions 21.10,
with and without SSL/TLS, and with Python versions 3.7 to 3.11. There
is 100% test coverage. The code has typing annotations, checked with mypy.
The code is formatted with black and isort, and checked with flake8. Poetry
is used for package management during development, and for building and
publishing distributions to [PyPI](https://pypi.org/project/esdbclient/).

## Synopsis

The `ESDBClient` class can be imported from the `esdbclient` package.

To run the client, you will need a connection string URI. And, to
connect to a "secure" EventStoreDB server, you will also need an
SSL/TLS certificate.

Probably the three most useful methods of `ESDBClient` are:

* `append_events()`: This method can be used to record
events in a particular "stream". This is useful when executing a command
in an application. Either all or none of the events will be recorded.

* `read_stream_events()`: This method can be used to retrieve all the recorded
events in a "stream". This is useful for reconstructing an aggregate before
executing a command in an application.

* `subscribe_all_events()`: This method can be used to receive all recorded
events across all "streams". This is useful in event-processing components,
and supports processing events with "exactly-once" semantics.

The example below uses an "insecure" EventStoreDB server running locally on port 2114.

```python
import esdbclient, uuid


# Construct ESDBClient with an EventStoreDB URI.

client = esdbclient.ESDBClient(uri="esdb://localhost:2114?Tls=false")


# Append events to a new stream.

stream_name = str(uuid.uuid4())

event1 = esdbclient.NewEvent(type='OrderCreated', data=b'data1')

client.append_events(
    stream_name=stream_name,
    expected_position=None,
    events=[event1],
)


# Append more events to an existing stream.

event2 = esdbclient.NewEvent(type='OrderUpdated', data=b'data2')
event3 = esdbclient.NewEvent(type='OrderDeleted', data=b'data3')

client.append_events(
    stream_name=stream_name,
    expected_position=0,
    events=[event2, event3],
)


# Read all events recorded in a stream.

recorded = client.read_stream_events(
    stream_name=stream_name
)

assert len(recorded) == 3
assert recorded[0].data == event1.data
assert recorded[1].data == event2.data
assert recorded[2].data == event3.data
assert recorded[0].type == event1.type
assert recorded[1].type == event2.type
assert recorded[2].type == event3.type


# In an event-processing component, use a "catch-up" subscription
# to receive all events across all streams, including events that
# have not yet been recorded, starting from the component's last
# saved "commit position".

last_saved_commit_position = 0

subscription = client.subscribe_all_events(
    commit_position=last_saved_commit_position
)

# To implement "exactly-once" semantics, iterate over the
# "catch-up" subscription. Process each received event,
# in turn, through an event-processing policy. Save the
# value of the commit_position attribute of the processed
# event with new state generated by the policy in the same
# atomic transaction. Use the last saved "commit position"
# when restarting the "catch-up" subscription.

received = []
for event in subscription:
    received.append(event)
    if event.id == event3.id:
        break

assert received[-3].data == event1.data
assert received[-2].data == event2.data
assert received[-1].data == event3.data
assert received[-3].type == event1.type
assert received[-2].type == event2.type
assert received[-1].type == event3.type

assert received[-3].commit_position > 0
assert received[-2].commit_position > received[-3].commit_position
assert received[-1].commit_position > received[-2].commit_position


# Close the client after use.

client.close()
```

See below for more details.

For an example of usage, see the [eventsourcing-eventstoredb](
https://github.com/pyeventsourcing/eventsourcing-eventstoredb) package.

## Table of contents

<!-- TOC -->
* [Install package](#install-package)
  * [From PyPI](#from-pypi)
  * [With Poetry](#with-poetry)
* [EventStoreDB server](#eventstoredb-server)
  * [Run container](#run-container)
  * [Stop container](#stop-container)
* [EventStoreDB client](#eventstoredb-client)
  * [Import class](#import-class)
  * [Construct client](#construct-client)
* [Streams](#streams)
  * [Append events](#append-events)
  * [Append event](#append-event)
  * [Idempotent append operations](#idempotent-append-operations)
  * [Read stream events](#read-stream-events)
  * [Read all events](#read-all-events)
  * [Get current stream position](#get-current-stream-position)
  * [Get current commit position](#get-current-commit-position)
  * [Get stream metadata](#get-stream-metadata)
  * [Set stream metadata](#set-stream-metadata)
  * [Delete stream](#delete-stream)
  * [Tombstone stream](#tombstone-stream)
* [Catch-up subscriptions](#catch-up-subscriptions)
  * [How to implement exactly-once event processing](#how-to-implement-exactly-once-event-processing)
  * [Subscribe all events](#subscribe-all-events)
  * [Subscribe stream events](#subscribe-stream-events)
* [Persistent subscriptions](#persistent-subscriptions)
  * [Create subscription](#create-subscription)
  * [Read subscription](#read-subscription)
  * [Get subscription info](#get-subscription-info)
  * [List subscriptions](#list-subscriptions)
  * [Delete subscription](#delete-subscription)
  * [Create stream subscription](#create-stream-subscription)
  * [Read stream subscription](#read-stream-subscription)
  * [Get stream subscription info](#get-stream-subscription-info)
  * [List stream subscriptions](#list-stream-subscriptions)
  * [Delete stream subscription](#delete-stream-subscription)
* [Connection](#connection)
  * [Reconnect](#reconnect)
  * [Close](#close)
* [Notes](#notes)
  * [Connection strings](#connection-strings)
  * [Regular expression filters](#regular-expression-filters)
  * [New event objects](#new-event-objects)
  * [Recorded event objects](#recorded-event-objects)
  * [Asyncio client](#asyncio-client)
* [Contributors](#contributors)
  * [Install Poetry](#install-poetry)
  * [Setup for PyCharm users](#setup-for-pycharm-users)
  * [Setup from command line](#setup-from-command-line)
  * [Project Makefile commands](#project-makefile-commands)
<!-- TOC -->

## Install package

It is recommended to install Python packages into a Python virtual environment.

### From PyPI

You can use pip to install this package directly from
[the Python Package Index](https://pypi.org/project/esdbclient/).

    $ pip install esdbclient

### With Poetry

You can use Poetry to add this package to your pyproject.toml and install it.

    $ poetry add esdbclient

## EventStoreDB server

The EventStoreDB server can be run locally using the official Docker container image.

### Run container

For development, you can run a "secure" EventStoreDB server using the following command.

    $ docker run -d --name eventstoredb-secure -it -p 2113:2113 --env "HOME=/tmp" eventstore/eventstore:21.10.9-buster-slim --dev

As we will see, the client needs an EventStoreDB connection string URI as the value of
its `uri` constructor argument. See the Notes section below for detailed information
about EventStoreDB connection string URIs.

The connection string for this "secure" EventStoreDB server would be:

    esdb://admin:changeit@localhost:2113

To connect to a "secure" server, you will usually need to include a "username"
and a "password" in the connection string, so that the server can authenticate the
client. The default username is "admin" and the default password is "changeit".

When connecting to a "secure" server, the client also needs an SSL/TLS
certificate as the value of its `root_certificates` constructor argument, so
that the client can authenticate the server. For development, you can either
use the SSL/TLS certificate of the certificate authority used to create the
server's certificate, or when using a single-node cluster, you can use the
server certificate itself. You can get the server certificate with the
following Python code.


```python
import ssl

server_certificate = ssl.get_server_certificate(addr=('localhost', 2113))
```

You can also start an "insecure" server using the following command.

    $ docker run -d --name eventstoredb-insecure -it -p 2114:2113 eventstore/eventstore:21.10.9-buster-slim --insecure

The connection string URI for this "insecure" server would be:

    esdb://localhost:2114?Tls=false

As we will see, when connecting to an "insecure" server, there is no need to include
a "username" and a "password" in the connection string. If you do, these values will
be ignored by the client, so that they will not be sent to the server over an
insecure channel.

Please note, the "insecure" connection string uses a query string with the field-value
`Tls=false`. The value of this field is by default `true`. See the Notes section below
for more information about EventStoreDB connection strings and the fields that can be
used in the query string to specify connection options.
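
To make the structure of the two connection strings above more concrete, the
sketch below takes them apart with Python's standard `urllib.parse` module. This
is not how the client itself parses connection strings; the `describe_uri()`
helper is hypothetical and only illustrates where the username, password, host,
port and the `Tls` field sit in the URI, and that `Tls` is treated as `true`
when it is absent.

```python
from urllib.parse import parse_qs, urlsplit


def describe_uri(uri):
    """Split a connection string into its parts (illustration only)."""
    parts = urlsplit(uri)
    fields = parse_qs(parts.query)
    # The Tls field defaults to "true" when it is not in the query string.
    tls_value = fields.get("Tls", ["true"])[-1]
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "tls": tls_value != "false",
    }


secure = describe_uri("esdb://admin:changeit@localhost:2113")
insecure = describe_uri("esdb://localhost:2114?Tls=false")
```

Here `secure["tls"]` is `True` because the field is absent, and `insecure["tls"]`
is `False` because the query string sets `Tls=false`.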

### Stop container

To stop and remove the "secure" container, use the following Docker commands.

    $ docker stop eventstoredb-secure
    $ docker rm eventstoredb-secure

To stop and remove the "insecure" container, use the following Docker commands.

    $ docker stop eventstoredb-insecure
    $ docker rm eventstoredb-insecure


## EventStoreDB client

This EventStoreDB client is implemented in the `esdbclient` package with
the `ESDBClient` class.

### Import class

The `ESDBClient` class can be imported from the `esdbclient` package.

```python
from esdbclient import ESDBClient
```

### Construct client

The `ESDBClient` class can be constructed with a `uri` argument, which is always
required. To connect to a "secure" EventStoreDB server, the otherwise optional
`root_certificates` argument is also required.

The `uri` argument is expected to be an EventStoreDB connection string URI that
conforms with the standard EventStoreDB "esdb" or "esdb+discover" URI schemes. The
syntax and semantics of EventStoreDB connection strings are explained in the Notes
section below.

For example, the following connection string specifies that the client should
attempt to create a "secure" connection to port 2113 on "localhost", and use the
client credentials "username" and "password" when making calls to the server.

    esdb://username:password@localhost:2113?Tls=true

The client must be configured to create a "secure" connection to a "secure" server,
or alternatively an "insecure" connection to an "insecure" server. By default, the
client will attempt to create a "secure" connection. And so, when connecting to an
"insecure" server, the connection string must specify that the client should attempt
to make an "insecure" connection.

The following connection string specifies that the client should
attempt to create an "insecure" connection to port 2114 on "localhost".
When connecting to an "insecure" server, the client will ignore any
username and password information included in the connection string,
so that usernames and passwords are not sent over an "insecure" connection.

    esdb://localhost:2114?Tls=false

Please note, the "insecure" connection string uses a query string with the field-value
`Tls=false`. The value of this field is by default `true`. Unless the connection string
URI includes the field-value `Tls=false` in the query string, the `root_certificates`
constructor argument is also required.

When connecting to a "secure" server, the `root_certificates` argument is expected to
be a Python `str` containing PEM encoded SSL/TLS root certificates. This value is
passed directly to `grpc.ssl_channel_credentials()`. It is used for authenticating the
server to the client. It is commonly the certificate of the certificate authority that
was responsible for generating the SSL/TLS certificate used by the EventStoreDB server.
But, alternatively for development, you can use the server's certificate itself.

In the example below, the constructor argument values are taken from the operating
system environment. This is a typical arrangement in a production environment. It is
done this way here so that the code in this documentation can be tested with both
a "secure" and an "insecure" server.

```python
import os

client = ESDBClient(
    uri=os.getenv("ESDB_URI"),
    root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
)
```

See the Notes section below for detailed information about EventStoreDB connection
strings and the fields that can be used in the query string to specify connection
options.

## Streams

In EventStoreDB, a "stream" is a sequence of recorded events that all have
the same "stream name". There will normally be many streams in a database.
Each recorded event has a "stream position" in its stream, and a "commit position"
in the database. The stream positions of the recorded events in a stream form a gapless
sequence starting from zero. The commit positions of the recorded events in the database
form a sequence that is not gapless.

The methods `append_events()`, `read_stream_events()` and `read_all_events()` can
be used to record and read events in the database.

### Append events

*requires leader*

The `append_events()` method can be used to write a sequence of new events atomically
to a "stream". Writing new events either creates a stream, or appends events to the end
of a stream. This method is idempotent (see below).

This method can be used to record atomically all the new
events that are generated when executing a command in an application.

Three arguments are required, `stream_name`, `expected_position`
and `events`.

The `stream_name` argument is required, and is expected to be a Python
`str` that uniquely identifies the stream in the database.

The `expected_position` argument is required, and is expected to be either `None`,
if events are being written to a new stream, or otherwise a Python `int`
equal to the position in the stream of the last recorded event in the stream.

The `events` argument is required, and is expected to be a sequence of new
event objects to be appended to the named stream. The `NewEvent` class should
be used to construct new event objects (see below).

This method takes an optional `timeout` argument, which is a Python `float` that sets
a deadline for the completion of the gRPC operation.

Streams are created by writing events. The correct value of the `expected_position`
argument when writing the first event of a new stream is `None`. Please note, it is
not possible to somehow create an "empty" stream in EventStoreDB.

The stream positions of recorded events in a stream start from zero, and form a gapless
sequence of integers. The stream position of the first recorded event in a stream is
`0`. And so when appending the second new event to a stream that has one recorded event,
the correct value of the `expected_position` argument is `0`. Similarly, the stream
position of the second recorded event in a stream is `1`, and so when appending the
third new event to a stream that has two recorded events, the correct value of the
`expected_position` argument is `1`. And so on... (There is a theoretical maximum
number of recorded events that any stream can have, but I'm not sure what it is;
maybe 9,223,372,036,854,775,807 because it is implemented as a `long` in C#?)

If there is a mismatch between the given value of the `expected_position` argument
and the position of the last recorded event in a stream, then an `ExpectedPositionError`
exception will be raised. This effectively accomplishes optimistic concurrency control.

If you wish to disable optimistic concurrency control when appending new events, you
can set the `expected_position` to a negative integer.
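
The rules for `expected_position` described above can be sketched with a small
in-memory model. This is only a toy, not the client or the server: the `ToyStream`
class and the `PositionMismatch` exception are hypothetical names invented for
this illustration (`PositionMismatch` stands in for `ExpectedPositionError`).

```python
class PositionMismatch(Exception):
    """Hypothetical stand-in for the client's ExpectedPositionError."""


class ToyStream:
    """In-memory model of one stream (illustration only)."""

    def __init__(self):
        self.events = []

    def append(self, expected_position, new_events):
        # Position of the last recorded event, or None for a new stream.
        current = len(self.events) - 1 if self.events else None
        # A negative integer disables the optimistic concurrency check.
        check_disabled = expected_position is not None and expected_position < 0
        if not check_disabled and expected_position != current:
            raise PositionMismatch(
                f"expected {expected_position}, found {current}"
            )
        # The check happens before anything is recorded: all or nothing.
        self.events.extend(new_events)
        return len(self.events) - 1


stream = ToyStream()
stream.append(None, ["e1"])       # new stream: expected position is None
stream.append(0, ["e2", "e3"])    # last recorded event is at position 0

try:
    stream.append(0, ["e4"])      # stale: last recorded event is now at 2
    conflict = False
except PositionMismatch:
    conflict = True

stream.append(-1, ["e5"])         # check disabled with a negative integer
```

The stale append is rejected without recording anything, which is how optimistic
concurrency control protects a stream from conflicting writers.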

If you need to discover the current position of the last recorded event in a stream,
you can use the `get_stream_position()` method (see below).

Please note, the append events operation is atomic, so that either all
or none of the given new events will be recorded. By design, it is only
possible with EventStoreDB to atomically record new events in one stream.

In the example below, a new event is appended to a new stream.

```python
from uuid import uuid4

from esdbclient import NewEvent

# Construct new event object.
event1 = NewEvent(type='OrderCreated', data=b'data1')

# Define stream name.
stream_name1 = str(uuid4())

# Append list of events to new stream.
commit_position1 = client.append_events(
    stream_name=stream_name1,
    expected_position=None,
    events=[event1],
)
```

In the example below, two subsequent events are appended to an existing
stream.

```python
event2 = NewEvent(type='OrderUpdated', data=b'data2')
event3 = NewEvent(type='OrderDeleted', data=b'data3')

commit_position2 = client.append_events(
    stream_name=stream_name1,
    expected_position=0,
    events=[event2, event3],
)
```

If the operation is successful, this method returns an integer
representing the overall "commit position" as it was when the operation
was completed. Otherwise, an exception will be raised.

A "commit position" is a monotonically increasing integer representing
the position of the recorded event in a "total order" of all recorded
events in the database across all streams. It is the actual position
of the event record on disk. In consequence, the sequence of commit
positions is not gapless. Indeed, there are usually large differences
between the commit positions of successive recorded events.

The "commit position" returned by `append_events()` is that of the last
recorded event in the given batch of new events.

The "commit position" returned in this way can therefore be used to wait
for a downstream component to have processed all the events that were recorded.

For example, consider a user interface command that results in the recording
of new events, and a query into an eventually consistent materialized
view in a downstream component that is updated from these events. If the new
events have not yet been processed, the view would be stale. The "commit position"
can be used by the user interface to poll the downstream component until it has
processed those new events, after which time the view will not be stale.
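
The polling described above might look something like the sketch below. It makes
several assumptions: `wait_for_commit_position()` is a hypothetical helper, and
`get_processed_position` is assumed to be a callable, provided by your application,
that returns the commit position of the last event processed by the downstream
component. Neither is part of this package.

```python
import time


def wait_for_commit_position(
    get_processed_position, commit_position, timeout=5.0, interval=0.05
):
    """Poll until the downstream component has caught up, or time out."""
    deadline = time.monotonic() + timeout
    while True:
        if get_processed_position() >= commit_position:
            return True  # the view reflects the recorded events
        if time.monotonic() >= deadline:
            return False  # the view may still be stale
        time.sleep(interval)


# Simulated downstream component that advances on each poll.
positions = iter([100, 250, 420])
caught_up = wait_for_commit_position(
    lambda: next(positions), commit_position=400, timeout=1.0, interval=0.01
)

# Simulated downstream component that never advances.
stalled = wait_for_commit_position(
    lambda: 100, commit_position=400, timeout=0.05, interval=0.01
)
```

In a user interface, the `commit_position` argument would be the value returned by
`append_events()` for the command that was just executed.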


### Append event

*requires leader*

The `append_event()` method can be used to write a single new event to a stream.

Three arguments are required, `stream_name`, `expected_position` and `event`.

This method works in the same way as `append_events()`, however `event` is expected
to be a single `NewEvent`.

This method takes an optional `timeout` argument, which is a Python `float` that sets
a deadline for the completion of the gRPC operation.

Since the handling of a command in your application may result in one or many
new events, and the results of handling a command should be recorded atomically,
and the writing of new events generated by a command handler is usually a concern
that is factored out and used everywhere in a project, it is quite usual in a project
to only use `append_events()` to record new events. For this reason, an example is
not provided here.


### Idempotent append operations

Sometimes it may happen that a new event is successfully recorded, but the
connection to the database is interrupted before the call can return to the
client. In case of an error when appending an event, it may be
desirable to retry appending the same event at the same position. If the event was
in fact successfully recorded, it is convenient for the retry to return successfully
without raising an error due to optimistic concurrency control (as described above).

The example below shows the `append_events()` method being called again with
`event2` and `event3`, and with `expected_position=0`, exactly as above. We can
see that repeating the call to `append_events()` returns successfully.

```python
# Retry appending event2 and event3.
commit_position_retry = client.append_events(
    stream_name=stream_name1,
    expected_position=0,
    events=[event2, event3],
)
```

We can see that the same commit position is returned as above.

```python
assert commit_position_retry == commit_position2
```

We can also see, by calling `read_stream_events()`, that the stream is unchanged
despite calling `append_events()` twice with the same arguments. That is, there
are still only three events in the stream.

```python
events = client.read_stream_events(
    stream_name=stream_name1
)

assert len(events) == 3
```

This idempotent behaviour is activated because the `NewEvent` class has an `id`
attribute that, by default, is assigned a new and unique version-4 UUID when an
instance of `NewEvent` is constructed. If events with the same `id` are appended
at the same `expected_position`, the stream will be unchanged, the operation will
complete successfully, and the same commit position will be returned to the caller.

```python
from uuid import UUID


assert isinstance(event1.id, UUID)
assert isinstance(event2.id, UUID)
assert isinstance(event3.id, UUID)

assert event1.id != event2.id
assert event2.id != event3.id

assert events[0].id == event1.id
assert events[1].id == event2.id
assert events[2].id == event3.id
```

It is possible to set the `id` constructor argument of `NewEvent` when instantiating
the `NewEvent` class, but in the examples above we have been using the default
behaviour, which is that the `id` value is generated when the `NewEvent` class is
instantiated.
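
Because repeating an append with the same event objects is safe, a retry can be
factored out into a small helper. The sketch below is one possible shape, under
stated assumptions: `call_with_retry()`, `ConnectionDropped` and `flaky_append()`
are hypothetical names, and the simulated operation merely imitates an append that
is recorded but whose first response is lost. Which exceptions are worth retrying
in a real application is for you to decide.

```python
class ConnectionDropped(Exception):
    """Hypothetical error raised when a response is lost."""


def call_with_retry(operation, retry_on, attempts=3):
    """Call operation(), retrying when one of the given errors is raised."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # give up after the last attempt


recorded = {}  # event ID -> commit position (simulated database)
calls = []


def flaky_append():
    calls.append(1)
    # Idempotent: an event with the same ID is recorded only once.
    recorded.setdefault("event-id-1", 1234)
    if len(calls) == 1:
        # The event was recorded, but the response never arrived.
        raise ConnectionDropped()
    return recorded["event-id-1"]


commit_position = call_with_retry(flaky_append, retry_on=ConnectionDropped)
```

The important detail is that the same event objects (and so the same `id` values)
are used for every attempt, rather than constructing new ones before each retry.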


### Read stream events

The `read_stream_events()` method can be used to obtain the recorded events in a
stream. It returns a sequence of recorded event objects. The received recorded event
objects are instances of the `RecordedEvent` class (see below).

This method has one required argument, `stream_name`, which is the name of
the stream from which to read events. By default, the recorded events in the
stream are returned in the order they were recorded.

The method `read_stream_events()` also supports four optional arguments,
`stream_position`, `backwards`, `limit`, and `timeout`.

The optional `stream_position` argument is an integer that can be used to
indicate the position in the stream from which to start reading. This argument is
`None` by default, which means the stream will be read either from the start of the
stream (the default behaviour), or from the end of the stream if `backwards` is
`True` (see below). When reading a stream from a specific position in the stream, the
recorded event at that position will be included, both when reading forwards
from that position, and when reading backwards from that position.

The optional `backwards` argument is a boolean, by default `False`, which means the
stream will be read forwards by default, so that events are returned in the
order they were appended. If `backwards` is `True`, the stream will be read
backwards, so that events are returned in reverse order.

The optional `limit` argument is an integer which limits the number of events that will
be returned. The default value is `sys.maxsize`.

The optional `timeout` argument is a Python `float` which sets a deadline for
the completion of the gRPC operation.
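
The way `stream_position`, `backwards` and `limit` combine can be modelled with
plain list slicing. The `read_model()` function below is a hypothetical
illustration, not the client's implementation. It relies on stream positions
being a gapless sequence starting from zero, so that a list index can stand in
for a stream position.

```python
import sys


def read_model(events, stream_position=None, backwards=False, limit=sys.maxsize):
    """Model of the read semantics, using list indexes as stream positions."""
    if backwards:
        # Default is the end of the stream; the start position is included.
        start = len(events) - 1 if stream_position is None else stream_position
        selected = events[start::-1] if start >= 0 else []
    else:
        # Default is the start of the stream; the start position is included.
        start = 0 if stream_position is None else stream_position
        selected = events[start:]
    # The limit applies after the direction and start position.
    return selected[:limit]


recorded = ["event1", "event2", "event3"]

forwards = read_model(recorded)
from_position_1 = read_model(recorded, stream_position=1)
backwards_all = read_model(recorded, backwards=True)
first_two = read_model(recorded, limit=2)
last_one = read_model(recorded, stream_position=2, backwards=True, limit=1)
```

Note that the event at the given position is included in both directions.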

The example below shows how to read the recorded events of a stream
forwards from the start of the stream to the end of the stream. The
name of a stream is given when calling the method.

```python
events = client.read_stream_events(
    stream_name=stream_name1
)
```

Now that we have a sequence of event objects, we can check we got the
three events that were appended to the stream, and that they are
ordered exactly as they were appended.

```python
assert len(events) == 3

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 0
assert events[0].type == event1.type
assert events[0].data == event1.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 1
assert events[1].type == event2.type
assert events[1].data == event2.data

assert events[2].stream_name == stream_name1
assert events[2].stream_position == 2
assert events[2].type == event3.type
assert events[2].data == event3.data
```

The example below shows how to read recorded events in a stream forwards from
a specific stream position to the end of the stream.

```python
events = client.read_stream_events(
    stream_name=stream_name1,
    stream_position=1,
)

assert len(events) == 2

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 1
assert events[0].type == event2.type
assert events[0].data == event2.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 2
assert events[1].type == event3.type
assert events[1].data == event3.data
```

The example below shows how to read the recorded events in a stream backwards from
the end of the stream to the start of the stream.

```python
events = client.read_stream_events(
    stream_name=stream_name1,
    backwards=True,
)

assert len(events) == 3

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 2
assert events[0].type == event3.type
assert events[0].data == event3.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 1
assert events[1].type == event2.type
assert events[1].data == event2.data
```

The example below shows how to read a limited number (two) of the recorded events
in a stream forwards from the start of the stream.

```python
events = client.read_stream_events(
    stream_name=stream_name1,
    limit=2,
)

assert len(events) == 2

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 0
assert events[0].type == event1.type
assert events[0].data == event1.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 1
assert events[1].type == event2.type
assert events[1].data == event2.data
```

The example below shows how to read a limited number (one) of the recorded
events in a stream backwards from a given stream position.

```python
events = client.read_stream_events(
    stream_name=stream_name1,
    stream_position=2,
    backwards=True,
    limit=1,
)

assert len(events) == 1

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 2
assert events[0].type == event3.type
assert events[0].data == event3.data
```

### Read all events

The method `read_all_events()` can be used to read all recorded events
in the database in the order they were recorded. It returns an iterable of
events that have been recorded in the database. Iterating over this iterable
object will stop when it has yielded the last recorded event. The received
recorded event objects are instances of the `RecordedEvent` class (see below).

This method supports seven optional arguments, `commit_position`, `backwards`,
`filter_exclude`, `filter_include`, `filter_by_stream_name`, `limit`, and `timeout`.

The optional `commit_position` argument is an integer that can be used to
specify the commit position from which to start reading. This argument is `None` by
default, meaning that all the events will be read either from the start, or
from the end if `backwards` is `True` (see below). Please note, if specified,
the specified position must be an actually existing commit position, because
any other number will result in a server error (at least in EventStoreDB v21.10).

The optional `backwards` argument is a boolean, by default `False`, which means the
events will be read forwards by default, so that events are returned in the
order they were committed. If `backwards` is `True`, the events will be read
backwards, so that events are returned in reverse order.

The optional `filter_exclude` argument is a sequence of regular expressions that
match recorded events that should not be included. This argument is ignored
if `filter_include` is set to a non-empty sequence. By default, this argument is set
to match the event types of "system events", so that EventStoreDB system events
will not normally be included. See the Notes section below for more information
about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions
that match recorded events that should be included. By default, this argument
is an empty tuple. If this argument is set to a non-empty sequence, the
`filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a boolean value that indicates whether
the filter will apply to event types or stream names. By default, this value is `False`
and so the filtering will apply to the event type strings of recorded events.

The optional `limit` argument is an integer which limits the number of events that will
be returned. The default value is `sys.maxsize`.

The optional `timeout` argument is a Python `float` which sets a
deadline for the completion of the gRPC operation.

The filtering of events is done on the EventStoreDB server. The
`limit` argument is applied on the server after filtering. See below for
more information about filter regular expressions.
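
The interaction between `filter_exclude`, `filter_include` and `limit` can be
modelled locally. The sketch below is a hypothetical illustration only: the real
filtering is done by the server, and the use of `re.fullmatch()` here (matching
the whole event type) is an assumption of the model, as is the example pattern
used for system events.

```python
import re
from itertools import islice


def filter_model(event_types, filter_exclude=(), filter_include=(), limit=None):
    """Model of include/exclude filtering followed by a limit."""

    def keep(event_type):
        if filter_include:
            # A non-empty filter_include means filter_exclude is ignored.
            return any(re.fullmatch(p, event_type) for p in filter_include)
        return not any(re.fullmatch(p, event_type) for p in filter_exclude)

    # The limit is applied after filtering.
    return list(islice((t for t in event_types if keep(t)), limit))


# Hypothetical pattern for event types that start with a dollar sign.
SYSTEM_EVENTS = r"\$.+"

event_types = [
    "$metadata",
    "OrderCreated",
    "OrderUpdated",
    "InvoiceIssued",
    "OrderDeleted",
]

without_system = filter_model(event_types, filter_exclude=[SYSTEM_EVENTS])
first_two_orders = filter_model(
    event_types,
    filter_exclude=[SYSTEM_EVENTS],
    filter_include=[r"Order.*"],
    limit=2,
)
only_system = filter_model(
    event_types,
    filter_exclude=[SYSTEM_EVENTS],
    filter_include=[SYSTEM_EVENTS],
)
```

Because the limit is applied after filtering, `first_two_orders` contains two
matching events even though the first event in the list is filtered out.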

When reading forwards from a specific commit position, the event at the specified
position WILL be included. However, when reading backwards, the event at the
specified position will NOT be included. (This non-inclusive behaviour, of excluding
the specified commit position when reading all streams backwards, differs from the
behaviour when reading a stream backwards from a specific stream position.)
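
The asymmetry described above can be shown with a small model. The
`read_all_model()` function below is hypothetical and works on a plain list of
commit positions. It does not check that the given position actually exists,
which the server requires.

```python
def read_all_model(commit_positions, commit_position=None, backwards=False):
    """Model of which commit positions are returned by a read."""
    if backwards:
        ordered = sorted(commit_positions, reverse=True)
        if commit_position is None:
            return ordered
        # Backwards: the event at the given position is NOT included.
        return [p for p in ordered if p < commit_position]
    ordered = sorted(commit_positions)
    if commit_position is None:
        return ordered
    # Forwards: the event at the given position IS included.
    return [p for p in ordered if p >= commit_position]


positions = [100, 250, 420]

forwards_from_250 = read_all_model(positions, commit_position=250)
backwards_from_250 = read_all_model(positions, commit_position=250, backwards=True)
```

Reading forwards from 250 gives the events at 250 and 420, whereas reading
backwards from 250 gives only the event at 100.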

The example below shows how to read all events in the database in the
order they were recorded.

```python
events = list(client.read_all_events())

assert len(events) >= 3
```

The example below shows how to read all recorded events from a specific commit position.

```python
events = list(
    client.read_all_events(
        commit_position=commit_position1
    )
)

assert len(events) == 3

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 0
assert events[0].type == event1.type
assert events[0].data == event1.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 1
assert events[1].type == event2.type
assert events[1].data == event2.data

assert events[2].stream_name == stream_name1
assert events[2].stream_position == 2
assert events[2].type == event3.type
assert events[2].data == event3.data
```

The example below shows how to read all recorded events in reverse order.

```python
events = list(
    client.read_all_events(
        backwards=True
    )
)

assert len(events) >= 3

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 2
assert events[0].type == event3.type
assert events[0].data == event3.data

assert events[1].stream_name == stream_name1
assert events[1].stream_position == 1
assert events[1].type == event2.type
assert events[1].data == event2.data

assert events[2].stream_name == stream_name1
assert events[2].stream_position == 0
assert events[2].type == event1.type
assert events[2].data == event1.data
```

The example below shows how to read a limited number (one) of the recorded events
in the database forwards from a specific commit position. Please note, when reading
all events forwards from a specific commit position, the event at the specified
position WILL be included.


```python
events = list(
    client.read_all_events(
        commit_position=commit_position1,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 0
assert events[0].type == event1.type
assert events[0].data == event1.data

assert events[0].commit_position == commit_position1
```

The example below shows how to read a limited number (one) of the recorded events
in the database backwards from the end. This gives the last recorded event.

```python
events = list(
    client.read_all_events(
        backwards=True,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].stream_name == stream_name1
assert events[0].stream_position == 2
assert events[0].type == event3.type
assert events[0].data == event3.data
```

The example below shows how to read a limited number (one) of the recorded events
in the database backwards from a specific commit position. Please note, when reading
all events backwards from a specific commit position, the event at the specified
position WILL NOT be included.

```python
events = list(
    client.read_all_events(
        commit_position=commit_position2,
        backwards=True,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].commit_position < commit_position2
```


### Get current stream position

The `get_stream_position()` method can be used to
get the "stream position" of the last recorded event in a stream.

This method has a `stream_name` argument, which is required.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

The sequence of positions in a stream is gapless. It is zero-based,
so that a stream with one recorded event has a current stream
position of `0`. The current stream position is `1` when a stream has
two events, and it is `2` when there are three events, and so on.

In the example below, the current stream position is obtained for the
stream to which events were appended in the examples above.
Because the sequence of stream positions is zero-based, and because
three events were appended, the current stream position is `2`.

```python
stream_position = client.get_stream_position(
    stream_name=stream_name1
)

assert stream_position == 2
```

If a stream does not exist, the returned stream position value is `None`,
which matches the required expected position when appending the first event
of a new stream (see above).

```python
stream_position = client.get_stream_position(
    stream_name=str(uuid4())
)

assert stream_position is None
```
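
Because the current stream position of a stream that does not exist is `None`, the
value returned by `get_stream_position()` can be passed directly as the
`expected_position` argument when appending events. The helper below is a minimal
sketch of this. The function name `append_at_current_position()` is hypothetical, and
it uses only the `get_stream_position()` and `append_events()` methods shown above.

```python
def append_at_current_position(client, stream_name, events):
    """Append events, expecting the stream to be at its current position."""
    # This is None if the stream does not exist yet.
    current_position = client.get_stream_position(stream_name=stream_name)

    # If another writer appends to the stream between these two calls,
    # the expected position will no longer match the stream.
    return client.append_events(
        stream_name=stream_name,
        expected_position=current_position,
        events=events,
    )
```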

This method takes an optional argument `timeout` which is a Python `float` that sets
a deadline for the completion of the gRPC operation.


### Get current commit position

The method `get_commit_position()` can be used to get the current
commit position of the database.

```python
commit_position = client.get_commit_position()
```

This method takes an optional argument `timeout` which is a Python `float` that sets
a deadline for the completion of the gRPC operation.

This method can be useful to measure progress of a downstream component
that is processing all recorded events, by comparing the current commit
position with the recorded commit position of the last successfully processed
event in a downstream component.

The value of the `commit_position` argument when reading events either by using
the `read_all_events()` method or by using a catch-up subscription would usually
be determined by the recorded commit position of the last successfully processed
event in a downstream component.
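
Progress can be checked by comparing the two positions. The sketch below assumes the
downstream component can report its last processed commit position as an `int`, or as
`None` if nothing has been processed. The function name `has_caught_up()` is
hypothetical. Commit positions are not a gapless count of events, so the difference
between two positions should not be read as the number of events remaining.

```python
def has_caught_up(client, last_processed_commit_position):
    """Return True if processing has reached the current commit position."""
    current_commit_position = client.get_commit_position()
    if last_processed_commit_position is None:
        # Nothing has been processed yet, so treat this as not caught up.
        return False
    return last_processed_commit_position >= current_commit_position
```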


### Get stream metadata

The method `get_stream_metadata()` gets the metadata for a stream, along
with the version of the stream metadata.

```python
metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)
```

The returned `metadata` value is a Python `dict`. The returned `metadata_version`
value is either an `int`, or `None` if the stream does not exist. These values can
be passed into `set_stream_metadata()`.


### Set stream metadata

*requires leader*

The method `set_stream_metadata()` sets the metadata for a stream, along
with the version of the stream metadata.

```python
metadata["foo"] = "bar"

client.set_stream_metadata(
    stream_name=stream_name1,
    metadata=metadata,
    expected_position=metadata_version,
)
```

The `expected_position` argument should be the current version of the stream metadata.
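
Reading the metadata, changing it, and writing it back with the version that was read
can be wrapped in a small helper. This is a minimal sketch, and the function name
`update_stream_metadata()` is hypothetical. It uses only the `get_stream_metadata()`
and `set_stream_metadata()` calls shown above.

```python
def update_stream_metadata(client, stream_name, **changes):
    """Read, modify and write back the metadata of a stream."""
    metadata, metadata_version = client.get_stream_metadata(
        stream_name=stream_name
    )
    metadata.update(changes)

    # Pass the version that was read, so that the write is made
    # against the version of the metadata that was changed.
    client.set_stream_metadata(
        stream_name=stream_name,
        metadata=metadata,
        expected_position=metadata_version,
    )
    return metadata
```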

Please refer to the EventStoreDB documentation for more information about stream
metadata.

### Delete stream

*requires leader*

The method `delete_stream()` can be used to "delete" a stream.

```python
commit_position = client.delete_stream(stream_name=stream_name1, expected_position=2)
```

After deleting a stream, it's still possible to append new events. Reading from a
deleted stream will return only events that have been appended after it was
deleted.

### Tombstone stream

*requires leader*

The method `tombstone_stream()` can be used to "tombstone" a stream.

```python
commit_position = client.tombstone_stream(stream_name=stream_name1, expected_position=2)
```

After tombstoning a stream, it's not possible to append new events.


## Catch-up subscriptions

"Catch-up" subscriptions can be used to receive events that have been recorded
in the database, and also events that are recorded after a subscription was started.

The method `subscribe_all_events()` starts a catch-up subscription to receive all
events that have been and will be recorded in the database. The method
`subscribe_stream_events()` starts a catch-up subscription to receive events from
a specific stream.

Catch-up subscriptions encapsulate a streaming gRPC call which is
kept open by the server, with newly recorded events sent to the client
as the client iterates over the subscription response.

Many catch-up subscriptions can be created, concurrently or successively, and all
will receive all the recorded events they have been requested to receive.

The received recorded event objects are instances of the `RecordedEvent` class
(see below).

### How to implement exactly-once event processing

The commit positions of recorded events that are received and processed by a
downstream component are usefully recorded by the downstream component so that
the commit position of the last processed event can be determined.

The last recorded commit position can be used to specify the commit position from which
to subscribe when processing is resumed. Since this commit position represents the
position of the last successfully processed event in a downstream component, it
will be usual to want the next event after this position, because that is the next
event that has not yet been processed. For this reason, when subscribing for events
from a specific commit position using a catch-up subscription in EventStoreDB, the
recorded event at the specified commit position will NOT be included in the sequence
of recorded events that are received.

To accomplish "exactly-once" processing of recorded events in a downstream
component when using a catch-up subscription, the commit position of a recorded
event should be recorded atomically and uniquely along with the result of processing
recorded events, for example in the same database as materialised views when
implementing eventually-consistent CQRS, or in the same database as a downstream
analytics or reporting or archiving application. By recording the commit position
of recorded events atomically with the new state that results from processing
recorded events, "dual writing" in the consumption of recorded events can be
avoided. By also recording the commit position uniquely, the new state cannot be
recorded twice, and hence the recorded state of the downstream component will be
updated only once for any recorded event. By using the greatest recorded commit
position to resume a catch-up subscription, all recorded events will eventually
be processed. The combination of the "at-most-once" condition and the "at-least-once"
condition gives the "exactly-once" condition.

The danger with "dual writing" in the consumption of recorded events is that if a
recorded event is successfully processed and new state recorded atomically in one
transaction with the commit position recorded in a separate transaction, one may
happen and not the other. If the new state is recorded but the position is lost,
and then the processing is stopped and resumed, the recorded event may be processed
twice. On the other hand, if the commit position is recorded but the new state is
lost, the recorded event may effectively not be processed at all. By either
processing an event more than once, or by failing to process an event, the recorded
state of the downstream component might be inaccurate, or possibly inconsistent, and
perhaps catastrophically so. Such consequences may or may not matter in your situation.
But sometimes inconsistencies may halt processing until the issue is resolved. You can
avoid "dual writing" in the consumption of events by atomically recording the commit
position of a recorded event along with the new state that results from processing that
event in the same atomic transaction. By making the recording of the commit positions
unique, so that transactions will be rolled back when there is a conflict, you will
prevent the results of any duplicate processing of a recorded event being committed.
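
The approach described above can be sketched with Python's `sqlite3` module. This is
an illustration under stated assumptions rather than a recommended implementation: the
table names `tracking` and `type_counts`, and the functions `create_tables()`,
`process_event()` and `last_recorded_position()`, are all hypothetical, and the "new
state" is just a count of events by type. The event object is assumed to have `type`
and `commit_position` attributes.

```python
import sqlite3


def create_tables(conn):
    # The primary key makes each recorded commit position unique.
    conn.execute(
        "CREATE TABLE tracking (commit_position INTEGER PRIMARY KEY)"
    )
    conn.execute(
        "CREATE TABLE type_counts (type TEXT PRIMARY KEY, count INTEGER NOT NULL)"
    )


def process_event(conn, event):
    """Record the commit position and the new state in one transaction."""
    try:
        with conn:  # Commits on success, rolls back on an exception.
            conn.execute(
                "INSERT INTO tracking (commit_position) VALUES (?)",
                (event.commit_position,),
            )
            conn.execute(
                "INSERT OR IGNORE INTO type_counts (type, count) VALUES (?, 0)",
                (event.type,),
            )
            conn.execute(
                "UPDATE type_counts SET count = count + 1 WHERE type = ?",
                (event.type,),
            )
    except sqlite3.IntegrityError:
        # The commit position was already recorded, so this is a duplicate.
        return False
    return True


def last_recorded_position(conn):
    """Return the greatest recorded commit position, or None if there is none."""
    row = conn.execute("SELECT MAX(commit_position) FROM tracking").fetchone()
    return row[0]
```

The value returned by `last_recorded_position()` could then be passed as the
`commit_position` argument when resuming a catch-up subscription.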

Recorded events received from a catch-up subscription cannot be acknowledged back
to the EventStoreDB server. Acknowledging events, however, is an aspect of "persistent
subscriptions" (see below). Hoping to rely on acknowledging events to an upstream
component is an example of dual writing.

### Subscribe all events

The `subscribe_all_events()` method can be used to start a "catch-up" subscription
that can return events from all streams in the database.

This method can be used by a downstream component
to process recorded events with exactly-once semantics.

This method takes an optional `commit_position` argument, which can be
used to specify a commit position from which to subscribe. The default
value is `None`, which means the subscription will operate from the first
recorded event in the database. If a commit position is given, it must match
an actually existing commit position in the database. The recorded events that
are obtained will not include the event recorded at that commit position.

This method also takes four other optional arguments, `filter_exclude`,
`filter_include`, `filter_by_stream_name`, and `timeout`.

The optional argument `filter_exclude` is a sequence of regular expressions that
match recorded events that should not be included. This argument is ignored
if `filter_include` is set to a non-empty sequence. By default, this argument is set
to match the event types of "system events", so that EventStoreDB system events
will not normally be included. See the Notes section below for more information
about filter expressions.

The optional argument `filter_include` is a sequence of regular expressions
that match recorded events that should be included. By default, this argument
is an empty tuple. If this argument is set to a non-empty sequence, the
`filter_exclude` argument is ignored.

The optional argument `filter_by_stream_name` is a boolean value that indicates whether
the filter will apply to event types or stream names. By default, this value is `False`
and so the filtering will apply to the event type strings of recorded events.

Please note, the filtering happens on the EventStoreDB server. See below for
more information about filter regular expressions.

The optional `timeout` argument is a Python `float` which sets a
deadline for the completion of the gRPC operation.

This method returns a Python iterator that yields recorded events, including events
that are recorded after the subscription was created. Iterating over this object will
therefore not stop, unless the connection to the database is lost. The call will
be ended when the iterator object is deleted from memory, which will happen when the
iterator object goes out of scope or is explicitly deleted (see below). The call
may also be closed by the server.

The subscription object can be used directly, but it might be used within a threaded
loop dedicated to receiving events that can be stopped in a controlled way, with
recorded events put on a queue for processing in a different thread. This package
doesn't provide such a threaded or queuing object class. Just make sure to reconstruct
the subscription (and the queue) using the last recorded commit position when resuming
the subscription after an error, to be sure all events are processed once.
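
A threaded reader of this kind might look like the sketch below. The class name
`QueueingReader` is hypothetical and is not part of this package, and the use of `None`
as an end-of-reading sentinel is an assumption of the sketch. The `subscription`
argument can be any iterator of events, and `events_queue` is expected to be something
like a `queue.Queue`.

```python
import threading


class QueueingReader(threading.Thread):
    """Hypothetical helper that puts received events on a queue."""

    def __init__(self, subscription, events_queue):
        super().__init__(daemon=True)
        self.subscription = subscription
        self.events_queue = events_queue
        self.stopping = threading.Event()
        self.error = None

    def stop(self):
        # This only takes effect when the next event is received.
        self.stopping.set()

    def run(self):
        try:
            for event in self.subscription:
                if self.stopping.is_set():
                    break
                self.events_queue.put(event)
        except Exception as e:
            # Keep the error, so that the caller can decide
            # whether to reconstruct the subscription.
            self.error = e
        finally:
            # A None sentinel tells the consumer that reading has ended.
            self.events_queue.put(None)
```

Because iterating over a subscription blocks until an event arrives, calling `stop()`
in this sketch does not interrupt a reader that is waiting for the next event.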

The example below shows how to subscribe to receive all recorded
events from the start, and then how to resume from a specific commit position.
Already-recorded events are received first, and then new events are
recorded, which are then received via the subscription.

```python

# Append an event to a new stream.
stream_name2 = str(uuid4())
event4 = NewEvent(type='OrderCreated', data=b'data4')
client.append_events(
    stream_name=stream_name2,
    expected_position=None,
    events=[event4],
)

# Subscribe from the first recorded event in the database.
subscription = client.subscribe_all_events()
received_events = []

# Process events received from the catch-up subscription.
for event in subscription:
    last_commit_position = event.commit_position
    received_events.append(event)
    if event.id == event4.id:
        break

assert received_events[-4].id == event1.id
assert received_events[-3].id == event2.id
assert received_events[-2].id == event3.id
assert received_events[-1].id == event4.id

# Append subsequent events to the stream.
event5 = NewEvent(type='OrderUpdated', data=b'data5')
client.append_events(
    stream_name=stream_name2,
    expected_position=0,
    events=[event5],
)

# Receive subsequent events from the subscription.
for event in subscription:
    last_commit_position = event.commit_position
    received_events.append(event)
    if event.id == event5.id:
        break


assert received_events[-5].id == event1.id
assert received_events[-4].id == event2.id
assert received_events[-3].id == event3.id
assert received_events[-2].id == event4.id
assert received_events[-1].id == event5.id


# Append more events to the stream.
event6 = NewEvent(type='OrderDeleted', data=b'data6')
client.append_events(
    stream_name=stream_name2,
    expected_position=1,
    events=[event6],
)


# Resume subscribing from the last commit position.
subscription = client.subscribe_all_events(
    commit_position=last_commit_position
)


# Catch up by receiving the new event from the subscription.
for event in subscription:
    received_events.append(event)
    if event.id == event6.id:
        break

assert received_events[-6].id == event1.id
assert received_events[-5].id == event2.id
assert received_events[-4].id == event3.id
assert received_events[-3].id == event4.id
assert received_events[-2].id == event5.id
assert received_events[-1].id == event6.id


# Append three more events to a new stream.
stream_name3 = str(uuid4())
event7 = NewEvent(type='OrderCreated', data=b'data7')
event8 = NewEvent(type='OrderUpdated', data=b'data8')
event9 = NewEvent(type='OrderDeleted', data=b'data9')

client.append_events(
    stream_name=stream_name3,
    expected_position=None,
    events=[event7, event8, event9],
)

# Receive the three new events from the resumed subscription.
for event in subscription:
    received_events.append(event)
    if event.id == event9.id:
        break

assert received_events[-9].id == event1.id
assert received_events[-8].id == event2.id
assert received_events[-7].id == event3.id
assert received_events[-6].id == event4.id
assert received_events[-5].id == event5.id
assert received_events[-4].id == event6.id
assert received_events[-3].id == event7.id
assert received_events[-2].id == event8.id
assert received_events[-1].id == event9.id
```

The catch-up subscription call is ended as soon as the subscription object
goes out of scope or is explicitly deleted from memory.

```python
# End the subscription.
del subscription
```

### Subscribe stream events

The `subscribe_stream_events()` method can be used to start a "catch-up" subscription
that can return events that are recorded in a single stream.

This method takes a required `stream_name` argument, which specifies the name of the stream
from which recorded events will be received.

This method also takes two optional arguments, `stream_position`, and `timeout`.

The optional `stream_position` argument specifies a position in the stream from which
recorded events will be received. The event at the specified stream position will not
be included.

The optional `timeout` argument is a Python `float` that sets
a deadline for the completion of the gRPC operation.

The example below shows how to start a catch-up subscription to a stream.

```python

# Subscribe to events from stream2, from the start.
subscription = client.subscribe_stream_events(stream_name=stream_name2)

# Read from the subscription.
events = []
for event in subscription:
    events.append(event)
    if event.id == event6.id:
        break

# Check we got events only from stream2.
assert len(events) == 3
assert events[0].stream_name == stream_name2
assert events[1].stream_name == stream_name2
assert events[2].stream_name == stream_name2

# Append another event to stream3.
event10 = NewEvent(type="OrderUndeleted", data=b'data10')
client.append_events(
    stream_name=stream_name3,
    expected_position=2,
    events=[event10],
)

# Append another event to stream2.
event11 = NewEvent(type="OrderUndeleted", data=b'data11')
client.append_events(
    stream_name=stream_name2,
    expected_position=2,
    events=[event11]
)

# Continue reading from the subscription.
for event in subscription:
    events.append(event)
    if event.id == event11.id:
        break

# Check we got events only from stream2.
assert len(events) == 4
assert events[0].stream_name == stream_name2
assert events[1].stream_name == stream_name2
assert events[2].stream_name == stream_name2
assert events[3].stream_name == stream_name2
```

The example below shows how to start a catch-up subscription to a stream from a
specific stream position.

```python

# Subscribe to events from stream2, from after stream position 1.
subscription = client.subscribe_stream_events(
    stream_name=stream_name2,
    stream_position=1,
)

# Read event from the subscription.
events = []
for event in subscription:
    events.append(event)
    if event.id == event11.id:
        break

# Check we got events only after position 1.
assert len(events) == 2
assert events[0].id == event6.id
assert events[0].stream_position == 2
assert events[0].stream_name == stream_name2
assert events[1].id == event11.id
assert events[1].stream_position == 3
assert events[1].stream_name == stream_name2
```

## Persistent subscriptions

### Create subscription

*requires leader*

The method `create_subscription()` can be used to create a
"persistent subscription" to EventStoreDB.

This method takes a required `group_name` argument, which is the
name of a "group" of consumers of the subscription.

This method also takes seven optional arguments, `from_end`, `commit_position`,
`filter_exclude`, `filter_include`, `filter_by_stream_name`, `consumer_strategy`,
and `timeout`.

The optional `from_end` argument can be used to specify that the group of consumers
of the subscription should only receive events that were recorded after the subscription
was created.

Alternatively, the optional `commit_position` argument can be used to specify a commit
position from which the group of consumers of the subscription should
receive events. Please note, the recorded event at the specified commit position might
be included in the recorded events received by the group of consumers.

If neither `from_end` nor `commit_position` is specified, the group of consumers
of the subscription will potentially receive all recorded events in the database.

The optional `filter_exclude` argument is a sequence of regular expressions that
match recorded events that should not be included. This argument is ignored
if `filter_include` is set to a non-empty sequence. By default, this argument is set
to match the event types of "system events", so that EventStoreDB system events
will not normally be received. See the Notes section below for more information
about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions
that match recorded events that should be received. By default, this argument
is an empty tuple. If this argument is set to a non-empty sequence, the
`filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a boolean value that indicates whether
the filter will apply to event types or stream names. By default, this value is `False`
and so the filtering will apply to the event type strings of recorded events.

The optional `consumer_strategy` argument is a Python `str` that defines
the consumer strategy for this persistent subscription. The value of this argument
can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`. The
default value is `'DispatchToSingle'`.

The optional `timeout` argument is a Python `float` which sets a
deadline for the completion of the gRPC operation.

The method `create_subscription()` does not return a value, because
recorded events are obtained by the group of consumers of the subscription
using the `read_subscription()` method.

In the example below, a persistent subscription is created.

```python
# Create a persistent subscription.
group_name = f"group-{uuid4()}"
client.create_subscription(group_name=group_name)
```

### Read subscription

*requires leader*

The method `read_subscription()` can be used by a group of consumers to receive
recorded events from a persistent subscription created using `create_subscription`.

This method takes a required `group_name` argument, which is
the name of a "group" of consumers of the subscription specified
when `create_subscription()` was called.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

This method returns a 2-tuple: a "read request" object and a "read response" object.

```python
read_req, read_resp = client.read_subscription(group_name=group_name)
```

The "read response" object is an iterator that yields recorded events from
the specified commit position.

The "read request" object has an `ack()` method that should be used by a consumer
in a group to acknowledge to the server that it has received and successfully
processed a recorded event. This will prevent that recorded event being received
by another consumer in the same group. The `ack()` method takes an `event_id`
argument, which is the ID of the recorded event that has been received.

The "read request" object also has an `nack()` method that should be used by a consumer
in a group to negatively acknowledge to the server that it has received but not
successfully processed a recorded event. The `nack()` method takes an `event_id`
argument, which is the ID of the recorded event that has been received, and an
`action` argument, which should be a Python `str`, either `'unknown'`, `'park'`,
`'retry'`, `'skip'`, or `'stop'`.

The example below iterates over the "read response" object, and calls `ack()`
on the "read request" object. The for loop breaks when we have received
the last event, so that we can continue with the examples below.

```python
events = []
for event in read_resp:
    events.append(event)

    # Acknowledge the received event.
    read_req.ack(event_id=event.id)

    # Break when the last event has been received.
    if event.id == event11.id:
        break
```

The received events are the events we appended above.

```python
assert events[-11].id == event1.id
assert events[-10].id == event2.id
assert events[-9].id == event3.id
assert events[-8].id == event4.id
assert events[-7].id == event5.id
assert events[-6].id == event6.id
assert events[-5].id == event7.id
assert events[-4].id == event8.id
assert events[-3].id == event9.id
assert events[-2].id == event10.id
assert events[-1].id == event11.id
```

As described above, the `nack()` method of the "read request" object can be used by
a consumer in a group to tell the server that it has failed to process a recorded
event successfully. This will allow that recorded event to be received
by this or another consumer in the same group.

It might be more useful to encapsulate the request and response objects and to iterate
over the "read response" in a separate thread, to call back to a handler function when
a recorded event is received, and call `ack()` if the handler does not raise an
exception, and to call `nack()` if an exception is raised. The example below shows how
this might be done.

```python
from threading import Thread


class SubscriptionReader:
    def __init__(self, client, group_name, callback):
        self.client = client
        self.group_name = group_name
        self.callback = callback
        self.thread = Thread(target=self.read_subscription, daemon=True)
        self.error = None

    def start(self):
        self.thread.start()

    def join(self):
        self.thread.join()

    def read_subscription(self):
        req, resp = self.client.read_subscription(group_name=self.group_name)
        for event in resp:
            try:
                self.callback(event)
            except Exception as e:
                # req.nack(event.id)  # not yet implemented....
                self.error = e
                break
            else:
                req.ack(event.id)


# Create another persistent subscription.
group_name = f"group-{uuid4()}"
client.create_subscription(group_name=group_name)

events = []

def handle_event(event):
    events.append(event)
    if event.id == event11.id:
        raise Exception()


reader = SubscriptionReader(
    client=client,
    group_name=group_name,
    callback=handle_event
)

reader.start()
reader.join()

assert events[-1].id == event11.id
```

Please note, when processing events in a downstream component, the commit position of
the last successfully processed event is usefully recorded by the downstream component
so that the commit position can be determined by the downstream component from its own
records when it is restarted. This commit position can be used to specify the commit
position from which to subscribe. Since this commit position represents the position of
the last successfully processed event in a downstream component, it will be usual to
want to read from the next event after this position, because that is the next event
that needs to be processed. However, when subscribing for events using a persistent
subscription in EventStoreDB, the event at the specified commit position MAY be returned
as the first event in the received sequence of recorded events, and so it may
be necessary to check the commit position of the received events and to discard
any recorded event object that has a commit position equal to the commit position
specified in the request.
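
Discarding such an event can be done with a small generator, as sketched below. The
function name `skip_processed()` is hypothetical. It compares with `<=` rather than
`==`, which is a slightly more defensive choice made for this sketch, and it passes
everything through when no position has been recorded yet.

```python
def skip_processed(events, last_processed_commit_position):
    """Yield only events recorded after the last processed commit position."""
    for event in events:
        if (
            last_processed_commit_position is not None
            and event.commit_position <= last_processed_commit_position
        ):
            # This event has already been processed, so discard it.
            continue
        yield event
```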

Whilst there are some advantages of persistent subscriptions, in particular the
processing of recorded events by a group of consumers, tracking in the
upstream server the position in the commit sequence of events that have been processed
brings a danger of "dual writing" in the consumption of events. Reliability
in the processing of recorded events by a group of consumers will rely instead on
idempotent handling of duplicate messages, and resilience to out-of-order delivery.
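
Idempotent handling can be sketched as a wrapper that ignores events whose IDs have
already been handled. The class name `IdempotentHandler` is hypothetical, and the
in-memory `set` is only for illustration: a real consumer would need to record the
handled IDs durably, ideally in the same transaction as the results of handling
the event.

```python
class IdempotentHandler:
    """Hypothetical wrapper that calls a handler once per event ID."""

    def __init__(self, handler):
        self.handler = handler
        self.seen_ids = set()

    def __call__(self, event):
        if event.id in self.seen_ids:
            # A duplicate delivery, so do nothing.
            return False
        self.handler(event)
        # Remember the ID only after the handler has succeeded.
        self.seen_ids.add(event.id)
        return True
```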

### Get subscription info

*requires leader*

The `get_subscription_info()` method can be used to get information for a
persistent subscription.

This method has one required argument, `group_name`, which
should match the value of the argument used when calling `create_subscription()`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
subscription_info = client.get_subscription_info(
    group_name=group_name,
)
```

The returned value is a `SubscriptionInfo` object.

### List subscriptions

*requires leader*

The `list_subscriptions()` method can be used to get information for all
existing persistent subscriptions.

This method takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
subscriptions = client.list_subscriptions()
```

The returned value is a list of `SubscriptionInfo` objects.

### Delete subscription

*requires leader*

The `delete_subscription()` method can be used to delete a persistent
subscription.

This method has one required argument, `group_name`, which
should match the value of the argument used when calling `create_subscription()`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
client.delete_subscription(
    group_name=group_name,
)
```

### Create stream subscription

*requires leader*

The `create_stream_subscription()` method can be used to create a persistent
subscription for a stream.

This method has two required arguments, `group_name` and `stream_name`. The
`group_name` argument names the group of consumers that will receive events
from this subscription. The `stream_name` argument specifies which stream
the subscription will follow. The values of both these arguments are expected
to be Python `str` objects.

The method also takes four optional arguments, `stream_position`, `from_end`,
`consumer_strategy`, and `timeout`.

The optional `stream_position` argument specifies a stream position from
which to subscribe. The recorded event at this stream
position will be received when reading the subscription.

The optional `from_end` argument is a Python `bool`.
By default, the value of this argument is `False`. If this argument is set
to `True`, reading from the subscription will receive only events
recorded after the subscription was created. That is, it is not inclusive
of the current stream position.

The optional `consumer_strategy` argument is a Python `str` that defines
the consumer strategy for this persistent subscription. The value of this argument
can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`. The
default value is `'DispatchToSingle'`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

This method does not return a value. Events can be received by iterating
over the value returned by calling `read_stream_subscription()` (see below).

The example below creates a persistent stream subscription from the start of the stream.

```python
# Create a persistent stream subscription from start of the stream.
group_name1 = f"group-{uuid4()}"
client.create_stream_subscription(
    group_name=group_name1,
    stream_name=stream_name2,
)
```

The example below creates a persistent stream subscription from a stream position.

```python
# Create a persistent stream subscription from a stream position.
group_name2 = f"group-{uuid4()}"
client.create_stream_subscription(
    group_name=group_name2,
    stream_name=stream_name2,
    stream_position=2
)
```

The example below creates a persistent stream subscription from the end of the stream.

```python
# Create a persistent stream subscription from end of the stream.
group_name3 = f"group-{uuid4()}"
client.create_stream_subscription(
    group_name=group_name3,
    stream_name=stream_name2,
    from_end=True
)
```

### Read stream subscription

*requires leader*

The `read_stream_subscription()` method can be used to read a persistent
subscription for a stream.

This method has two required arguments, `group_name` and `stream_name`, which
should match the values of arguments used when calling `create_stream_subscription()`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

Just like `read_subscription()`, this method returns a 2-tuple: a "read request" object
and a "read response" object.

```python
read_req, read_resp = client.read_stream_subscription(
    group_name=group_name1,
    stream_name=stream_name2,
)
```

The example below iterates over the "read response" object, and calls `ack()`
on the "read request" object for each received event. The for loop breaks when
we have received the last event in the stream, so that we can finish the examples
in this documentation.

```python
events = []
for event in read_resp:
    events.append(event)

    # Acknowledge the received event.
    read_req.ack(event_id=event.id)

    # Break when the last event has been received.
    if event.id == event11.id:
        break
```

We can check we received all the events that were appended to `stream_name2`
in the examples above.

```python
assert len(events) == 4
assert events[0].stream_name == stream_name2
assert events[0].id == event4.id
assert events[1].stream_name == stream_name2
assert events[1].id == event5.id
assert events[2].stream_name == stream_name2
assert events[2].id == event6.id
assert events[3].stream_name == stream_name2
assert events[3].id == event11.id
```

### Get stream subscription info

*requires leader*

The `get_stream_subscription_info()` method can be used to get information for a
persistent subscription for a stream.

This method has two required arguments, `group_name` and `stream_name`, which
should match the values of arguments used when calling `create_stream_subscription()`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
subscription_info = client.get_stream_subscription_info(
    group_name=group_name1,
    stream_name=stream_name2,
)
```

The returned value is a `SubscriptionInfo` object.

### List stream subscriptions

*requires leader*

The `list_stream_subscriptions()` method can be used to get information for all
the persistent subscriptions for a stream.

This method has one required argument, `stream_name`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
subscriptions = client.list_stream_subscriptions(
    stream_name=stream_name2,
)
```

The returned value is a list of `SubscriptionInfo` objects.

### Delete stream subscription

*requires leader*

The `delete_stream_subscription()` method can be used to delete a persistent
subscription for a stream.

This method has two required arguments, `group_name` and `stream_name`, which
should match the values of arguments used when calling `create_stream_subscription()`.

This method also takes an optional `timeout` argument, that
is expected to be a Python `float`, which sets a deadline
for the completion of the gRPC operation.

```python
client.delete_stream_subscription(
    group_name=group_name1,
    stream_name=stream_name2,
)
```

## Connection

### Reconnect

The `reconnect()` method can be used to manually reconnect the client to a
suitable EventStoreDB node. This method uses the same routine for reading the
cluster node states and then connecting to a suitable node according to the
client's node preference that is used when the client is constructed. This
method is thread-safe, and it is "conservative" in that, when it is called
by several threads at the same time, only one reconnection will occur.
Concurrent attempts to reconnect will block until the client has reconnected
successfully, and then they will all return normally.

```python
client.reconnect()
```

An example of when it might be desirable to reconnect manually is when (for performance
reasons) the node preference is for the client to be connected to a follower node in the
cluster, and, after a cluster leader election, the follower node becomes a leader node.
Reconnecting to a follower node in this case is currently beyond the capabilities of
this client, but this behavior might be implemented in a future release.

Please note, all the client methods are decorated with `@autoreconnect` (which calls the
`reconnect()` method) and a `@retry` decorator that will retry operations that fail
due to connection issues. The `@autoreconnect` decorator will reconnect to an available
node in the cluster when the server that the client has been connected to has become
unavailable, or if the client's gRPC channel happens to have been closed. It will also
reconnect to the current leader when a method is called that requires a leader but the
node that the client has been connected to has stopped being the leader. The `@retry`
decorator will then retry operations that have failed due to these connection issues.
This means that a method that has failed due to connection issues will be retried after
the client has reconnected.
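
The combined effect of the two decorators can be pictured with a minimal sketch.
The code below is not the client's implementation. It folds both steps into one
hypothetical decorator, `reconnect_and_retry`, uses the built-in `ConnectionError`
as a stand-in for the client's connection errors, and applies the decorator to a
hypothetical `FakeClient` class so that it runs without a server.

```python
import functools


def reconnect_and_retry(method):
    """Hypothetical decorator: reconnect once, then retry the failed call."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except ConnectionError:
            # The node was unavailable: reconnect, then retry once.
            self.reconnect()
            return method(self, *args, **kwargs)

    return wrapper


class FakeClient:
    """Stand-in for a client (an assumption, not part of esdbclient)."""

    def __init__(self):
        self.connected = False
        self.reconnect_count = 0

    def reconnect(self):
        self.connected = True
        self.reconnect_count += 1

    @reconnect_and_retry
    def append(self, value):
        if not self.connected:
            raise ConnectionError("node unavailable")
        return f"appended {value}"


fake_client = FakeClient()
append_result = fake_client.append("event")
```

Here the first attempt fails because the fake client is not connected, the decorator
calls `reconnect()`, and the second attempt succeeds.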

Please also note, the exceptions to this behaviour have to do with the methods that
return iterators. For example, an event-processing component that iterates over the
response to a catch-up subscription method call will always need to be monitored for
errors, and, if it fails after it has started iterating over the response, the catch-up
subscription will need to be restarted from the event-processing component's last saved
commit position. In this case, the client will automatically reconnect to a node in the
cluster when the subsequent call to a catch-up subscription method is made. You just
need to handle the exception that occurs from the iterator, then read your last saved
commit position, and then restart your event-processing component, using the same
`ESDBClient` instance, but with a new catch-up subscription iterator. Similarly, when
reading persistent subscriptions: if there are connectivity issues after you have
started iterating over the response, then the server call will need to be restarted.
For the operations that return an iterator, the decorators mentioned above will have
exited before any subsequent gRPC streaming error occurs.
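
The restart procedure described above might be sketched as follows. The function
`run_with_restarts` and all of its parameters are hypothetical names introduced for
illustration: `subscribe` is any callable that takes the last saved commit position
and returns a new iterator (for example, a small wrapper around a catch-up subscription
call), `handle` processes one event and saves its position, and `ConnectionError` is
only a placeholder for whatever exception the iterator actually raises.

```python
from typing import Callable, Iterable, Optional, Tuple, Type


def run_with_restarts(
    subscribe: Callable[[Optional[int]], Iterable],
    handle: Callable[[object], None],
    load_position: Callable[[], Optional[int]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_restarts: int = 3,
) -> None:
    """Process events, restarting the subscription after a streaming error."""
    restarts = 0
    while True:
        try:
            # Start a new iterator from the last saved position.
            for event in subscribe(load_position()):
                handle(event)  # Expected to save the new position itself.
            return  # The iterator finished without error.
        except retry_on:
            restarts += 1
            if restarts > max_restarts:
                raise
```

The same client instance can be used for every restart, because only the iterator
is replaced.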


### Close

The `close()` method can be used to cleanly close the gRPC connection.

```python
client.close()
```

## Notes

### Connection strings

The EventStoreDB connection string is a URI that conforms with one of two possible
schemes, either the "esdb" scheme or the "esdb+discover" scheme. The syntax and
semantics of the EventStoreDB URI schemes are explained below. The syntax is
defined using [EBNF](https://en.wikipedia.org/wiki/Extended_Backus–Naur_form).

The "esdb" URI scheme can be defined in the following way.

    esdb-uri = "esdb://" , [ user-info , "@" ] , grpc-target, { "," , grpc-target } , [ "?" , query-string ] ;

In the "esdb" URI scheme, after the optional user info string, there must be at least
one gRPC target. If there are several gRPC targets, they must be separated from each
other with the "," character. Each gRPC target should indicate an EventStoreDB gRPC
server socket, by specifying a host and a port number separated with the ":" character.
The host may be a hostname that can be resolved to an IP address, or an IP address.

    grpc-target = ( hostname | ip-address ) , ":" , port-number ;


The "esdb+discover" URI scheme can be defined in the following way.

    esdb-discover-uri = "esdb+discover://" , [ user-info, "@" ] , cluster-domainname , [ "?" , query-string ] ;

In the "esdb+discover" URI scheme, after the optional user info string, there must be a domain
name which should identify a cluster of EventStoreDB servers. The client will use a DNS
server to resolve the domain name to a list of addresses of EventStoreDB servers,
by querying for 'A' records. In this case, the port number "2113" will be used to
construct gRPC targets from the addresses obtained from 'A' records provided by the
DNS server. Therefore, if you want to use the "esdb+discover" URI scheme, you will
need to configure DNS when setting up your EventStoreDB cluster.

With both the "esdb" and "esdb+discover" URI schemes, the client first obtains
a list of gRPC targets: either directly from "esdb" connection strings; or indirectly
from "esdb+discover" connection strings via DNS. This list of targets is known as the
"gossip seed". The client will then attempt to connect to each gRPC target in turn,
attempting to call the EventStoreDB Gossip API to obtain information about the
EventStoreDB cluster. A member of the cluster is selected by the client, according
to the "node preference" option (see below). The client may then need to close its
connection and reconnect to the selected server.

In both the "esdb" and "esdb+discover" schemes, the URI may include a user info string.
If it exists in the URI, the user info string must be separated from the rest of the URI
with the "@" character. The user info string must include a username and a password,
separated with the ":" character.

    user-info = username , ":" , password ;

The user info is sent by the client as "call credentials" in each call to a "secure"
server, in a "basic auth" authorization header. This authorization header is used by
the server to authenticate the client. The authorization header is not sent to
"insecure" servers.

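As an illustration of the "basic auth" header mentioned above, the sketch below builds
the header value from a username and password using the standard HTTP Basic scheme.
The function name `basic_auth_metadata` is hypothetical, and how the client actually
attaches this value as call credentials is not shown here.

```python
import base64


def basic_auth_metadata(username, password):
    """Return gRPC-style metadata holding a Basic authorization header."""
    credentials = f"{username}:{password}".encode("utf8")
    token = base64.b64encode(credentials).decode("ascii")
    return (("authorization", f"Basic {token}"),)


auth_metadata = basic_auth_metadata("admin", "changeit")
```
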
In both the "esdb" and "esdb+discover" schemes, the optional query string must be one
or many field-value arguments, separated from each other with the "&" character.

    query-string = field-value, { "&", field-value } ;

Each field-value argument must be one of the supported fields, and an
appropriate value, separated with the "=" character.

    field-value = ( "Tls", "=" , "true" | "false" )
                | ( "TlsVerifyCert", "=" , "true" | "false" )
                | ( "ConnectionName", "=" , string )
                | ( "NodePreference", "=" , "leader" | "follower" | "readonlyreplica" | "random" )
                | ( "DefaultDeadline", "=" , integer )
                | ( "GossipTimeout", "=" , integer )
                | ( "MaxDiscoverAttempts", "=" , integer )
                | ( "DiscoveryInterval", "=" , integer )
                | ( "KeepAliveInterval", "=" , integer )
                | ( "KeepAliveTimeout", "=" , integer ) ;

The table below describes the query field-values supported by this client.

| Field               | Value                                                                 | Description                                                                                                                                                       |
|---------------------|-----------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Tls                 | "true", "false" (default: "true")                                     | If "true" the client will create a "secure" gRPC channel. If "false" the client will create an "insecure" gRPC channel. This must match the server configuration. |
| TlsVerifyCert       | "true", "false" (default: "true")                                     | This value is currently ignored.                                                                                                                                  |
| ConnectionName      | string (default: auto-generated version-4 UUID)                       | Sent in call metadata for every call, to identify the client to the cluster.                                                                                      |
| NodePreference      | "leader", "follower", "readonlyreplica", "random" (default: "leader") | The node state preferred by the client. The client will select a node from the cluster info received from the Gossip API according to this preference.            |
| DefaultDeadline     | integer (default: `None`)                                             | The default value (in seconds) of the `timeout` argument of client "write" methods such as `append_events()`.                                                     |
| GossipTimeout       | integer (default: 5)                                                  | The default value (in seconds) of the `timeout` argument of gossip read methods, such as `read_gossip()`.                                                         |
| MaxDiscoverAttempts | integer (default: 10)                                                 | The number of attempts to read gossip when connecting or reconnecting to a cluster member.                                                                        |
| DiscoveryInterval   | integer (default: 100)                                                | How long to wait (in milliseconds) between gossip retries.                                                                                                        |
| KeepAliveInterval   | integer (default: `None`)                                             | The value of the "grpc.keepalive_ms" gRPC channel option.                                                                                                         |
| KeepAliveTimeout    | integer (default: `None`)                                             | The value of the "grpc.keepalive_timeout_ms" gRPC channel option.                                                                                                 |


Here are some examples of EventStoreDB connection string URIs.

    # Get cluster info from secure server socket localhost:2113,
    # and use "admin" and "changeit" as username and password
    # when making calls to EventStoreDB API methods.

    esdb://admin:changeit@localhost:2113


    # Get cluster info from insecure server socket 127.0.0.1:2114

    esdb://127.0.0.1:2114?Tls=false


    # Get cluster info from addresses in 'A' records for cluster1.example.com,
    # and use a default deadline for making calls to EventStoreDB API methods.

    esdb+discover://admin:changeit@cluster1.example.com?DefaultDeadline=5


    # Get cluster info from either localhost:2111 or localhost:2112 or
    # localhost:2113, and then connect to a follower node in the cluster.

    esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=follower


    # Get cluster info from addresses in 'A' records for cluster1.example.com,
    # and configure "keep alive" timeout and interval in the gRPC channel.

    esdb+discover://admin:changeit@cluster1.example.com?KeepAliveInterval=10000&KeepAliveTimeout=10000


Please note, the client is insensitive to the case of fields and values. If fields are
repeated in the query string, the query string will be parsed without error. However,
the connection options used by the client will use the value of the first field. All
the other field-values in the query string with the same field name will be ignored.
Fields without values will also be ignored.
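
The parsing rules above can be illustrated with a small sketch built on the standard
library. This is not the client's own parser: the function `parse_esdb_uri` and the
shape of its return value are assumptions made for this example, and it only handles
the case of field names (not the case of values).

```python
from urllib.parse import parse_qsl, urlsplit


def parse_esdb_uri(uri):
    """Split an EventStoreDB connection string into parts (illustrative only)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("esdb", "esdb+discover"):
        raise ValueError(f"Unsupported scheme: {parts.scheme!r}")

    # The optional user info precedes the last "@" in the authority part.
    user_info, _, hosts = parts.netloc.rpartition("@")
    username, _, password = user_info.partition(":")

    # Field names are compared case-insensitively, and the first occurrence
    # of a repeated field wins. parse_qsl() drops fields without values.
    options = {}
    for field, value in parse_qsl(parts.query):
        options.setdefault(field.lower(), value)

    return {
        "scheme": parts.scheme,
        "username": username or None,
        "password": password or None,
        # gRPC targets for "esdb", or the cluster domain name for "esdb+discover".
        "hosts": hosts.split(","),
        "options": options,
    }


parsed_uri = parse_esdb_uri(
    "esdb://admin:changeit@localhost:2111,localhost:2112"
    "?NodePreference=follower&nodepreference=leader&Tls"
)
```

In this example the repeated `nodepreference` field and the valueless `Tls` field are
both ignored, so the only option kept is the first `NodePreference` value.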

If the client's node preference is "leader" and the node becomes a
"follower", the client will attempt to reconnect to the current leader when a method
is called that expects to call a leader. Methods which mutate the state of the database
expect to call a leader. For such methods, the HTTP header "requires-leader" is set to
"true", and this header is observed by the server, and so a node which is not a leader
that receives such a request will return an error. This error is detected by the client,
which will then close the current gRPC connection and create a new connection to the
leader. The request will then be retried with the leader.

If the client's node preference is "follower" and there are no follower
nodes in the cluster, then the client will raise an exception. Similarly, if the
client's node preference is "readonlyreplica" and there are no read-only replica
nodes in the cluster, then the client will also raise an exception.
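
The selection rule can be sketched as below. The representation of cluster members
as dictionaries with `"state"` and `"address"` keys, the function name `select_node`,
and the use of `LookupError` are all assumptions made for illustration. The sketch
also raises when the preference is "leader" and no leader is listed, which is a case
the text above does not describe.

```python
import random


def select_node(members, preference):
    """Pick a cluster member matching the node preference (illustrative only)."""
    if preference == "random":
        candidates = list(members)
    else:
        candidates = [m for m in members if m["state"] == preference]
    if not candidates:
        raise LookupError(f"No {preference!r} node in cluster")
    return random.choice(candidates)


cluster_members = [
    {"state": "leader", "address": "localhost:2111"},
    {"state": "follower", "address": "localhost:2112"},
    {"state": "follower", "address": "localhost:2113"},
]
selected_leader = select_node(cluster_members, "leader")
```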

The gRPC channel option "grpc.max_receive_message_length" is automatically
configured to the value `17 * 1024 * 1024`. This value cannot be changed.


### Regular expression filters

The filter arguments in `read_all_events()`, `subscribe_all_events()`,
`create_subscription()` and `get_commit_position()` are applied to the `type`
attribute of recorded events.

The default value of the `filter_exclude` arguments is designed to exclude
EventStoreDB "system" and "persistent subscription config" events, which
otherwise would be included. System events generated by EventStoreDB all
have `type` strings that start with the `$` sign. Persistent subscription
events generated when manipulating persistent subscriptions all have `type`
strings that start with `PersistentConfig`.

For example, to match the type of EventStoreDB system events, use the regular
expression `r'\$.+'`. Please note, the constant `ESDB_SYSTEM_EVENTS_REGEX` is
set to `r'\$.+'`. You can import this value
(`from esdbclient import ESDB_SYSTEM_EVENTS_REGEX`) and use
it when building longer sequences of regular expressions.

Similarly, to match the type of EventStoreDB persistent subscription events, use the
regular expression `r'PersistentConfig\d+'`. The constant `ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`
is set to `r'PersistentConfig\d+'`. You can also import this value
(`from esdbclient import ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`) and use it when building
longer sequences of regular expressions.

The constant `DEFAULT_EXCLUDE_FILTER` is a sequence of regular expressions that match
the events that EventStoreDB generates internally, events that are extraneous to those
which you append using the `append_events()` method.

For example, to exclude system events and persistent subscription configuration events,
and snapshots, you might use the sequence `DEFAULT_EXCLUDE_FILTER + ['.*Snapshot']` as
the value of the `filter_exclude` argument when calling `read_all_events()`,
`subscribe_all_events()`, `create_subscription()` or `get_commit_position()`.
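
To see which event types such a sequence would exclude, the expressions can be tried
locally with Python's `re` module. The sketch below defines its own copies of the two
patterns quoted above rather than importing them, and the helper `is_excluded` is
hypothetical. It assumes a pattern must match the whole `type` string; how the server
actually applies the expressions is not specified in this section.

```python
import re

# Local copies of the patterns quoted in the text above.
SYSTEM_EVENTS_REGEX = r"\$.+"
PERSISTENT_CONFIG_EVENTS_REGEX = r"PersistentConfig\d+"

exclude_patterns = [
    SYSTEM_EVENTS_REGEX,
    PERSISTENT_CONFIG_EVENTS_REGEX,
    r".*Snapshot",
]


def is_excluded(event_type, patterns):
    """Return True if any pattern matches the whole event type string."""
    return any(re.fullmatch(p, event_type) for p in patterns)


excluded_types = [
    t
    for t in ["$metadata", "PersistentConfig1", "OrderSnapshot", "OrderCreated"]
    if is_excluded(t, exclude_patterns)
]
```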

### New event objects

The `NewEvent` class is used when appending events.

The required argument `type` is a Python `str`, used to indicate the type of
the event that will be recorded.

The required argument `data` is a Python `bytes` object, used to indicate the data of
the event that will be recorded.

The optional argument `metadata` is a Python `bytes` object, used to indicate any
metadata of the event that will be recorded. The default value is an empty `bytes`
object.

The optional argument `content_type` is a Python `str`, used to indicate the
type of the data that will be recorded for this event. The default value is
`application/json`, which indicates that the `data` was serialised using JSON.
An alternative value for this argument is `application/octet-stream`.

The optional argument `id` is a Python `UUID` object, used to specify the unique ID
of the event that will be recorded. This value will default to a new version-4 UUID.

```python
new_event1 = NewEvent(
    type='OrderCreated',
    data=b'{"name": "Greg"}',
)
assert new_event1.type == 'OrderCreated'
assert new_event1.data == b'{"name": "Greg"}'
assert new_event1.metadata == b''
assert new_event1.content_type == 'application/json'
assert isinstance(new_event1.id, UUID)

event_id = uuid4()
new_event2 = NewEvent(
    type='ImageCreated',
    data=b'01010101010101',
    metadata=b'{"a": 1}',
    content_type='application/octet-stream',
    id=event_id,
)
assert new_event2.type == 'ImageCreated'
assert new_event2.data == b'01010101010101'
assert new_event2.metadata == b'{"a": 1}'
assert new_event2.content_type == 'application/octet-stream'
assert new_event2.id == event_id
```
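
Because `data` and `metadata` are `bytes`, application objects need to be serialised
before a `NewEvent` is constructed. A minimal sketch using the standard library's
`json` module is shown below. The helper name `to_json_bytes` is hypothetical, and
UTF-8 is assumed as the encoding.

```python
import json


def to_json_bytes(obj):
    """Serialise a Python object to JSON bytes, suitable for `data`."""
    return json.dumps(obj).encode("utf8")


order_data = to_json_bytes({"name": "Greg"})
```

The resulting `bytes` object could then be passed as the `data` argument, with the
default `content_type` of `application/json`.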

### Recorded event objects

The `RecordedEvent` class is used when reading events.

The attribute `type` is a Python `str`, used to indicate the type of event
that was recorded.

The attribute `data` is a Python `bytes` object, used to indicate the data of the
event that was recorded.

The attribute `metadata` is a Python `bytes` object, used to indicate the metadata of
the event that was recorded.

The attribute `content_type` is a Python `str`, used to indicate the type of
data that was recorded for this event (usually `application/json` to indicate that
this data can be parsed as JSON, but alternatively `application/octet-stream` to
indicate that it is something else).

The attribute `id` is a Python `UUID` object, used to indicate the unique ID of the
event that was recorded.

The attribute `stream_name` is a Python `str`, used to indicate the name of the
stream in which the event was recorded.

The attribute `stream_position` is a Python `int`, used to indicate the position in the
stream at which the event was recorded.

The attribute `commit_position` is a Python `int`, used to indicate the commit position
at which the event was recorded. Please note, when recorded events are returned from a call
to `read_stream_events()` in EventStoreDB v21.10, the commit position is not actually
set in the response. This attribute is typed as an optional value (`Optional[int]`),
and in the case of using EventStoreDB v21.10 the value of this attribute will be `None`
when reading recorded events from a stream. Recorded events will however have this
value set when reading recorded events from `read_all_events()` and from both
catch-up and persistent subscriptions.

```python
from esdbclient.events import RecordedEvent

recorded_event = RecordedEvent(
    type='OrderCreated',
    data=b'{}',
    metadata=b'',
    content_type='application/json',
    id=uuid4(),
    stream_name='stream1',
    stream_position=0,
    commit_position=512,
)
```
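
When consuming recorded events, the `content_type` attribute indicates how `data`
should be interpreted. The sketch below shows one way to act on it. The helper
`decode_event_data` is hypothetical and takes plain values rather than a
`RecordedEvent`, so that it runs on its own.

```python
import json


def decode_event_data(content_type, data):
    """Parse JSON data, and return any other kind of data unchanged."""
    if content_type == "application/json":
        return json.loads(data)
    return data


decoded_json = decode_event_data("application/json", b'{"name": "Greg"}')
decoded_raw = decode_event_data("application/octet-stream", b"01010101010101")
```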

### Asyncio client

The `esdbclient` package also includes an early version of an asynchronous I/O
gRPC Python client. It follows exactly the same behaviors as the multithreaded
`ESDBClient`, but uses the `grpc.aio` package and the `asyncio` module, instead of
`grpc` and `threading`.

The `async` function `AsyncioESDBClient` constructs the client, and connects to
a server. It can be imported from `esdbclient`, and can be called with the same
arguments as `ESDBClient`.

```python
from esdbclient import AsyncioESDBClient
```

The asynchronous I/O client has `async` methods `append_events()`,
`read_stream_events()`, `read_all_events()`, `subscribe_all_events()`,
`delete_stream()`, `tombstone_stream()`, and `reconnect()`.

These methods are equivalent to the methods on `ESDBClient`. They have the same
method signatures, and can be called with the same arguments, to the same effect.
The methods which appear on `ESDBClient` but not on `AsyncioESDBClient` will be
added soon.

The example below demonstrates `append_events()`, `read_stream_events()` and
`subscribe_all_events()` methods. These are the most useful methods for writing
an event-sourced application, allowing new aggregate events to be recorded, the
recorded events of an aggregate to be obtained so aggregates can be reconstructed,
and the state of an application to be propagated and processed with "exactly-once"
semantics.

```python
import asyncio
import os
from uuid import uuid4

from esdbclient import AsyncioESDBClient, NewEvent

async def demonstrate_asyncio_client():

    # Construct client.
    client = await AsyncioESDBClient(
        uri=os.getenv("ESDB_URI"),
        root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
    )

    # Append events.
    stream_name = str(uuid4())
    event1 = NewEvent("OrderCreated", data=b'{}')
    event2 = NewEvent("OrderUpdated", data=b'{}')
    event3 = NewEvent("OrderDeleted", data=b'{}')

    await client.append_events(
        stream_name=stream_name,
        expected_position=None,
        events=[event1, event2, event3]
    )

    # Read stream events.
    recorded = await client.read_stream_events(stream_name)
    assert len(recorded) == 3
    assert recorded[0].id == event1.id
    assert recorded[1].id == event2.id
    assert recorded[2].id == event3.id


    # Subscribe all events.
    received = []
    async for event in await client.subscribe_all_events():
        received.append(event)
        if event.id == event3.id:
            break
    assert received[-3].id == event1.id
    assert received[-2].id == event2.id
    assert received[-1].id == event3.id


    # Close the client.
    await client.close()


# Run the demo.
asyncio.get_event_loop().run_until_complete(
    demonstrate_asyncio_client()
)
```

## Contributors

### Install Poetry

The first thing is to check you have Poetry installed.

    $ poetry --version

If you don't, then please [install Poetry](https://python-poetry.org/docs/#installing-with-the-official-installer).

    $ curl -sSL https://install.python-poetry.org | python3 -

It will help to make sure Poetry's bin directory is in your `PATH` environment variable.

But in any case, make sure you know the path to the `poetry` executable. The Poetry
installer tells you where it has been installed, and how to configure your shell.

Please refer to the [Poetry docs](https://python-poetry.org/docs/) for guidance on
using Poetry.

### Setup for PyCharm users

You can easily obtain the project files using PyCharm (menu "Git > Clone...").
PyCharm will then usually prompt you to open the project.

Open the project in a new window. PyCharm will then usually prompt you to create
a new virtual environment.

Create a new Poetry virtual environment for the project. If PyCharm doesn't already
know where your `poetry` executable is, then set the path to your `poetry` executable
in the "New Poetry Environment" form input field labelled "Poetry executable". In the
"New Poetry Environment" form, you will also have the opportunity to select which
Python executable will be used by the virtual environment.

PyCharm will then create a new Poetry virtual environment for your project, using
a particular version of Python, and also install into this virtual environment the
project's package dependencies according to the project's `poetry.lock` file.

You can add different Poetry environments for different Python versions, and switch
between them using the "Python Interpreter" settings of PyCharm. If you want to use
a version of Python that isn't installed, either use your favourite package manager,
or install Python by downloading an installer for recent versions of Python directly
from the [Python website](https://www.python.org/downloads/).

Once project dependencies have been installed, you should be able to run tests
from within PyCharm (right-click on the `tests` folder and select the 'Run' option).

Because of a conflict between pytest and PyCharm's debugger and the coverage tool,
you may need to add ``--no-cov`` as an option to the test runner template. Alternatively,
just use the Python Standard Library's ``unittest`` module.

You should also be able to open a terminal window in PyCharm, and run the project's
Makefile commands from the command line (see below).

### Setup from command line

Obtain the project files, using Git or a suitable alternative.

In a terminal application, change your current working directory
to the root folder of the project files. There should be a Makefile
in this folder.

Use the Makefile to create a new Poetry virtual environment for the
project and install the project's package dependencies into it,
using the following command.

    $ make install-packages

It's also possible to install the project in 'editable mode'.

    $ make install

Please note, if you create the virtual environment in this way, and then try to
open the project in PyCharm and configure the project to use this virtual
environment as an "Existing Poetry Environment", PyCharm sometimes has
issues, for reasons that are unclear. If you encounter such issues, you can
resolve them by deleting the virtual environment and creating the Poetry
virtual environment using PyCharm (see above).

### Project Makefile commands

You can start EventStoreDB using the following command.

    $ make start-eventstoredb

You can run tests using the following command (needs EventStoreDB to be running).

    $ make test

You can stop EventStoreDB using the following command.

    $ make stop-eventstoredb

You can check the formatting of the code using the following command.

    $ make lint

You can reformat the code using the following command.

    $ make fmt

Tests belong in `./tests`. Code-under-test belongs in `./esdbclient`.

Edit package dependencies in `pyproject.toml`. Update installed packages (and the
`poetry.lock` file) using the following command.

    $ make update-packages

