quickpool.quickpool
1import time 2from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, _base 3from typing import Any, Callable 4 5from noiftimer import Timer 6from printbuddies import ProgBar, print_in_place 7 8 9class _QuickPool: 10 def __init__( 11 self, 12 functions: list[Callable[..., Any]], 13 args_list: list[tuple[Any, ...]] = [], 14 kwargs_list: list[dict[str, Any]] = [], 15 max_workers: int | None = None, 16 ): 17 """Quickly implement multi-threading/processing with an optional progress bar display. 18 19 #### :params: 20 21 `functions`: A list of functions to be executed. 22 23 `args_list`: A list of tuples where each tuple consists of positional arguments to be passed to each successive function in `functions` at execution time. 24 25 `kwargs_list`: A list of dictionaries where each dictionary consists of keyword arguments to be passed to each successive function in `functions` at execution time. 26 27 `max_workers`: The maximum number of concurrent threads or processes. If `None`, the max available to the system will be used. 28 29 The return values of `functions` will be returned as a list by this class' `execute` method. 30 31 The relative ordering of `functions`, `args_list`, and `kwargs_list` matters as `args_list` and `kwargs_list` will be distributed to each function squentially. 32 33 i.e. 34 >>> for function_, args, kwargs in zip(functions, args_list, kwargs_list): 35 >>> function_(*args, **kwargs) 36 37 If `args_list` and/or `kwargs_list` are shorter than the `functions` list, empty tuples and dictionaries will be added to them, respectively. 38 39 e.g 40 >>> import time 41 >>> def dummy(seconds: int, return_val: int)->int: 42 >>> time.sleep(seconds) 43 >>> return return_val 44 >>> num = 10 45 >>> pool = ThreadPool([dummy]*10, [(i,) for i in range(num)], [{"return_val": i} for i in range(num)]) 46 >>> results = pool.execute() 47 >>> print(results) 48 >>> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]""" 49 self.functions = functions 50 self.args_list = args_list 51 self.kwargs_list = kwargs_list 52 self.max_workers = max_workers 53 54 @property 55 def executor(self) -> _base.Executor: 56 raise NotImplementedError 57 58 @property 59 def submissions(self) -> list: 60 num_functions = len(self.functions) 61 num_args = len(self.args_list) 62 num_kwargs = len(self.kwargs_list) 63 functions = self.functions 64 args_list = self.args_list 65 kwargs_list = self.kwargs_list 66 # Pad args_list and kwargs_list if they're shorter than len(functions) 67 if num_args < num_functions: 68 args_list.extend([tuple() for _ in range(num_functions - num_args)]) 69 if num_kwargs < num_functions: 70 kwargs_list.extend([dict() for _ in range(num_functions - num_kwargs)]) 71 return [ 72 (function_, args, kwargs) 73 for function_, args, kwargs in zip(functions, args_list, kwargs_list) 74 ] 75 76 def execute( 77 self, 78 show_progbar: bool = True, 79 prefix: str = "", 80 suffix: str = "", 81 progbar_update_period: float = 0.001, 82 ) -> list[Any]: 83 """Execute the supplied functions with their arguments, if any. 84 85 Returns a list of function call results. 86 87 #### :params: 88 89 `show_progbar`: If `True`, print a progress bar to the terminal showing how many functions have finished executing. 90 91 `prefix`: String to display at the front of the progbar (will always include a runtime clock). 92 93 `suffix`: String to display after the progbar. 94 95 `progbar_update_period`: How often, in seconds, to check the number of completed functions. Only relevant if `show_progbar` is `True`. 96 """ 97 with self.executor as executor: 98 workers = [ 99 executor.submit(submission[0], *submission[1], **submission[2]) 100 for submission in self.submissions 101 ] 102 if show_progbar: 103 num_workers = len(workers) 104 with ProgBar(num_workers) as bar: 105 while ( 106 num_complete := len( 107 [worker for worker in workers if worker.done()] 108 ) 109 ) < num_workers: 110 bar.display( 111 f"{prefix+' '}{bar.runtime}".strip(), # Remove the space if prefix is an empty string 112 counter_override=num_complete, 113 suffix=suffix, 114 ) 115 time.sleep(progbar_update_period) 116 bar.display(f"{bar.runtime}", counter_override=num_complete) 117 return [worker.result() for worker in workers] 118 119 120class ProcessPool(_QuickPool): 121 @property 122 def executor(self) -> ProcessPoolExecutor: 123 return ProcessPoolExecutor(self.max_workers) 124 125 126class ThreadPool(_QuickPool): 127 @property 128 def executor(self) -> ThreadPoolExecutor: 129 return ThreadPoolExecutor(self.max_workers) 130 131 132def update_and_wait( 133 function: Callable[..., Any], 134 message: str | Callable[[], Any] = "", 135 *args, 136 **kwargs, 137) -> Any: 138 """While `function` runs with `*args` and `**kwargs`, 139 print out an optional `message` (a runtime clock will be appended to `message`) at 1 second intervals. 140 141 Returns the output of `function`. 142 143 >>> def main(): 144 >>> def trash(n1: int, n2: int) -> int: 145 >>> time.sleep(10) 146 >>> return n1 + n2 147 >>> val = update_and_wait(trash, "Waiting on trash", 10, 22) 148 >>> print(val) 149 >>> main() 150 >>> Waiting on trash | runtime: 9s 993ms 462us 151 >>> 32""" 152 153 timer = Timer().start() 154 155 def update(): 156 if isinstance(message, str): 157 display_message = f"{message} |" 158 else: 159 display_message = f"{message()} |" 160 print_in_place( 161 f"{display_message} runtime: {timer.elapsed_str}".strip(), True 162 ) # Remove the space if display_message is an empty string 163 time.sleep(1) 164 165 with ThreadPoolExecutor() as pool: 166 worker = pool.submit(function, *args, **kwargs) 167 while not worker.done(): 168 update() 169 print() 170 return worker.result()
121class ProcessPool(_QuickPool): 122 @property 123 def executor(self) -> ProcessPoolExecutor: 124 return ProcessPoolExecutor(self.max_workers)
Inherited Members
127class ThreadPool(_QuickPool): 128 @property 129 def executor(self) -> ThreadPoolExecutor: 130 return ThreadPoolExecutor(self.max_workers)
Inherited Members
def
update_and_wait( function: Callable[..., Any], message: Union[str, Callable[[], Any]] = '', *args, **kwargs) -> Any:
133def update_and_wait( 134 function: Callable[..., Any], 135 message: str | Callable[[], Any] = "", 136 *args, 137 **kwargs, 138) -> Any: 139 """While `function` runs with `*args` and `**kwargs`, 140 print out an optional `message` (a runtime clock will be appended to `message`) at 1 second intervals. 141 142 Returns the output of `function`. 143 144 >>> def main(): 145 >>> def trash(n1: int, n2: int) -> int: 146 >>> time.sleep(10) 147 >>> return n1 + n2 148 >>> val = update_and_wait(trash, "Waiting on trash", 10, 22) 149 >>> print(val) 150 >>> main() 151 >>> Waiting on trash | runtime: 9s 993ms 462us 152 >>> 32""" 153 154 timer = Timer().start() 155 156 def update(): 157 if isinstance(message, str): 158 display_message = f"{message} |" 159 else: 160 display_message = f"{message()} |" 161 print_in_place( 162 f"{display_message} runtime: {timer.elapsed_str}".strip(), True 163 ) # Remove the space if display_message is an empty string 164 time.sleep(1) 165 166 with ThreadPoolExecutor() as pool: 167 worker = pool.submit(function, *args, **kwargs) 168 while not worker.done(): 169 update() 170 print() 171 return worker.result()
While function runs with *args and **kwargs,
print out an optional message (a runtime clock will be appended to message) at 1 second intervals.
Returns the output of function.
>>> def main():
>>> def trash(n1: int, n2: int) -> int:
>>> time.sleep(10)
>>> return n1 + n2
>>> val = update_and_wait(trash, "Waiting on trash", 10, 22)
>>> print(val)
>>> main()
>>> Waiting on trash | runtime: 9s 993ms 462us
>>> 32