Metadata-Version: 2.1
Name: tembo-pgmq-python
Version: 0.8.0
Summary: Python client for the PGMQ Postgres extension.
License: PostgreSQL
Author: Adam Hendel
Author-email: adam@tembo.io
Requires-Python: >=3.9
Classifier: License :: OSI Approved
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: orjson (>=3,<4)
Requires-Dist: psycopg[binary,pool] (>=3,<4)
Project-URL: Documentation, https://github.com/tembo-io/pgmq/tree/main/tembo-pgmq-python
Project-URL: Homepage, https://github.com/tembo-io/pgmq
Project-URL: Repository, https://github.com/tembo-io/pgmq/tree/main/tembo-pgmq-python
Description-Content-Type: text/markdown

# Tembo's Python Client for PGMQ

## Installation

Install with `pip` from pypi.org:

```bash
pip install tembo-pgmq-python
```

Dependencies:

Postgres running the [Tembo PGMQ extension](https://github.com/tembo-io/tembo/tree/main/pgmq).

## Usage

### Start a Postgres Instance with the Tembo extension installed

```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

### Using Environment Variables

Set environment variables:

```bash
export PG_HOST=127.0.0.1
export PG_PORT=5432
export PG_USERNAME=postgres
export PG_PASSWORD=postgres
export PG_DATABASE=test_db
```

Initialize a connection to Postgres using environment variables:

```python
from tembo_pgmq_python import PGMQueue, Message

queue = PGMQueue()
```

### Initialize a connection to Postgres without environment variables

```python
from tembo_pgmq_python import PGMQueue, Message

queue = PGMQueue(
    host="0.0.0.0",
    port="5432",
    username="postgres",
    password="postgres",
    database="postgres"
)
```

### Create a queue 

```python
queue.create_queue("my_queue")
```

### or a partitioned queue

```python
queue.create_partitioned_queue("my_partitioned_queue", partition_interval=10000)
```
### List all queues

```python
queues = queue.list_queues()
for q in queues:
    print(f"Queue name: {q}")
```

### Send a message

```python
msg_id: int = queue.send("my_queue", {"hello": "world"})
```

### Send a batch of messages

```python
msg_ids: list[int] = queue.send_batch("my_queue", [{"hello": "world"}, {"foo": "bar"}])
```

### Read a message, set it invisible for 30 seconds

```python
read_message: Message = queue.read("my_queue", vt=30)
print(read_message)
```

### Read a batch of messages

```python
read_messages: list[Message] = queue.read_batch("my_queue", vt=30, batch_size=5)
for message in read_messages:
    print(message)
```

### Read messages with polling

The `read_with_poll` method allows you to repeatedly check for messages in the queue until either a message is found or the specified polling duration is exceeded. This can be useful in scenarios where you want to wait for new messages to arrive without continuously querying the queue in a tight loop.

In the following example, the method will check for up to 5 messages in the queue `my_queue`, making the messages invisible for 30 seconds (`vt`), and will poll for a maximum of 5 seconds (`max_poll_seconds`) with intervals of 100 milliseconds (`poll_interval_ms`) between checks.

```python
read_messages: list[Message] = queue.read_with_poll("my_queue", vt=30, qty=5, max_poll_seconds=5, poll_interval_ms=100)
for message in read_messages:
    print(message)
```

This method will continue polling until it either finds the specified number of messages (`qty`) or the `max_poll_seconds` duration is reached. The `poll_interval_ms` parameter controls the interval between successive polls, allowing you to avoid hammering the database with continuous queries.

### Archive the message after we're done with it. Archived messages are moved to an archive table

```python
archived: bool = queue.archive("my_queue", read_message.msg_id)
```

### Archive a batch of messages

```python
archived_ids: list[int] = queue.archive_batch("my_queue", [msg_id1, msg_id2])
```

### Delete a message completely

```python
read_message: Message = queue.read("my_queue")
deleted: bool = queue.delete("my_queue", read_message.msg_id)
```

### Delete a batch of messages

```python
deleted_ids: list[int] = queue.delete_batch("my_queue", [msg_id1, msg_id2])
```

### Set the visibility timeout (VT) for a specific message

```python
updated_message: Message = queue.set_vt("my_queue", msg_id, 60)
print(updated_message)
```

### Pop a message, deleting it and reading it in one transaction

```python
popped_message: Message = queue.pop("my_queue")
print(popped_message)
```

### Purge all messages from a queue

```python
purged_count: int = queue.purge("my_queue")
print(f"Purged {purged_count} messages from the queue.")
```

### Detach an archive from a queue

```python
queue.detach_archive("my_queue")
```

### Drop a queue

```python
dropped: bool = queue.drop_queue("my_queue")
print(f"Queue dropped: {dropped}")
```

### Validate the length of a queue name

```python
queue.validate_queue_name("my_queue")
```

### Get queue metrics

The `metrics` method retrieves various statistics for a specific queue, such as the queue length, the age of the newest and oldest messages, the total number of messages, and the time of the metrics scrape.

```python
metrics = queue.metrics("my_queue")
print(f"Metrics: {metrics}")
```

### Access individual metrics

You can access individual metrics directly from the `metrics` method's return value:

```python
metrics = queue.metrics("my_queue")
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")
```

### Get metrics for all queues

The `metrics_all` method retrieves metrics for all queues, allowing you to iterate through each queue's metrics.

```python
all_metrics = queue.metrics_all()
for metrics in all_metrics:
    print(f"Queue name: {metrics.queue_name}")
    print(f"Queue length: {metrics.queue_length}")
    print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
    print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
    print(f"Total messages: {metrics.total_messages}")
    print(f"Scrape time: {metrics.scrape_time}")
```




