Metadata-Version: 2.1
Name: ctodd-python-lib-kafka
Version: 1.0.2
Summary: Python utilities used for interacting with Apache Kafka
Home-page: https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka
Author: Christopher H. Todd
Author-email: Christopher.Hayden.Todd@gmail.com
License: MIT
Keywords: python,libraries,Kafka,consumer,producer
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Requires-Python: >3.6
Description-Content-Type: text/markdown
Requires-Dist: simplejson
Requires-Dist: confluent-kafka

# Christopher H. Todd's Python Library For Interacting With Kafka

The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing and consuming records from topics, utilizing .avro format, and other tasks in creating event driven applications with Python.

## Table of Contents

- [Dependencies](#dependencies)
- [Libraries](#libraries)
- [Example Scripts](#example-scripts)
- [Notes](#notes)
- [TODO](#todo)

## Dependencies

### Python Packages

- confluent-kafka==0.11.6
- simplejson==3.16.0

## Libraries

### [kafka_admin_helpers.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_admin_helpers.py)

This library is used to interacting with Kafka Admin functionality. This
includes getting the admin object that will return details about kafka
state.

Functions:

```
def get_kafka_admin_client(kafka_brokers):
    """
    Purpose:
        Get a Kafka Admin Client Object. Allows for polling information about Kafka
        configuration and creating objects in Kafka
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa
            brokers
    Return:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    """
```

### [kafka_consumer_helpers.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_consumer_helpers.py)

This library is used to aid in creating kafka consumers.

Functions:

```
def get_kafka_consumer(
    kafka_brokers,
    consumer_group="default",
    timeout=6000,
    offset_start="latest",
    get_stats=True
):
    """
    Purpose:
        Get a Kafka Consumer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
        consumer_group (String): Consumer group to consume as. default is "default"
        timeout (String): Timeout in ms if no messages are found (during poll). Default
            is 6000
        offset_start (String): Where to start consuming with respect to the consumer
            group/topic offset. Default is "latest", which ignores any messages in the
            topic before the consumer begins consuming
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
    """
```

```
def consume_topic(kafka_consumer, kafka_topics):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
        kafka_topics (List of Strings): List of Kafka Topics to Consume.
    Yields:
        msg (Kafka Message Obj): Message Obj returned from the topic
    """
```

### [kafka_exceptions.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_exceptions.py)

File for holding custom exception types that will be generated by the kafka_helpers libraries

Classes:

```
class TopicNotFound(Exception):
    """
    Purpose:
        The TopicNotFound will be raised when attempting to consume a topic that
        does not exist
    """
```


### [kafka_general_helpers.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_general_helpers.py)

This library is used to interact with kafka not specificlly related to consuming or producing messages

Functions:

#### N/A


### [kafka_producer_helpers.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_producer_helpers.py)

This library is used to aid in creating kafka producers.

Functions:

```
def get_kafka_producer(kafka_brokers, get_stats=True):
    """
    Purpose:
        Get a Kafka Producer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
    """
```

```
def produce_message(kafka_producer, kafka_topic, msg):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
        kafka_topic (String): Kafka Topic to Produce message to.
        msg (String): Message to produce to Kafka
    Returns:
        N/A
    """
```

```
def produce_results_callback(err, msg):
    """
    Purpose:
        Optional per-message delivery callback (triggered by poll() or
        flush()) when a message has been successfully delivered or
        permanently failed delivery (after retries).
    Args:
        err (String): Error Message
        msg (Object): Kafka Callback Message Object
    Return:
        N/A
    """
```


### [kafka_topic_helpers.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/kafka_helpers/kafka_topic_helpers.py)

This library is used to interact with kafka topics. This includes getting
a list of the topics, finding details about a topic, creating topics, and
more.

Functions:

```
def get_topics(kafka_admin_client, return_system_topics=False):
    """
    Purpose:
        Get a List of Kafka Topics.
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    Return:
        kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
            Kafka metadata object that has basic topic information
    """
```

```
def create_kafka_topic(
    kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
    """
    Purpose:
        Create a Kafka Topic
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
        topic_name (String): Name of the topic to create
        topic_replication (Int): Replication factor for the new topic
        topic_partitions (Int): Number of partitions to devide the topic into
    Return:
        N/A
    """
```

## Example Scripts

Example executable Python scripts/modules for testing and interacting with the library. These show example use-cases for the libraries and can be used as templates for developing with the libraries or to use as one-off development efforts.

### [consume_from_kafka_topic.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/example_usage/consume_from_kafka_topic.py)

```
    Purpose:
        Consume from a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Consumer Object
        - Poll Topic
        - Parse Message
        - Print Message

    example script call:
        python3 consume_from_kafka_topic.py --topic="test-env-topic" \
            --broker="0.0.0.0:9092" --consumer-group="test-env-consumer"
```

### [produce_to_kafka_topic.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/example_usage/kafka_producer_helpers.py)

```
    Purpose:
        Produce to a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Producer Object
        - Prompt for Input
        - Parse Input
        - Produce Input to Kafka

    example script call:
        python3 produce_to_kafka_topic.py --topic="test-env-topic" \
            --broker="localhost:9092"
```

### [create_kakfa_topic.py](https://github.com/ChristopherHaydenTodd/ctodd-python-lib-kafka/blob/master/example_usage/create_kakfa_topic.py)

```
    Purpose:
        Create a Kafka Topic. Takes in replication and parition information

    Steps:
        - Connect to Kafka
        - Create Kafka Admin Client
        - Create Topic In Kafka

    function call:
        ---
    example script call:
        python3 create_kafka_topic.py --topic-name="test-env-topic" \
            --topic-replication=3 --topic-partitions=4 \
            --broker="localhost:9092"
```

## Notes

 - Relies on f-string notation, which is limited to Python3.6.  A refactor to remove these could allow for development with Python3.0.x through 3.5.x

## TODO

 - Unittest framework in place, but lacking tests


