Metadata-Version: 2.1
Name: taskproc
Version: 0.18.0
Summary: A lightweight task processing library written in pure Python
Home-page: https://github.com/koheimiya/taskproc
Author-Email: Kohei Miyaguchi <koheimiyaguchi@gmail.com>
License: MIT License
Project-URL: Homepage, https://github.com/koheimiya/taskproc
Project-URL: Repository, https://github.com/koheimiya/taskproc
Requires-Python: <4.0,>=3.10
Requires-Dist: typing-extensions<5.0.0,>=4.5.0
Requires-Dist: diskcache<6.0.0,>=5.6.1
Requires-Dist: cloudpickle<3.0.0,>=2.2.1
Requires-Dist: networkx<4.0,>=3.1
Requires-Dist: click<9.0.0,>=8.1.3
Requires-Dist: tqdm<5.0.0,>=4.65.0
Requires-Dist: python-dotenv<2.0.0,>=1.0.0
Requires-Dist: pudb>=2022.1.3; extra == "dev"
Provides-Extra: dev
Description-Content-Type: text/markdown

# taskproc

A lightweight pipeline building/execution/management tool written in pure Python.
Internally, it depends on `DiskCache`, `cloudpickle`, `networkx`, and `concurrent.futures`.

## Why `taskproc`?
I needed a pipeline-handling library that is as thin and flexible as possible.
* `Luigi` is not flexible enough: the definition of the dependencies and the definition of the task computation are tightly coupled in `luigi.Task`s, which is very cumbersome if one wants to edit the pipeline structure without changing the computation of each task.
* `Airflow` is too big and clumsy: it requires a separately installed message broker backend running in the background. It is also incompatible with non-pip package managers (such as Poetry).
* Most existing libraries tend to build their own ecosystems, which unnecessarily force the user to follow a specific way of handling pipelines.

`taskproc` aims to provide a language construct for defining computation by composition, ideally as simple as Python's built-in function syntax, with support for automatic, configurable parallel execution and cache management.

#### Features
* Decomposing long and complex computation into tasks, i.e., smaller units of work with dependencies.
* Executing them in a distributed way, supporting multithreading/multiprocessing and local container/cluster-based dispatching.
* Automatically creating/discarding/reusing caches per task. 

#### Nonfeatures
* Periodic scheduling
* Automatic retry
* External service integration (GCP, AWS, ...)
* Graphical user interface

## Installation

```
pip install taskproc
```

## Example
See [`examples/ml_taskfile.py`](examples/ml_taskfile.py) for typical usage of `taskproc`.

## Documentation

### Defining task

A pipeline is a directed acyclic graph (DAG) of tasks with a single sink node (i.e., the final task), where a task is a unit of work represented by a class.
Each task and its upstream dependencies are specified with a class definition like so:
```python
from taskproc import Task, Const, Cache

class Choose(Task):
    """ Compute the binomial coefficient. """

    def __init__(self, n: int, k: int):
        # The upstream tasks and the other instance attributes are prepared here.
        # It thus recursively defines all the tasks we need to run this task,
        # i.e., the entire upstream pipeline.

        if 0 < k < n:
            self.prev1 = Choose(n - 1, k - 1)
            self.prev2 = Choose(n - 1, k)
        elif k == 0 or k == n:
            # We can just pass a value to a requirement slot directly without running tasks.
            self.prev1 = Const(0)
            self.prev2 = Const(1)
        else:
            raise ValueError(f'{(n, k)}')

    def run_task(self) -> int:
        # Here we define the main computation of the task,
        # which is delayed until it is necessary.

        # The return values of the prerequisite tasks are accessible via `.get_result()`:
        return self.prev1.get_result() + self.prev2.get_result()

with Cache('./cache'):
    # Construct a task with its upstreams.
    # Instantiation of `Task` should be inside `Cache`.
    task = Choose(6, 3)

# To run the task graph, use the `run_graph()` method.
ans, stats = task.run_graph()  # `ans` should be 6 Choose 3, which is 20.

# It greedily executes all the necessary tasks in the graph, running independent ones in parallel where possible,
# and then produces the return value of the task on which we call `run_graph()`, as well as the execution stats.
# The return values of the intermediate tasks are cached at `./cache`
# and reused on the fly whenever possible.
```
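To make the evaluation semantics concrete, here is a dependency-free, sequential sketch that mimics what `run_graph()` does for `Choose(6, 3)`: each distinct `(n, k)` is computed once and afterwards served from a cache. It is only an illustration; the in-memory dict stands in for the on-disk cache, and `run_choose` is a hypothetical name, not part of `taskproc`.

```python
def run_choose(n: int, k: int, cache: dict, executed: list) -> int:
    """Hypothetical, sequential stand-in for `Choose(n, k).run_graph()`."""
    key = (n, k)
    if key in cache:
        # Cache hit: the task is not executed again.
        return cache[key]
    if 0 < k < n:
        # Upstream tasks are evaluated first, mirroring `prev1` and `prev2`.
        value = run_choose(n - 1, k - 1, cache, executed) + run_choose(n - 1, k, cache, executed)
    elif k == 0 or k == n:
        value = 0 + 1  # Const(0) + Const(1)
    else:
        raise ValueError(f'{(n, k)}')
    executed.append(key)  # record that this task actually ran
    cache[key] = value
    return value


cache: dict = {}
executed: list = []
print(run_choose(6, 3, cache, executed))  # 20
print(len(executed))  # 15 distinct tasks ran
run_choose(6, 3, cache, executed)
print(len(executed))  # still 15: every result came from the cache
```

Unlike this sketch, `taskproc` persists results on disk and runs independent tasks concurrently.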

### Deleting cache

The cache can be discarded selectively:
```python
with Cache('./cache'):
    # After some modification of `Choose(3, 3)`,
    # selectively discard the cache corresponding to the modification.
    Choose(3, 3).clear_task()

    # `ans` is recomputed, tracing back to the computation of `Choose(3, 3)`.
    ans, _ = Choose(6, 3).run_graph()

    # Delete all the caches associated with `Choose`.
    Choose.clear_all_tasks()
```
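Assuming that clearing a task forces a rerun of that task and of everything downstream of it in the graph being run, the set of tasks recomputed after `Choose(3, 3).clear_task()` can be worked out from the dependency structure alone. The sketch below computes it in plain Python; `upstreams` and `affected_by` are hypothetical helpers for illustration and say nothing about how `taskproc` actually detects stale caches.

```python
def upstreams(n: int, k: int) -> list[tuple[int, int]]:
    """Upstream `Choose` tasks of `Choose(n, k)`; base cases only depend on `Const`s."""
    if 0 < k < n:
        return [(n - 1, k - 1), (n - 1, k)]
    if k == 0 or k == n:
        return []
    raise ValueError(f'{(n, k)}')


def affected_by(root: tuple[int, int], cleared: tuple[int, int]) -> set[tuple[int, int]]:
    """Tasks in the graph of `root` that (transitively) depend on `cleared`, plus `cleared` itself."""
    memo: dict[tuple[int, int], bool] = {}

    def depends(node: tuple[int, int]) -> bool:
        if node not in memo:
            # Visit every upstream (no short-circuit) so all affected nodes get recorded.
            hits = [depends(up) for up in upstreams(*node)]
            memo[node] = node == cleared or any(hits)
        return memo[node]

    depends(root)
    return {node for node, hit in memo.items() if hit}


print(sorted(affected_by((6, 3), (3, 3))))  # [(3, 3), (4, 3), (5, 3), (6, 3)]
```

Under that assumption, only four of the fifteen tasks in the graph would run again; the rest would be reused from the cache.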

### Task Composition

The arguments of the `__init__` method can be any JSON-serializable values as well as `Future`s,
where a `Future` is either a `Task` or a `Const`.
```python
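from taskproc import Task, Cache

class MyTask(Task):
    def __init__(self, param1, param2):
        ...

with Cache('./cache'):
    # `UpstreamTask` stands for any other task class.
    MyTask(
        param1={
            'upstream_task0': UpstreamTask(),
            'other_params': [1, 2],
            # ... other entries
        },
        param2={},  # ... other parameters
    ).run_graph()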
```
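Requiring JSON-serializable arguments makes it possible to identify a task by its name and arguments. The sketch below shows one way to derive a stable key from them, encoding nested futures through a `to_json()` method (the name listed in the built-in methods table). It is a hypothetical illustration: `task_key`, `_encode`, `FakeUpstream`, and the key format are assumptions, not `taskproc`'s implementation.

```python
import json
from typing import Any


def _encode(obj: Any) -> Any:
    """`json.dumps` fallback: encode future-like objects via their `to_json()` method."""
    if hasattr(obj, 'to_json'):
        return obj.to_json()
    raise TypeError(f'{type(obj).__name__} is not JSON serializable')


def task_key(task_name: str, args: dict[str, Any]) -> str:
    """Hypothetical stable key: same task name and same arguments give the same key."""
    # sort_keys makes the key independent of dict insertion order,
    # including dicts returned by `_encode`.
    return task_name + ':' + json.dumps(args, sort_keys=True, default=_encode)


class FakeUpstream:
    """Hypothetical stand-in for an upstream task."""

    def to_json(self) -> dict[str, Any]:
        return {'task': 'UpstreamTask', 'args': {}}


k1 = task_key('MyTask', {'param1': {'upstream_task0': FakeUpstream(), 'other_params': [1, 2]}})
k2 = task_key('MyTask', {'param1': {'other_params': [1, 2], 'upstream_task0': FakeUpstream()}})
print(k1 == k2)  # True
```

Arguments that are neither JSON-serializable nor future-like raise `TypeError`, which is why such values cannot be used as task arguments in this scheme.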

Lists and dicts of upstream tasks can be registered with `FutureList` and `FutureDict`:
```python
from taskproc import Future, FutureDict, FutureList, Task

class SummarizeScores(Task):

    def __init__(self, task_dict: dict[str, Future[float]]):
        # `MyScore` stands for some task class whose `run_task` returns a float.
        self.score_list = FutureList([MyScore(i) for i in range(10)])
        self.score_dict = FutureDict(task_dict)

    def run_task(self) -> float:
        # `.get_result()` evaluates `FutureList[float]` and `FutureDict[str, float]` into
        # `list[float]` and `dict[str, float]`, respectively.
        scores = self.score_dict.get_result()
        return sum(scores.values()) / len(scores)
```
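The way `.get_result()` turns containers of futures into containers of plain values can be pictured as a recursive walk. The sketch below uses a hypothetical `Lazy` class in place of a real `Future`; it is not `taskproc`'s implementation, only an illustration of the evaluation semantics of `FutureList` and `FutureDict`.

```python
from typing import Any, Callable


class Lazy:
    """Hypothetical stand-in for a `Future`: wraps a deferred computation."""

    def __init__(self, compute: Callable[[], Any]):
        self._compute = compute

    def get_result(self) -> Any:
        return self._compute()


def resolve(obj: Any) -> Any:
    """Replace every `Lazy` inside nested lists/dicts with its result."""
    if isinstance(obj, Lazy):
        return obj.get_result()
    if isinstance(obj, list):
        return [resolve(item) for item in obj]
    if isinstance(obj, dict):
        return {key: resolve(value) for key, value in obj.items()}
    return obj


scores = {'a': Lazy(lambda: 0.5), 'b': Lazy(lambda: 1.5)}
resolved = resolve(scores)
print(sum(resolved.values()) / len(resolved))  # 1.0
```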

The output of the `run_task` method should be serializable with `cloudpickle`;
the serialized bytes are then compressed with `gzip`.
The compression level (defaults to 9) can be changed as follows:
```python
class NoCompressionTask(Task):
    task_compress_level = 0
    ...
```
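The effect of the compression level can be checked with the standard library alone. The sketch below uses the built-in `pickle` module as a stand-in for `cloudpickle` (an assumption made only to avoid the extra dependency) and compares `gzip` output at levels 0 and 9 for a highly repetitive result.

```python
import gzip
import pickle

result = {'scores': [0.0] * 10_000}  # a highly repetitive task output
payload = pickle.dumps(result)

stored = gzip.compress(payload, compresslevel=0)  # like task_compress_level = 0
packed = gzip.compress(payload, compresslevel=9)  # the default level

print(len(payload), len(stored), len(packed))
assert pickle.loads(gzip.decompress(packed)) == result  # lossless round trip
```

Level 0 skips compression (the output is slightly larger than the raw pickle because of gzip framing), which can save time for results that barely compress, such as data that is already compressed.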

If the output is a list or a dictionary, its elements can be accessed directly by indexing the task.
The result of indexing is also a `Future`.
```python
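from taskproc import Task

class MultiOutputTask(Task):
    def run_task(self) -> dict[str, int]:
        return {'foo': 42}  # ... other entries

class DownstreamTask(Task):
    def __init__(self):
        # Indexing yields a `Future` of the element, not the value itself.
        self.dep = MultiOutputTask()['foo']

    def run_task(self) -> int:
        return self.dep.get_result()  # 42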
```
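Indexing a task does not run anything; it produces another deferred value that extracts the element once the upstream result is available. A minimal sketch of that behavior, using a hypothetical `Lazy` class rather than `taskproc`'s own `Future`:

```python
from typing import Any, Callable


class Lazy:
    """Hypothetical stand-in for a `Future` that supports indexing."""

    def __init__(self, compute: Callable[[], Any]):
        self._compute = compute

    def get_result(self) -> Any:
        return self._compute()

    def __getitem__(self, key: Any) -> 'Lazy':
        # Defer the lookup: the upstream computation only runs
        # when the derived value is itself evaluated.
        return Lazy(lambda: self.get_result()[key])


calls = []

def multi_output() -> dict[str, int]:
    calls.append('run')
    return {'foo': 42, 'bar': 7}

foo = Lazy(multi_output)['foo']
print(len(calls))        # 0: nothing has run yet
print(foo.get_result())  # 42
```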

### Data directories

Use `task.task_directory` to get a fresh path dedicated to each task.
The directory is created and managed automatically along with the task cache:
its contents are cleared each time the task runs and persist until the task is cleared.
```python
from pathlib import Path

from taskproc import Task

class TrainModel(Task):
    def run_task(self) -> Path:
        ...
        model_path = self.task_directory / 'model.bin'
        model.save(model_path)
        return model_path
```
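The directory life cycle described above (emptied at each task run, kept until the task is cleared) can be sketched with `pathlib` and `shutil`. The helper `fresh_task_directory` and the `TrainModel/0` layout are hypothetical; `taskproc` manages `task_directory` for you.

```python
import shutil
import tempfile
from pathlib import Path


def fresh_task_directory(path: Path) -> Path:
    """Hypothetical helper: give a task an empty directory before each run."""
    if path.exists():
        shutil.rmtree(path)  # drop leftovers from the previous run
    path.mkdir(parents=True)
    return path


root = Path(tempfile.mkdtemp())
task_dir = fresh_task_directory(root / 'TrainModel' / '0')  # illustrative layout
(task_dir / 'model.bin').write_bytes(b'weights')  # output of the first run
task_dir = fresh_task_directory(root / 'TrainModel' / '0')
contents = list(task_dir.iterdir())
print(contents)  # []: the second run starts from scratch
shutil.rmtree(root)
```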


### Job scheduling and prefixes
Tasks can be run through a job scheduler by setting `task_prefix_command`, a command that is inserted just before each task call.
```python
class TaskWithJobScheduler(Task):
    task_prefix_command = 'jbsub -interactive -tty -queue x86_1h -cores 16+1 -mem 64g'
    ...
```
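Conceptually, the prefix is prepended to the command line that launches the task, so the scheduler wrapper (here `jbsub`) receives the task command as its trailing arguments. A sketch with `shlex`; the task command `['python', 'run_one_task.py']` is a hypothetical placeholder, since the actual invocation is internal to `taskproc`.

```python
import shlex


def with_prefix(prefix: str, task_command: list[str]) -> list[str]:
    """Prepend a scheduler prefix (as in `task_prefix_command`) to a task's argv."""
    # shlex.split honors shell-style quoting inside the prefix string.
    return shlex.split(prefix) + task_command


prefix = 'jbsub -interactive -tty -queue x86_1h -cores 16+1 -mem 64g'
argv = with_prefix(prefix, ['python', 'run_one_task.py'])  # hypothetical task command
print(shlex.join(argv))
```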

### Execution policy configuration

Task execution can be controlled by passing a `concurrent.futures.Executor` instance to `run_graph`:
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class MyTask(Task):
    ...

with Cache('./cache'):
    # Limit the number of parallel workers
    MyTask().run_graph(executor=ProcessPoolExecutor(max_workers=2))
    
    # Thread-based parallelism
    MyTask().run_graph(executor=ThreadPoolExecutor())
```
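Greedy parallel execution on top of an `Executor` can be illustrated with a small scheduler: a task is submitted as soon as all of its upstreams have finished, so independent tasks share the worker pool. This is a simplified sketch built on `concurrent.futures.wait`, not `taskproc`'s scheduler; `run_dag` and its `graph`/`funcs` arguments are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Hashable


def run_dag(
    graph: dict[Hashable, list[Hashable]],
    funcs: dict[Hashable, Callable[..., Any]],
    executor: Executor,
) -> dict[Hashable, Any]:
    """Run each node of `graph` (node -> upstream nodes) as soon as its upstreams are done."""
    results: dict[Hashable, Any] = {}
    running: dict[Future, Hashable] = {}
    waiting = dict(graph)
    while waiting or running:
        # Submit every waiting node whose upstream results are all available.
        ready = [node for node, ups in waiting.items() if all(up in results for up in ups)]
        for node in ready:
            args = [results[up] for up in waiting.pop(node)]
            running[executor.submit(funcs[node], *args)] = node
        if not running:
            raise ValueError('cycle or missing dependency in graph')
        # Block until at least one task finishes, then record its result.
        done, _ = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            results[running.pop(future)] = future.result()
    return results


graph = {'a': [], 'b': [], 'sum': ['a', 'b']}
funcs = {'a': lambda: 1, 'b': lambda: 2, 'sum': lambda x, y: x + y}
with ThreadPoolExecutor(max_workers=2) as executor:
    print(run_dag(graph, funcs, executor)['sum'])  # 3
```

The lambdas limit this demo to `ThreadPoolExecutor`; a `ProcessPoolExecutor` would need picklable, module-level functions.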

One can also control the concurrency at a task/channel level:
```python
class TaskUsingGPU(Task):
    task_channel = 'gpu'
    ...

class AnotherTaskUsingGPU(Task):
    task_channel = ['gpu', 'memory']
    ...

with Cache('./cache'):
    # `SomeDownstreamTask` stands for a task that depends on the tasks above.
    # Channel-level concurrency control
    SomeDownstreamTask().run_graph(rate_limits={'gpu': 1})
    SomeDownstreamTask().run_graph(rate_limits={'memory': 1})

    # Task-level concurrency control
    SomeDownstreamTask().run_graph(rate_limits={TaskUsingGPU.task_name: 1})
```
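One way to picture `rate_limits` is a semaphore per channel: a task acquires the semaphore of every rate-limited channel it belongs to before running. The sketch below is a hypothetical `ChannelLimiter`, not `taskproc`'s implementation; acquiring channels in sorted order avoids deadlocks between tasks that share several channels.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from typing import Any, Callable, Iterable


class ChannelLimiter:
    """Hypothetical per-channel concurrency limiter (one semaphore per limited channel)."""

    def __init__(self, rate_limits: dict[str, int]):
        self._semaphores = {ch: threading.Semaphore(n) for ch, n in rate_limits.items()}

    def run(self, channels: Iterable[str], fn: Callable[..., Any], *args: Any) -> Any:
        with ExitStack() as stack:
            # Take a slot in every limited channel, in a fixed (sorted) order.
            for ch in sorted(set(channels)):
                if ch in self._semaphores:
                    stack.enter_context(self._semaphores[ch])
            return fn(*args)


limiter = ChannelLimiter({'gpu': 1})
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    # Four workers are available, but the 'gpu' limit serializes the jobs;
    # 'memory' has no limit configured here, so it is ignored.
    list(executor.map(lambda _: limiter.run(['gpu', 'memory'], time.sleep, 0.05), range(4)))
elapsed = time.perf_counter() - start
print(f'{elapsed:.2f}s')  # at least 4 x 0.05 s, since only one job holds the 'gpu' slot at a time
```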

### Commandline tool
`Task` has a utility classmethod, `cli`, for running a task with command-line arguments.
For example:
```python
# taskfile.py

class Main(Task):
    ...


if __name__ == '__main__':
    Main.cli()
```
Use the `--help` option (e.g., `python taskfile.py --help`) for more details.


### Built-in properties/methods
Below is the list of the built-in properties/methods of `Task`. Do not override these attributes in the subclass.

| Name | Owner | Type | Description |
|--|--|--|--|
| `task_name`            | class    | property | String id of the task class |
| `task_id`              | instance | property | Integer id of the task, unique within the same task class  |
| `task_args`            | instance | property | The arguments of the task in JSON |
| `task_directory`       | instance | property | Path to the data directory of the task |
| `task_stdout`          | instance | property | Path to the task's stdout |
| `task_stderr`          | instance | property | Path to the task's stderr |
| `run_task`             | instance | method   | Run the task |
| `run_graph`            | instance | method   | Run the task after necessary upstream tasks and save the results in the cache |
| `get_task_result`      | instance | method   | Directly get the result of the task (fails if the cache is missing) |
| `to_json`              | instance | method   | Serialize itself as a JSON dictionary |
| `clear_task`           | instance | method   | Clear the cache of the task instance |
| `clear_all_tasks`      | class    | method   | Clear the cache of the task class |
| `cli`                  | class    | method   | `run_graph` with command line arguments |

## TODO
- Better UX
    - Add option to not cache result (need to address timestamp peeking and value passing).
    - Validate non-Future task argument with type hint.
- Optional
    - Simple task graph visualizer.
    - Pydantic/dataclass support in task arguments (as an incompatible, but better-UX object with TypedDict).
