
The task module lets you create pipelines using objects from python's asyncio module according to Pypeln's general architecture. Use this module when you are in need to perform efficient asynchronous IO operations and DONT need to perform heavy CPU operations.

Most functions in this module return a pl.task.Stage object which implement the Iterable, AsyncIterable, and Awaitable interfaces which enables you to combine it seamlessly with regular Python code.


You can call await con any pl.thread.Stage to get back the results of its computation:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

async def main()
    data = range(10) # [0, 1, 2, ..., 9] 

    stage =, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage # e.g. [5, 6, 9, 4, 8, 10, 7]

When calling await on a stage you will get back the same result if you called list on it with be big difference that you wont block the current thread while waiting for the computation to materialize.


task Stages are asynchronous generators so you can iterate through them using async for to get access each new element as soon as it become available:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

async def main()
    data = range(10) # [0, 1, 2, ..., 9] 

    stage =, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    async for element in stage:
        pritn(element) # 5, 6, 9, 4, 8, 10, 7

When iterating a stage using async for you will get back the same result as if you called the normal for on it with be big difference that you wont block the current thread while waiting for the next element.

Event Loop

When you run a task stage all the tasks will be scheduled in the event loop on the current thread if it exists, else Pypeln will create and keep alive a new event loop.
