Metadata-Version: 2.1
Name: curv_amqp
Version: 1.0.5
Summary: UNKNOWN
Home-page: http://github.com/rep-ai/curv_amqp
License: MIT
Description: # curv_amqp
        Pika framework that handles reconnecting while using a blocking connection and has helpful defaults, building blocks, type hints, and a priority requeue method
        ## Installing package
        ```shell script
        pip install curv-amqp
        ```
        
        ## Installing for local development
        ```shell script
        git clone https://github.com/rep-ai/curv_amqp.git
        cd curv_amqp
        pip install -r requirements.txt
        pip install -e .
        ```
        
        ## Basic Usage
        ```python
        from curv_amqp.connection import Connection, ConnectionParameters
        from curv_amqp.publisher import Publisher
        from curv_amqp.consumer import Consumer, ConsumerMessage
        host = 'localhost'
        queue_name = 'test'
        connection = Connection(parameters=ConnectionParameters(host))
        
        publisher = Publisher(connection=connection)
        publisher.publish(routing_key=queue_name, body=b'message')
        
        consumer = Consumer(connection=connection)
        
        def on_message_callback(message: ConsumerMessage):
            print('message.body:', message.body)
            message.ack()
            # you don't have to stop consuming here - but you do have to stop the consumer in this thread
            # eventually since consumer.consume is blocking
            message.consumer.stop_consuming()
        
        consumer.consume(queue=queue_name, prefetch_count=1, on_message_callback=on_message_callback)
        
        ```
        ## Usage
        ```python
        from argparse import ArgumentParser
        
        from curv_amqp.connection import Connection, URLParameters, ConnectionParameters
        from curv_amqp.publisher import Publisher
        from curv_amqp.consumer import Consumer, ConsumerMessage
        from curv_amqp.exceptions import ChannelClosedError, ConnectionClosedError, RequeueRetryCountError
        
        
        def main():
            parser = ArgumentParser()
            parser.add_argument('--url', type=str, default='localhost', help='amqp url or localhost - '
                                                                             'localhost assumes rabbitmq is installed - '
                                                                             '"brew install rabbitmq"')
            parser.add_argument('--queue', type=str, default='test-queue-name', help='amqp queue name')
            parser.add_argument('--body', type=str, default='your message', help='amqp message body')
            args = parser.parse_args()
            # pass in URLParameters or ConnectionParameters
            # its recommended that a single connection per process is used.
            url: str = args.url
            parameters = ConnectionParameters(url) if url is 'localhost' else URLParameters(url)
            queue_name = args.queue
            body = bytes(args.body, encoding='utf-8')
        
            connection = Connection(parameters=parameters)
            # testing auto reconnect for connection
            connection.blocking_connection.close()
            # its required that two different channels are used for a publisher and consumer
            # NOTE: will automatically declare queue for you
            publisher = Publisher(connection=connection)
            # testing auto reconnect for channel / publisher
            publisher.blocking_channel.close()
            publisher.publish(routing_key=queue_name, body=body)
        
            consumer = Consumer(connection=connection)
            # testing auto reconnect for channel / consumer
            consumer.blocking_channel.close()
        
            def on_message_callback(message: ConsumerMessage):
                print('message.body:', message.body)
                print('message.properties.priority', message.properties.priority)
                try:
                    message.priority_requeue(publisher)
                except RequeueRetryCountError as ex:
                    print(ex)
                    message.ack()
                    message.consumer.stop_consuming()
        
            consumer.consume(queue=queue_name, prefetch_count=1, on_message_callback=on_message_callback)
        
            publisher.publish(routing_key=queue_name, body=body)
            for msg in consumer.consume_generator(queue=queue_name, prefetch_count=1, auto_ack=True, inactivity_timeout=1):
                print('msg.body:', msg.body)
        
            try:
                # testing proper channel close
                publisher.close()
                publisher.publish(routing_key=queue_name, body=body)
            except ChannelClosedError as e:
                print(e)
        
            try:
                # testing proper connection close
                publisher = Publisher(connection=connection)
                connection.close()
                publisher.publish(routing_key=queue_name, body=body)
            except ConnectionClosedError as e:
                print(e)
        
        
        if __name__ == '__main__':
            main()
        
        ```
        
        ## Updating package
        ```shell script
        # update __version__ in __init__.py
        python setup.py sdist bdist_wheel
        twine check dist/*
        twine upload dist/*
        ```
Platform: UNKNOWN
Description-Content-Type: text/markdown
