Skip to content

flowsaber/aiosaber

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

A concurrent streaming package

Install with PyPi Github release Version Downloads Downloads per week Build Status codecov license

  • Dataflow based functional syntax.
  • Implicitly parallelism for both async and non-async functions.
  • Composable for both flows and tasks.
  • Extensible with middlewares.

Installation

pip install aiosaber

Example

  • check tests for more examples.
from aiosaber import *
@task
def add(self, num):
    for i in range(100000):
        num += 1
    return num

@task
async def multiply(num1, num2):
    return num1 * num2

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply | view

num_ch = Channel.values(*list(range(100)))
f = my_flow(num_ch)
asyncio.run(f.start())