Metadata-Version: 2.1
Name: flowsaber
Version: 0.1.3.6.6
Summary: A dataflow based workflow framework
Home-page: https://github.com/zhqu1148980644/flowsaber
Author: bakezq
Author-email: zhongquan789@gmail.com
License: MIT
Keywords: dataflow,workflow,asyncio
Platform: UNKNOWN
Classifier: Development Status :: 3 - Alpha
Classifier: Operating System :: POSIX
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: License :: OSI Approved :: MIT License
Classifier: Intended Audience :: Science/Research
Classifier: Topic :: Scientific/Engineering :: Bio-Informatics
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.7, <4
Description-Content-Type: text/markdown
Requires-Dist: makefun
Requires-Dist: cloudpickle
Requires-Dist: dask
Requires-Dist: distributed
Requires-Dist: psutil
Requires-Dist: aiohttp
Requires-Dist: pydantic
Requires-Dist: fire
Requires-Dist: ariadne
Requires-Dist: uvicorn
Requires-Dist: dnspython
Requires-Dist: motor
Provides-Extra: test
Requires-Dist: pytest ; extra == 'test'
Requires-Dist: pytest-cov ; extra == 'test'
Requires-Dist: httpimport ; extra == 'test'
Requires-Dist: autodocsumm ; extra == 'test'
Requires-Dist: pysam ; extra == 'test'
Requires-Dist: matplotlib ; extra == 'test'

# A dataflow based workflow framework.
### work in progress

### Features

- Dataflow-like flow/task composition syntax inspired by [nextflow](https://github.com/nextflow-io/nextflow)'s DSL2.
- Pure Python: no DSL; import, compose, and modify Task/Flow Python objects at will.
    - Extensible and interactive thanks to the dynamic nature of Python.
        - Task Cache.
        - ...
- Distributable: uses Dask distributed as the Task executor, so flows can be deployed locally, on a cluster, or in the cloud.
- Supports a hybrid model inspired by [prefect](https://github.com/PrefectHQ/prefect).
    - Build flows in local Python or in the web UI (in progress).
    - Schedule and monitor flow execution on a remote server through Python or the web UI (in progress).
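
The pipe-style composition mentioned in the first feature can be imitated in plain Python by overloading the `|` operator. The sketch below is only an illustration of that idea and is not how `flowsaber` is implemented: the `Step` class and the `add_one` and `square` names are hypothetical, and a plain list stands in for a channel.

```python
class Step:
    """Hypothetical wrapper that lets plain functions be chained with `|`."""

    def __init__(self, func):
        self.func = func

    def __call__(self, values):
        # Apply the wrapped function to every item of the input list.
        return [self.func(v) for v in values]

    def __ror__(self, values):
        # Called for `values | step`, because list does not define `|`.
        return self(values)


add_one = Step(lambda x: x + 1)
square = Step(lambda x: x ** 2)

# Each `|` feeds the output of the left side into the step on the right.
result = [1, 2, 3] | add_one | square | add_one
print(result)  # [5, 10, 17]
```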

### Install

```bash
pip install flowsaber
```

### Example

- A minimal working example covering most features and usage patterns of `flowsaber`.

```python
from flowsaber.api import *

@task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

@shell
def write(num):
    """echo {num} > 1.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
    | write | read | flatten \
    | map_(lambda x: int(x.strip())) \
    | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)
run(workflow)
```
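
To make the arithmetic of this example easier to follow, here is a plain-Python sketch of what each input value goes through. It does not use `flowsaber` at all and rests on two assumptions: the two `sub_flow` channels are paired element by element before `multiply`, and the `write`/`read` steps simply round-trip the number through a text file. In an actual run the values may be printed in a different order.

```python
def add(num):
    return num + 1


def sub_flow(num):
    # add -> square -> add, mirroring `add(num) | map_(lambda x: x ** 2) | add`
    return add(add(num) ** 2)


def my_flow(num):
    # Two identical sub-flows multiplied together; the file round trip
    # (write/read/flatten/int) is assumed to leave the value unchanged.
    return sub_flow(num) * sub_flow(num)


expected = [my_flow(n) for n in range(1, 9)]
print(expected)  # [25, 100, 289, 676, 1369, 2500, 4225, 6724]
```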


### Example of running remotely

##### Start the server (API endpoint)
In a bash shell:
```bash
flowsaber server
```

##### Start the agent (flow dispatcher)
In a bash shell:
```bash
flowsaber agent --server "http://127.0.0.1:8000" --id test
```

##### Create a flow and schedule it to run
In a Python script or an IPython console:
```python
import sys

from flowsaber.api import *

@task
def add(num):
    print("This is a message sent by print to stdout in the task")
    print("This is a message sent by print to stderr in the task", file=sys.stderr)
    a = 1
    for i in range(10000000):
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*range(10))
f = myflow(num_ch)

run(f, server_address="http://127.0.0.1:8000", agent_id="test")
```

### Test

```bash
python -m pytest tests -s -o log_cli=True -vvvv
```


### TODO

- [ ] Web interface.
- [ ] PBS/Torque executor.
- [ ] More cache modes.
- [ ] Support running on cloud platforms.
- [ ] Run CWL script, Convert between CWL and flowsaber flow.

### Reference
- [nextflow](https://github.com/nextflow-io/nextflow)
- [prefect](https://github.com/PrefectHQ/prefect)


