Metadata-Version: 2.1
Name: async-pool-executor
Version: 0.1
Summary: async_pool_executor,its api like the concurrent.futures
Home-page: UNKNOWN
Author: bfzs
Author-email: ydf0509@sohu.com
Maintainer: ydf
Maintainer-email: ydf0509@sohu.com
License: BSD License
Description: ## pip install async_pool_executor
        
        
        
        ## 主要功能
        
        ```
        主要功能是仿照 concurrent.futures 的线程池报的submit shutdown方法。
        
        使得在做生产 消费 时候，无需学习烦人的异步 loop 、 run_until_complete ，可以直接在同步函数中 submit。
        生产和消费不需要在同一个loop中，喜欢同步编程思维的人可以用这个。
        
        ```
        
        
        ## 实现代码
        
        ```python
        
        import asyncio
        import atexit
        import time
        import traceback
        from threading import Thread
        
        
        class AsyncPoolExecutor:
            """
            使api和线程池一样，最好的性能做法是submit也弄成 async def，生产和消费在同一个线程同一个loop一起运行，但会对调用链路的兼容性产生破坏，从而调用方式不兼容线程池。
            """
        
            def __init__(self, size, loop=None):
                """
        
                :param size: 同时并发运行的协程任务数量。
                :param loop:
                """
                self._size = size
                self.loop = loop or asyncio.new_event_loop()
                self._sem = asyncio.Semaphore(self._size, loop=self.loop)
                self._queue = asyncio.Queue(maxsize=size, loop=self.loop)
                t = Thread(target=self._start_loop_in_new_thread)
                t.setDaemon(True)  # 设置守护线程是为了有机会触发atexit，使程序自动结束，不用手动调用shutdown
                t.start()
                self._can_be_closed_flag = False
                atexit.register(self.shutdown)
        
        
            def submit(self, func, *args, **kwargs):
                future = asyncio.run_coroutine_threadsafe(self._produce(func, *args, **kwargs), self.loop)  # 这个 run_coroutine_threadsafe 方法也有缺点，消耗的性能巨大。
                future.result()  # 阻止过快放入，放入超过队列大小后，使submit阻塞。
        
            async def _produce(self, func, *args, **kwargs):
                await self._queue.put((func, args, kwargs))
        
            async def _consume(self):
                while True:
                    func, args, kwargs = await self._queue.get()
                    if func == 'stop':
                        break
                    try:
                        await func(*args, **kwargs)
                    except Exception as e:
                        traceback.print_exc()
        
            def _start_loop_in_new_thread(self, ):
                # self._loop.run_until_complete(self.__run())  # 这种也可以。
                # self._loop.run_forever()
        
                # asyncio.set_event_loop(self.loop)
                self.loop.run_until_complete(asyncio.wait([self._consume() for _ in range(self._size)], loop=self.loop))
                self._can_be_closed_flag = True
        
            def shutdown(self):
                for _ in range(self._size):
                    self.submit('stop', )
                while not self._can_be_closed_flag:
                    time.sleep(0.1)
                self.loop.close()
                print('关闭循环')
        
        
        if __name__ == '__main__':
            import nb_log
            async def async_f(x):
                await asyncio.sleep(2)
                print(x)
        
            pool  =AsyncPoolExecutor(3)
            for i in range(30):
                pool.submit(async_f,i)
        ```
Keywords: async_pool_executor,threadpoolexecutor,sync,async
Platform: all
Classifier: Development Status :: 4 - Beta
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: Implementation
Classifier: Programming Language :: Python :: 3.6
Classifier: Topic :: Software Development :: Libraries
Description-Content-Type: text/markdown
