quickpool.quickpool

  1import time
  2from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, _base
  3from typing import Any, Callable
  4
  5from noiftimer import Timer
  6from printbuddies import ProgBar, print_in_place
  7
  8
  9class _QuickPool:
 10    def __init__(
 11        self,
 12        functions: list[Callable[..., Any]],
 13        args_list: list[tuple[Any, ...]] = [],
 14        kwargs_list: list[dict[str, Any]] = [],
 15        max_workers: int | None = None,
 16    ):
 17        """Quickly implement multi-threading/processing with an optional progress bar display.
 18
 19        #### :params:
 20
 21        `functions`: A list of functions to be executed.
 22
 23        `args_list`: A list of tuples where each tuple consists of positional arguments to be passed to each successive function in `functions` at execution time.
 24
 25        `kwargs_list`: A list of dictionaries where each dictionary consists of keyword arguments to be passed to each successive function in `functions` at execution time.
 26
 27        `max_workers`: The maximum number of concurrent threads or processes. If `None`, the max available to the system will be used.
 28
 29        The return values of `functions` will be returned as a list by this class' `execute` method.
 30
 31        The relative ordering of `functions`, `args_list`, and `kwargs_list` matters as `args_list` and `kwargs_list` will be distributed to each function squentially.
 32
 33        i.e.
 34        >>> for function_, args, kwargs in zip(functions, args_list, kwargs_list):
 35        >>>     function_(*args, **kwargs)
 36
 37        If `args_list` and/or `kwargs_list` are shorter than the `functions` list, empty tuples and dictionaries will be added to them, respectively.
 38
 39        e.g
 40        >>> import time
 41        >>> def dummy(seconds: int, return_val: int)->int:
 42        >>>     time.sleep(seconds)
 43        >>>     return return_val
 44        >>> num = 10
 45        >>> pool = ThreadPool([dummy]*10, [(i,) for i in range(num)], [{"return_val": i} for i in range(num)])
 46        >>> results = pool.execute()
 47        >>> print(results)
 48        >>> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"""
 49        self.functions = functions
 50        self.args_list = args_list
 51        self.kwargs_list = kwargs_list
 52        self.max_workers = max_workers
 53
 54    @property
 55    def executor(self) -> _base.Executor:
 56        raise NotImplementedError
 57
 58    @property
 59    def submissions(self) -> list:
 60        num_functions = len(self.functions)
 61        num_args = len(self.args_list)
 62        num_kwargs = len(self.kwargs_list)
 63        functions = self.functions
 64        args_list = self.args_list
 65        kwargs_list = self.kwargs_list
 66        # Pad args_list and kwargs_list if they're shorter than len(functions)
 67        if num_args < num_functions:
 68            args_list.extend([tuple() for _ in range(num_functions - num_args)])
 69        if num_kwargs < num_functions:
 70            kwargs_list.extend([dict() for _ in range(num_functions - num_kwargs)])
 71        return [
 72            (function_, args, kwargs)
 73            for function_, args, kwargs in zip(functions, args_list, kwargs_list)
 74        ]
 75
 76    def execute(
 77        self,
 78        show_progbar: bool = True,
 79        prefix: str = "",
 80        suffix: str = "",
 81        progbar_update_period: float = 0.001,
 82    ) -> list[Any]:
 83        """Execute the supplied functions with their arguments, if any.
 84
 85        Returns a list of function call results.
 86
 87        #### :params:
 88
 89        `show_progbar`: If `True`, print a progress bar to the terminal showing how many functions have finished executing.
 90
 91        `prefix`: String to display at the front of the progbar (will always include a runtime clock).
 92
 93        `suffix`: String to display after the progbar.
 94
 95        `progbar_update_period`: How often, in seconds, to check the number of completed functions. Only relevant if `show_progbar` is `True`.
 96        """
 97        with self.executor as executor:
 98            workers = [
 99                executor.submit(submission[0], *submission[1], **submission[2])
100                for submission in self.submissions
101            ]
102            if show_progbar:
103                num_workers = len(workers)
104                with ProgBar(num_workers) as bar:
105                    while (
106                        num_complete := len(
107                            [worker for worker in workers if worker.done()]
108                        )
109                    ) < num_workers:
110                        bar.display(
111                            f"{prefix+' '}{bar.runtime}".strip(),  # Remove the space if prefix is an empty string
112                            counter_override=num_complete,
113                            suffix=suffix,
114                        )
115                        time.sleep(progbar_update_period)
116                    bar.display(f"{bar.runtime}", counter_override=num_complete)
117            return [worker.result() for worker in workers]
118
119
120class ProcessPool(_QuickPool):
121    @property
122    def executor(self) -> ProcessPoolExecutor:
123        return ProcessPoolExecutor(self.max_workers)
124
125
126class ThreadPool(_QuickPool):
127    @property
128    def executor(self) -> ThreadPoolExecutor:
129        return ThreadPoolExecutor(self.max_workers)
130
131
132def update_and_wait(
133    function: Callable[..., Any],
134    message: str | Callable[[], Any] = "",
135    *args,
136    **kwargs,
137) -> Any:
138    """While `function` runs with `*args` and `**kwargs`,
139    print out an optional `message` (a runtime clock will be appended to `message`) at 1 second intervals.
140
141    Returns the output of `function`.
142
143    >>> def main():
144    >>>   def trash(n1: int, n2: int) -> int:
145    >>>      time.sleep(10)
146    >>>      return n1 + n2
147    >>>   val = update_and_wait(trash, "Waiting on trash", 10, 22)
148    >>>   print(val)
149    >>> main()
150    >>> Waiting on trash | runtime: 9s 993ms 462us
151    >>> 32"""
152
153    timer = Timer().start()
154
155    def update():
156        if isinstance(message, str):
157            display_message = f"{message} |"
158        else:
159            display_message = f"{message()} |"
160        print_in_place(
161            f"{display_message} runtime: {timer.elapsed_str}".strip(), True
162        )  # Remove the space if display_message is an empty string
163        time.sleep(1)
164
165    with ThreadPoolExecutor() as pool:
166        worker = pool.submit(function, *args, **kwargs)
167        while not worker.done():
168            update()
169    print()
170    return worker.result()
class ProcessPool(_QuickPool):
121class ProcessPool(_QuickPool):
122    @property
123    def executor(self) -> ProcessPoolExecutor:
124        return ProcessPoolExecutor(self.max_workers)
Inherited Members
_QuickPool
_QuickPool
execute
class ThreadPool(_QuickPool):
127class ThreadPool(_QuickPool):
128    @property
129    def executor(self) -> ThreadPoolExecutor:
130        return ThreadPoolExecutor(self.max_workers)
Inherited Members
_QuickPool
_QuickPool
execute
def update_and_wait( function: Callable[..., Any], message: Union[str, Callable[[], Any]] = '', *args, **kwargs) -> Any:
133def update_and_wait(
134    function: Callable[..., Any],
135    message: str | Callable[[], Any] = "",
136    *args,
137    **kwargs,
138) -> Any:
139    """While `function` runs with `*args` and `**kwargs`,
140    print out an optional `message` (a runtime clock will be appended to `message`) at 1 second intervals.
141
142    Returns the output of `function`.
143
144    >>> def main():
145    >>>   def trash(n1: int, n2: int) -> int:
146    >>>      time.sleep(10)
147    >>>      return n1 + n2
148    >>>   val = update_and_wait(trash, "Waiting on trash", 10, 22)
149    >>>   print(val)
150    >>> main()
151    >>> Waiting on trash | runtime: 9s 993ms 462us
152    >>> 32"""
153
154    timer = Timer().start()
155
156    def update():
157        if isinstance(message, str):
158            display_message = f"{message} |"
159        else:
160            display_message = f"{message()} |"
161        print_in_place(
162            f"{display_message} runtime: {timer.elapsed_str}".strip(), True
163        )  # Remove the space if display_message is an empty string
164        time.sleep(1)
165
166    with ThreadPoolExecutor() as pool:
167        worker = pool.submit(function, *args, **kwargs)
168        while not worker.done():
169            update()
170    print()
171    return worker.result()

While function runs with *args and **kwargs, print out an optional message (a runtime clock will be appended to message) at 1 second intervals.

Returns the output of function.

>>> def main():
>>>   def trash(n1: int, n2: int) -> int:
>>>      time.sleep(10)
>>>      return n1 + n2
>>>   val = update_and_wait(trash, "Waiting on trash", 10, 22)
>>>   print(val)
>>> main()
>>> Waiting on trash | runtime: 9s 993ms 462us
>>> 32