Metadata-Version: 2.1
Name: ipc-worker
Version: 0.0.6
Summary: ipc-worker: Inter-Process Communication, multi-process workers that communicate via shared memory or MQ.
Home-page: https://github.com/ssbuild
Author: ssbuild
Author-email: 9727464@qq.com
License: Apache 2.0
Keywords: ipc-worker,ipc_worker,ipc,process worker,ipc,ipc mq,fast-ipc,process ipc
Platform: win32_AMD64
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Education
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: C++
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: Scientific/Engineering :: Mathematics
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3, <4
Description-Content-Type: text/markdown
Requires-Dist: se-imports (>=0.0.2)

ipc-worker: Inter-Process Communication, multi-process workers that communicate via shared memory or MQ.

```py
Supports shared-memory process workers (Python >= 3.8, Linux) and MQ process workers (Python >= 3.6).

```
```py
# -*- coding: utf-8 -*-
# @Time    : 2021/11/23 9:35

'''
Shared-memory demo.
Recommended system: Linux with Python >= 3.8
    recommended linux
    python 3.8

Running on Windows is not recommended; it will report an error as follows:
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.

'''

import multiprocessing
import time,os
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker



class My_worker(SHM_process_worker):
    """Example worker for the shared-memory demo.

    Keeps the user-supplied ``config`` and overrides the ``run_begin``,
    ``run_end`` and ``run_once`` hooks.
    """

    def __init__(self, config, *args, **kwargs):
        """Forward ``*args`` / ``**kwargs`` to the base class and keep ``config``.

        ``config`` is free-form user data; use it however you like.
        """
        super().__init__(*args, **kwargs)

        # _logger, _idx, _group_name and _shm_name are not assigned in this
        # class; presumably the base-class constructor provides them.
        self._logger.info(
            'Process id {}, group name {} ,shm name {}'.format(
                self._idx, self._group_name, self._shm_name,
            )
        )
        self._logger.info(config)
        self.config = config

    def run_begin(self):
        """Hook triggered when the process begins.

        Logs the current pid and resets ``handle`` to ``None``.
        """
        pid = os.getpid()
        self._logger.info('worker pid {}...'.format(pid))
        self.handle = None

    def run_end(self):
        """Hook triggered when the process ends."""
        if self.handle is None:
            return
        # Placeholder: release whatever ``handle`` refers to here.

    def run_once(self, request_data):
        """Hook triggered whenever data is put; returns the processed data.

        A dict request gets ``'b'`` set to 200 in place; any other value is
        returned untouched.
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200

        if self.handle is None:
            return request_data

        # Placeholder: do something with ``handle`` here.
        return request_data


if __name__ == '__main__':
    # Free-form settings; handed to every My_worker through worker_args below.
    config = {
        "anything": "anything",
        "aa": 100,
    }

    # Event that is set in the shutdown path at the bottom of this script.
    # NOTE(review): the ZMQ demo passes evt_quit=evt_quit to its loader, but
    # this call does not hand the event to IPC_shm -- confirm whether it should.
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the shared-memory group name and must be unique.
    # A manager is an agent process that acts as a load balancer.
    # A worker is the process that actually does your work.
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker processes
        manager_num=2,  # number of agent processes
        group_name='serving_shm',  # shared memory name
        shm_size=1 * 1024 * 1024,  # shared memory size
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether to log compute time
    )

    instance.start()

    # Demo: produce and consume messages; in practice this could be driven
    # by an HTTP handler instead.
    for _ in range(10):
        print('produce message')
        data = {"a": 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result', data)

    try:
        instance.join()
    except (KeyboardInterrupt, Exception):
        # KeyboardInterrupt derives from BaseException, not Exception, so it
        # must be listed explicitly or Ctrl+C would skip this cleanup.
        evt_quit.set()
        instance.terminate()

    del evt_quit
```
```py
# -*- coding: utf-8 -*-
# @Time    : 2021/11/29 15:06
# @Author  : wyw


import multiprocessing
import os
from ipc_worker.ipc_zmq_loader import IPC_zmq,ZMQ_process_worker
'''
    ZMQ demo; depends on pyzmq:
    pip install pyzmq

    Tested on Python >= 3.6
'''

# Directory exported through ZEROMQ_SOCK_TMP_DIR below.
tmp_dir = './tmp'
# exist_ok=True replaces the exists()-then-mkdir() pair, which could raise
# FileExistsError if another process created the directory in between.
os.makedirs(tmp_dir, exist_ok=True)

# Presumably read by ipc_worker to decide where its ZeroMQ socket files go;
# confirm against the library.
os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir

class My_worker(ZMQ_process_worker):
    """Example worker for the ZMQ demo.

    Keeps the user-supplied ``config`` and overrides the ``run_begin``,
    ``run_end`` and ``run_once`` hooks.
    """

    def __init__(self, config, *args, **kwargs):
        """Forward ``*args`` / ``**kwargs`` to the base class and keep ``config``.

        ``config`` is free-form user data; use it however you like.
        """
        super().__init__(*args, **kwargs)

        # _logger, _idx, _group_name and _identity are not assigned in this
        # class; presumably the base-class constructor provides them.
        self._logger.info(
            'Process id {}, group name {} , identity {}'.format(
                self._idx, self._group_name, self._identity,
            )
        )
        self._logger.info(config)
        self.config = config

    def run_begin(self):
        """Hook triggered when the process begins.

        Logs the current pid and resets ``handle`` to ``None``.
        """
        pid = os.getpid()
        self._logger.info('worker pid {}...'.format(pid))
        self.handle = None

    def run_end(self):
        """Hook triggered when the process ends."""
        if self.handle is None:
            return
        # Placeholder: release whatever ``handle`` refers to here.

    def run_once(self, request_data):
        """Hook triggered whenever data is put; returns the processed data.

        A dict request gets ``'b'`` set to 200 in place; any other value is
        returned untouched.
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200

        if self.handle is None:
            return request_data

        # Placeholder: do something with ``handle`` here.
        return request_data


if __name__ == '__main__':
    # Free-form settings; handed to every My_worker through worker_args below.
    config = {
        "anything": "anything",
        "aa": 100,
    }

    # Quit event: passed to IPC_zmq and set in the shutdown path below.
    evt_quit = multiprocessing.Manager().Event()

    # group_name names this worker group and must be unique.
    # A manager is an agent process that acts as a load balancer.
    # A worker is the process that actually does your work.
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker processes
        group_name='serving_zmq',  # group name (original comment said "share memory name")
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether to log compute time
    )
    instance.start()

    # Demo: produce and consume messages; in practice this could be driven
    # by an HTTP handler instead.
    for _ in range(10):
        data = {"a": 100}
        request_id = instance.put(data)

        data = instance.get(request_id)
        print('get process result', request_id, data)

    try:
        instance.join()
    except (KeyboardInterrupt, Exception):
        # KeyboardInterrupt derives from BaseException, not Exception, so it
        # must be listed explicitly or Ctrl+C would skip this cleanup.
        evt_quit.set()
        instance.terminate()
    del evt_quit
```


