
Creates a stage that filters the data given a predicate function f, exactly like Python's built-in filter function.

import pypeln as pl
import time
from random import random

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9]
stage = pl.sync.filter(slow_gt3, data, workers=3, maxsize=4)

data = list(stage) # [4, 5, ..., 9]
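
As a point of comparison, the same predicate can be run through Python's built-in filter. This is a minimal sketch that leaves out pypeln and the simulated delay; gt3 is a name chosen here for illustration only.

```python
def gt3(x):
    # Same predicate as slow_gt3, minus the simulated delay.
    return x > 3

data = range(10)  # 0, 1, 2, ..., 9

# The built-in filter keeps the elements for which the predicate is true,
# preserving their original order.
result = list(filter(gt3, data))
print(result)  # [4, 5, 6, 7, 8, 9]
```

Note that 3 itself is dropped, because the predicate is a strict comparison.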


| Name | Type | Description | Default |
| --- | --- | --- | --- |
| f | FilterFn | A function with signature f(x) -> bool. f can accept additional arguments by name as described in Advanced Usage. | required |
| stage | Union[pypeln.sync.stage.Stage[~A], Iterable[~A], pypeln.utils.Undefined] | A Stage or Iterable. | UNDEFINED |
| workers | int | This parameter is not used and is only kept for API compatibility with the other modules. | 1 |
| maxsize | int | This parameter is not used and is only kept for API compatibility with the other modules. | 0 |
| timeout | float | Seconds before stopping the worker if its current task is not yet completed. Defaults to 0, which means it is unbounded. | 0 |
| on_start | Callable | A function with signature on_start(worker_info?) -> kwargs?, where kwargs can be a dict of keyword arguments that can be consumed by f and on_done. on_start can accept additional arguments by name as described in Advanced Usage. | None |
| on_done | Callable | A function with signature on_done(stage_status?). This function is executed once per worker when the worker finishes. on_done can accept additional arguments by name as described in Advanced Usage. | None |
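
The by-name argument passing described for f, on_start, and on_done can be pictured with a small standalone sketch. This is not pypeln's implementation: call_with_known_args, gt_threshold, and the threshold keyword are hypothetical names, and the sketch only assumes that a callable receives the keyword arguments its signature actually names.

```python
import inspect

def call_with_known_args(fn, *args, **available):
    """Call fn, passing only the keyword arguments that its signature names."""
    params = inspect.signature(fn).parameters
    selected = {k: v for k, v in available.items() if k in params}
    return fn(*args, **selected)

events = []

def on_start():
    # The returned dict plays the role of the kwargs made available later.
    return {"threshold": 3}

def gt_threshold(x, threshold):
    # Requests `threshold` by name, in addition to the element x.
    return x > threshold

def on_done(threshold):
    events.append(f"done, threshold was {threshold}")

# Simulate the lifecycle of a single worker (hypothetical, for illustration).
kwargs = call_with_known_args(on_start) or {}
kept = [x for x in range(10) if call_with_known_args(gt_threshold, x, **kwargs)]
call_with_known_args(on_done, **kwargs)

print(kept)    # [4, 5, 6, 7, 8, 9]
print(events)  # ['done, threshold was 3']
```

The point of the design is that each callable declares only the arguments it needs, so a function that ignores the extra values keeps the plain f(x) signature.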



Warning: To implement timeout we use stopit.ThreadingTimeout, which has some limitations.


| Type | Description |
| --- | --- |
| Union[pypeln.sync.stage.Stage[~B], pypeln.utils.Partial[pypeln.sync.stage.Stage[~B]]] | Returns a Stage if the stage parameter is given; otherwise it returns a Partial. |
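
The Stage-or-Partial return behaviour can be illustrated with a standalone sketch. The Undefined, Partial, and lazy_filter names below are simplified, hypothetical stand-ins written for this example, not pypeln's classes, and a plain generator takes the place of a real Stage.

```python
class Undefined:
    """Hypothetical sentinel type meaning 'no stage was passed'."""

UNDEFINED = Undefined()

class Partial:
    """Hypothetical stand-in: wraps a function that still needs its stage."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, stage):
        return self.fn(stage)

def lazy_filter(f, stage=UNDEFINED):
    if isinstance(stage, Undefined):
        # No data yet: defer the work by returning a Partial that
        # calls lazy_filter again once a stage is supplied.
        return Partial(lambda stage: lazy_filter(f, stage))
    # Data available: return a lazy iterable (a generator, not a real Stage).
    return (x for x in stage if f(x))

def gt3(x):
    return x > 3

direct = list(lazy_filter(gt3, range(10)))  # stage given up front
deferred = lazy_filter(gt3)                 # stage omitted, so a Partial
later = list(deferred(range(10)))           # supply the stage afterwards

print(direct)  # [4, 5, 6, 7, 8, 9]
print(later)   # [4, 5, 6, 7, 8, 9]
```

Both call styles produce the same elements; the deferred form is useful when the operation is configured before the data source is known.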

Source code in pypeln/sync/api/
def filter(
    f: FilterFn,
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.Iterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    workers: int = 1,
    maxsize: int = 0,
    timeout: float = 0,
    on_start: tp.Callable = None,
    on_done: tp.Callable = None,
) -> tp.Union[Stage[B], pypeln_utils.Partial[Stage[B]]]:
    """
    Creates a stage that filters the data given a predicate function `f`, exactly like Python's built-in `filter` function.

    import pypeln as pl
    import time
    from random import random

    def slow_gt3(x):
        time.sleep(random()) # <= some slow computation
        return x > 3

    data = range(10) # [0, 1, 2, ..., 9]
    stage = pl.sync.filter(slow_gt3, data, workers=3, maxsize=4)

    data = list(stage) # [4, 5, ..., 9]

    Arguments:
        f: A function with signature `f(x) -> bool`. `f` can accept additional arguments by name as described in Advanced Usage.
        stage: A Stage or Iterable.
        workers: This parameter is not used and is only kept for API compatibility with the other modules.
        maxsize: This parameter is not used and is only kept for API compatibility with the other modules.
        timeout: Seconds before stopping the worker if its current task is not yet completed. Defaults to `0`, which means it is unbounded.
        on_start: A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in Advanced Usage.
        on_done: A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in Advanced Usage.

    !!! warning
        To implement `timeout` we use `stopit.ThreadingTimeout`, which has some limitations.

    Returns:
        Returns a `Stage` if the `stage` parameter is given, else it returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(
            lambda stage: filter(

    stage_ = to_stage(stage, maxsize=maxsize)

    return Stage(