Metadata-Version: 2.1
Name: pulsar-thread
Version: 0.1.0
Summary: A connect pulsar message queue package, support multi-threaded production and consumption
Home-page: https://gitee.com/maxbanana/pulsar
Author: hongbo liu
Author-email: 782027465@qq.com
License: Apache
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.6
Description-Content-Type: text/markdown
License-File: LICENSE

# pulsar-thread

### 一、介绍
一个连接pulsar消息队列的包，优点是：支持多线程生产和消费。

A connect pulsar message queue package, support multi-threaded production and consumption.

1.  本包是以pulsar-client为基础创建的
2.  pulsar-client使用链接：https://pulsar.apache.org/docs/en/client-libraries-python/
3.  默认 schema=pulsar.schema.StringSchema()
4.  若想使用其他的schema， 使用方法与pulsar-client相同, 详情可看上面pulsar-client使用链接
5.  默认的多线程最大数thread_count为5个

### 二、使用说明

#### 1. 连接 （client）

     import pulsar_thread as pt
     client = pt.client('pulsar://0.0.0.0:6655')
     
     #请将 0.0.0.0:6655 换成你的pulsar地址

#### 2. 生产者（producer）

     import json
     import pulsar_thread as pt
     
     # 1. 连接client
     client = pt.client('pulsar://0.0.0.0:6655')
        
     # 模拟要发送的数据
     data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
     data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
     data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
     data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
     
     # 2. 将要发送的数据和topic组合成字典
     # {'topic_1': msg_1, ... , 'topic_n': msg_n}
     
     data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}
     
     # 3. 创建生产者
     producer = client.create_producer()
     
     
     # 4. 发送消息 
     # 可选 4.1 同步发送send 或 4.2 异步发送send_async
     
     # 4.1 同步发送
     # 可选 4.1.1 默认模式 或 4.1.2 自定义模式
     
     # 4.1.1 默认模式
     # 默认参数：thread_count=5, schema=pulsar.schema.StringSchema()
     # 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
     
     result = producer.send(data_dict)
     
     # 4.1.2 自定义模式
     # schema参数设置规范详见pulsar-client的使用
     
     result = producer.send_async(data_dict,
                                  thread_count=10,
                                  schema=pulsar.schema.StringSchema()) 
     
     # 4.2 异步发送
     # 可选 4.2.1 默认模式 或 4.2.2 自定义模式
     
     # 4.2.1 默认模式
     # 默认参数：callback=None, thread_count=5, schema=pulsar.schema.StringSchema()
     # 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
     
     result = producer.send_async(data_dict)
     
     # 4.2.2 自定义模式（callback， schema参数设置规范详见pulsar-client的使用）
     
     result = producer.send_async(data_dict, 
                                  callback=None,
                                  thread_count=10,
                                  schema=pulsar.schema.StringSchema()) 
     

#### 3.消费者（consumer）

    import json
    import pulsar_thread as pt

    # 业务程序，处理消息队列发来的消息
    # msg 是 接收的消息队列传来的消息
    def deal_msg(msg):
        print(msg.value())
        import time
        time.sleep(5)
        print(json.loads(msg.data()))

    # 1. 连接
    client = pt.client('pulsar://0.0.0.0:6655')
    
    # 2. 创建consumer
    # 可从多个 topic 里接收数据
    # 默认接收的 schema=pulsar.schema.StringSchema()
    # 格式：consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字, schema=pulsar.schema.StringSchema())
    
    
    consumer = client.create_consumer(['test1', 'test2'], 'my-subscription')
    
    
    # 3. 接收数据并用业务程序（例：deal_msg）处理
    # 可选 3.1  单线程处理 consumer.receive() 或 
    # 3.2 多线程处理 consumer.receive_thread()
    
    
    # 3.1  单线程处理
    # 可选 3.1.1 默认模式 或 3.1.2 自定义模式
    # 阻塞模式，消费一个，业务程序处理一个，业务程序处理完成，再消费下一个
    
    # 3.1.1 默认模式
    # 默认参数：timeout_millis=None, logger=None
    # 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
    # 默认 日志收集器logger 为 None
    
    
    consumer.receive(deal_msg)
    
    
    # 3.1.2 自定义模式
    
    import logging,sys

    def LogSet():
        # 获取logger实例，如果参数为空则返回root logger
        logger = logging.getLogger("test.log")
        # 指定logger输出格式
        formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s')
        # 文件日志
        file_handler = logging.FileHandler("./test.log")
        file_handler.setFormatter(formatter)  # 可以通过setFormatter指定输出格式
        # 控制台日志
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.formatter = formatter  # 也可以直接给formatter赋值
        # 为logger添加的日志处理器
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        # 指定日志的最低输出级别，默认为WARN级别
        logger.setLevel(logging.DEBUG)
        # 移除一些日志处理器
        return logger, file_handler
    
    # 获取 logger
    logger,_=LogSet()
    
    consumer.receive(deal_msg, timeout_millis=5000, logger=logger)
    
    # 3.2 多线程处理
    # 可选 3.2.1 默认模式 或 3.2.2 自定义模式
    # 可以使用多线程进行并发消费，处理业务数据，提高效率
    
    
    # 3.2.1 默认模式
    # 默认参数：thread_count=5, timeout_millis=None, logger=None
    # 默认 最大线程数thread_count为5个
    # 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
    # 默认 日志收集器logger 为 None
    
    consumer.receive_thread(deal_msg)
    
    # 3.1.2 自定义模式
    
    # 例 1
    consumer.receive_thread(deal_msg, 2)
    
    # 例 2
    consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)


### 三、使用 pulsar-client（pulsar.Client）连接时

pulsar_thread的create_producer和create_consumer的使用

##### 1. 生产者（producer）


    import json
    import pulsar_thread as pt
    import pulsar
    
    # 使用 pulsar 连接
    client = pulsar.Client('pulsar://0.0.0.0:6655')
    
    data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
    data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
    data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
    data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
    
    data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}
    
    # 使用 pulsar_thread 创建生产者
    producer = pt.create_producer(client)
    
    result = producer.send(data_dict)
    
##### 2. 消费者（consumer）


    import json
    import pulsar_thread as pt
    import pulsar
    
    
    def deal_msg(msg):
        print(msg.value())
        import time
        time.sleep(5)
        print(json.loads(msg.data()))
    
    # 使用 pulsar 连接
    client = pulsar.Client('pulsar://0.0.0.0:6655')
    
    # 使用 pulsar_thread 创建消费者
    consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription')
    
    consumer.receive_thread(deal_msg, 2)
    

### 四、关于schema模式

##### 1. schema支持的模式

| schema | note |
|:--------:|:-------------:|
| BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode |
| StringSchema | Encode/decode payload as a UTF-8 string. Uses str objects |
| JsonSchema | Require record definition. Serializes the record into standard JSON payload |
| AvroSchema | Require record definition. Serializes in AVRO format |


##### 2. schema参数用法和pulsar-client相同
若想拓展使用schema，请移步至pulsar-client文档，阅读使用schema。
pulsar-client文档链接：https://pulsar.apache.org/docs/en/client-libraries-python/
    






