
# Examples

## Simplest example

```python
from pyperaptor import Pipeline, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)

# Build a pipeline whose only step is sum1.
p = Pipeline()
p += Node(sum1)

# Lock the pipeline before pushing a value through it.
p.lock()

# Push a single value through the pipeline and print what comes back.
result = p.push(2)
print(result)

# Expected output:
# 3
```

## Using process for multiple inputs

```python
from pyperaptor import Pipeline, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)

# Build a pipeline whose only step is sum1.
p = Pipeline()
p += Node(sum1)

p.lock()

# The generator is materialized into a list and handed to process() in one call.
results = p.process(list(a_generator()))
print(results)

# Expected output:
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

## Simple example using a generator inside the pipeline

```python
from pyperaptor import Pipeline, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)

# The generator is wrapped in the first Node and sum1 is chained after it with "+".
p = Pipeline()
p += Node(a_generator) + \
     sum1


p.lock()

# process() is called without arguments here; no input list is passed in.
results = p.process()
print(results)

# Expected output:
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

## Making a pipeline parallel

```python
from pyperaptor import Pipeline, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)

# Same two steps as the generator example, but the pipeline is created
# with parallel=True and 10 workers.
p = Pipeline(parallel = True, workers=10)
p += Node(a_generator) + \
     sum1

p.lock()

results = p.process()
print(results)

# Example output (the same values as the sequential run, but not in sorted
# order; presumably the order can differ between runs):
# [6, 5, 1, 10, 9, 3, 7, 2, 8, 4]
```

## Using a Device to avoid race conditions

First, let's see the problem happening:
```python
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def printer(x):
    """Print ``x`` followed by a space instead of a newline, then return it unchanged."""
    print(x, end=" ")
    return x

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)

# printer runs after sum1 in a parallel pipeline with 10 workers.
p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     printer

p.lock()

p.process(list(a_generator()))
# Example output (note the items run together and the uneven spacing):
# 1 2 3 465 8  7 9 10
```
The output above is garbled (some items run together, others have extra spaces) because many threads write to the terminal at the same time.
To solve this problem, let's use a `Device` to guarantee that this does not happen:

```python
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def printer(x):
    """Print ``x`` followed by a space instead of a newline, then return it unchanged."""
    print(x, end=" ")
    return x

def a_generator():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)
        
# A named Device; it is attached to the printer node below.
TERM = Device("term1")

p = Pipeline(parallel=True, workers=10)
# Only the printer node is bound to TERM; the sum1 node has no device.
p += Node(sum1) + \
     Node(printer, dev=TERM)

p.lock()

p.process(list(a_generator()))
# Example output (items are cleanly separated; the order is still not sorted):
# 1 2 3 4 5 10 7 8 9 6
```

## Holding and referring to a previous value
* Not available in parallel mode

```python
from pyperaptor import Pipeline, Node

def sum1(x):
    """Return ``x`` plus one."""
    return x + 1

def printer(*x):
    """Print every positional argument on one line and return them as a tuple."""
    print(*x)
    return x

p = Pipeline()
# The sum1 node is created with hold=True under the key "sum1_result";
# the printer node lists that same key in its refer argument.
p += Node(sum1, keyName="sum1_result", hold=True) + \
     Node(printer, refer=["sum1_result"])

p.lock()
p.process(list(range(10)))
# Expected output (printer shows two values per input):
# 1 1
# 2 2
# 3 3
# 4 4
# ...
```

## Buffering in a single thread and processing in parallel

```python
from pyperaptor import Device, Node, Pipeline
import time, copy, threading

def heavy_work(x):
    """Stand in for an expensive step: block for one second, report the thread, return True."""
    # The sleep simulates something heavy being done with x.
    time.sleep(1)
    worker_name = threading.current_thread().name
    print("doing heavy work at ", worker_name)
    return True

def wrapper(x = None):
    """Run heavy_work on ``x`` only when ``x`` is truthy; otherwise return None."""
    if not x:
        return None
    return heavy_work(x)

def slow_number_yielder():
    """Yield 0 through 4, pausing 0.2 seconds before each value."""
    for number in range(5):
        time.sleep(0.2)
        yield number
        
class Buffer():
    """Callable that collects items and releases them in fixed-size batches.

    Each call stores one item. Calls return ``None`` until ``limit_break``
    items have been collected; the call that completes a batch returns the
    batch as a new list and empties the buffer for the next batch.
    """

    def __init__(self, limit_break = 5):
        # Number of items that make up one batch.
        self.__lim__ = limit_break
        # Items collected since the last batch was released.
        self.elems = []

    def __call__(self, x):
        """Store ``x``; return the completed batch, or ``None`` if it is not full yet."""
        print("calling buffer")
        self.elems.append(x)
        if len(self.elems) != self.__lim__:
            return None
        # x is already stored by the append above, so it must not be appended
        # a second time here: that would duplicate the last item of the batch.
        # Hand back a deep copy so clearing the buffer does not affect the batch.
        batch = copy.deepcopy(self.elems)
        self.elems.clear()
        return batch

# A named Device; it is attached to the Buffer node below.
BUFFER = Device("buffer")

# Inner pipeline: runs wrapper with parallel=True and 5 workers.
p = Pipeline([wrapper], parallel=True, workers=5)

# Steps of the outer pipeline, in order: the slow generator, the Buffer node
# (bound to the BUFFER device), and finally the parallel pipeline p.
fs = [
    slow_number_yielder,
    Node(Buffer(), dev=BUFFER),
    p
]

# The outer pipeline is created without parallel=True.
q = Pipeline(fs)

# Both pipelines are locked before the outer one is run.
p.lock()
q.lock()

result = q.process()
print(result)
```
