pl.thread.filter

Creates a stage that filters the data using a predicate function f. It is intended to behave like Python's built-in filter function, but with added concurrency.

import pypeln as pl
import time
from random import random

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9]
stage = pl.thread.filter(slow_gt3, data, workers=3, maxsize=4)

data = list(stage) # e.g. [5, 6, 4, 7, 8, 9]

Note

Because of concurrency, order is not guaranteed.
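The ordering caveat can be reproduced with the standard library alone. The sketch below is not pypeln's implementation: `concurrent_filter` is a hypothetical helper that runs the predicate on a thread pool and yields each kept element as soon as its check finishes, so the output order depends on timing. It also submits every element up front, so it has no equivalent of `maxsize`. Sorting the result recovers exactly what the built-in `filter` returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import random
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def concurrent_filter(
    f: Callable[[T], bool], data: Iterable[T], workers: int = 3
) -> Iterator[T]:
    """Hypothetical helper: yield the elements of `data` for which `f` is true,
    in completion order rather than input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Map each future back to the element it is checking.
        futures = {pool.submit(f, x): x for x in data}
        for future in as_completed(futures):
            if future.result():
                yield futures[future]


def slow_gt3(x: int) -> bool:
    time.sleep(random() * 0.05)  # simulate a slow computation
    return x > 3


result = list(concurrent_filter(slow_gt3, range(10)))
print(result)  # order varies between runs
print(sorted(result) == list(filter(slow_gt3, range(10))))  # True
```

If the consumer needs input order, sorting (or tagging elements with their index) has to happen after the concurrent step.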

Parameters:

f (FilterFn, required): A function with signature f(x) -> bool. f can accept additional arguments by name as described in Advanced Usage.

stage (Union[pypeln.thread.stage.Stage[~A], Iterable[~A], pypeln.utils.Undefined], default: pypeln.utils.UNDEFINED): A Stage or Iterable.

workers (int, default: 1): The number of workers the stage should contain.

maxsize (int, default: 0): The maximum number of objects the stage can hold simultaneously. If set to 0 (the default), the stage can grow unbounded.

timeout (float, default: 0): Seconds before stopping the worker if its current task is not yet completed. A value of 0 (the default) means there is no timeout.

on_start (Callable, default: None): A function with signature on_start(worker_info?) -> kwargs?, where kwargs can be a dict of keyword arguments that can be consumed by f and on_done. on_start can accept additional arguments by name as described in Advanced Usage.

on_done (Callable, default: None): A function with signature on_done(stage_status?). This function is executed once per worker when the worker finishes. on_done can accept additional arguments by name as described in Advanced Usage.
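The on_start contract above, where on_start returns a dict whose keys f and on_done can request by parameter name, can be illustrated with a small sketch of name-based injection. This is an assumption about the general technique, not pypeln's code: `call_with_injected` is a hypothetical helper that uses `inspect.signature` to pass only the keyword arguments a function declares. The library's actual rules, including the optional worker_info and stage_status arguments, are described in Advanced Usage.

```python
import inspect
from typing import Any, Callable, Dict, List


def call_with_injected(fn: Callable[..., Any], *args: Any, **available: Any) -> Any:
    """Hypothetical helper: call `fn` with the positional `args` plus only the
    entries of `available` whose names match parameters of `fn`."""
    params = inspect.signature(fn).parameters
    kwargs = {name: value for name, value in available.items() if name in params}
    return fn(*args, **kwargs)


def on_start() -> Dict[str, Any]:
    # Per-worker setup; each key becomes a name that f and on_done may request.
    return {"threshold": 3, "checked": []}


def f(x: int, threshold: int, checked: List[int]) -> bool:
    checked.append(x)  # record every element this worker looked at
    return x > threshold


def on_done(checked: List[int]) -> None:
    # Declares only `checked`, so `threshold` is not passed in.
    print(f"worker finished after checking {len(checked)} elements")


resources = on_start()
kept = [x for x in range(10) if call_with_injected(f, x, **resources)]
print(kept)  # [4, 5, 6, 7, 8, 9]
call_with_injected(on_done, **resources)  # worker finished after checking 10 elements
```

Because on_start runs once per worker, resources such as connections or caches set up there are not shared between workers in this model.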

Returns:

Union[pypeln.thread.stage.Stage[~B], pypeln.utils.Partial[pypeln.thread.stage.Stage[~B]]]: A Stage if the stage parameter is given; otherwise a Partial.
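Returning a Partial when stage is omitted is what allows pipe-style composition such as range(10) | pl.thread.filter(slow_gt3, workers=3) | list. The mechanism can be sketched with a hypothetical `Partial` class that stores a one-argument callable and implements `__ror__`, so `x | partial` evaluates to `partial(x)`. This is an illustration, not `pypeln.utils.Partial` itself, and `my_filter` is a hypothetical, sequential stand-in for a stage constructor.

```python
from typing import Any, Callable, Generic, List, TypeVar, Union

T = TypeVar("T")
R = TypeVar("R")


class Partial(Generic[R]):
    """Hypothetical stand-in for a deferred stage: wraps a one-argument
    function and applies it when data is supplied directly or piped in."""

    def __init__(self, fn: Callable[[Any], R]) -> None:
        self.fn = fn

    def __call__(self, stage: Any) -> R:
        return self.fn(stage)

    def __ror__(self, stage: Any) -> R:
        # `stage | partial` lands here because the left operand (e.g. a range)
        # does not define `|` for Partial objects.
        return self.fn(stage)


_UNDEFINED = object()  # sentinel meaning "no stage given"


def my_filter(
    f: Callable[[T], bool], stage: Any = _UNDEFINED
) -> Union[List[T], Partial[List[T]]]:
    """Hypothetical constructor mirroring the Stage-or-Partial return shape."""
    if stage is _UNDEFINED:
        # No data yet: defer the call until a stage is supplied.
        return Partial(lambda stage: my_filter(f, stage))
    return [x for x in stage if f(x)]


def gt3(x: int) -> bool:
    return x > 3


print(my_filter(gt3, range(10)))   # [4, 5, 6, 7, 8, 9]
print(range(10) | my_filter(gt3))  # [4, 5, 6, 7, 8, 9]
```

The sentinel object, rather than None, lets None remain a legitimate value for the stage argument, which is presumably why the signature uses pypeln_utils.UNDEFINED as its default.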

Source code in pypeln/thread/api/filter.py
def filter(
    f: FilterFn,
    stage: tp.Union[
        Stage[A], tp.Iterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    workers: int = 1,
    maxsize: int = 0,
    timeout: float = 0,
    on_start: tp.Callable = None,
    on_done: tp.Callable = None,
) -> tp.Union[Stage[B], pypeln_utils.Partial[Stage[B]]]:
    """
    Creates a stage that filters the data using a predicate function `f`. It is intended to behave like Python's built-in `filter` function, but with added concurrency.

    ```python
    import pypeln as pl
    import time
    from random import random

    def slow_gt3(x):
        time.sleep(random()) # <= some slow computation
        return x > 3

    data = range(10) # [0, 1, 2, ..., 9]
    stage = pl.thread.filter(slow_gt3, data, workers=3, maxsize=4)

    data = list(stage) # e.g. [5, 6, 4, 7, 8, 9]
    ```

    !!! note
        Because of concurrency, order is not guaranteed.

    Arguments:
        f: A function with signature `f(x) -> bool`. `f` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        stage: A Stage or Iterable.
        workers: The number of workers the stage should contain.
        maxsize: The maximum number of objects the stage can hold simultaneously. If set to `0` (default), the stage can grow unbounded.
        timeout: Seconds before stopping the worker if its current task is not yet completed. Defaults to `0`, which means there is no timeout.
        on_start: A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        on_done: A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).

    Returns:
        Returns a `Stage` if the `stage` parameter is given; otherwise it returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(
            lambda stage: filter(
                f,
                stage=stage,
                workers=workers,
                maxsize=maxsize,
                timeout=timeout,
                on_start=on_start,
                on_done=on_done,
            )
        )

    stage = to_stage(stage)

    return Stage(
        process_fn=Filter(f),
        workers=workers,
        maxsize=maxsize,
        timeout=timeout,
        total_sources=stage.workers,
        dependencies=[stage],
        on_start=on_start,
        on_done=on_done,
        f_args=pypeln_utils.function_args(f),
    )
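The `Stage(...)` call above wires `Filter(f)` into worker threads fed by the upstream stage. As a rough mental model only (pypeln's `Stage` and `Filter` classes are not shown here and certainly differ in detail), the sketch below uses `queue.Queue(maxsize)` for back-pressure, where `maxsize=0` also means unbounded, and has the consumer wait for one end-of-input sentinel per worker, which is the role `total_sources=stage.workers` appears to play for the next stage. `threaded_filter` and `_DONE` are hypothetical names, and the sketch omits `timeout`, error propagation, and cleanup when the consumer stops early.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical end-of-stream marker


def threaded_filter(
    f: Callable[[T], bool], data: Iterable[T], workers: int = 1, maxsize: int = 0
) -> Iterator[T]:
    """Hypothetical sketch: `workers` threads apply `f`; queues bounded by
    `maxsize` (0 means unbounded, as with queue.Queue) provide back-pressure."""
    inbox = queue.Queue(maxsize)
    outbox = queue.Queue(maxsize)

    def feed() -> None:
        for x in data:
            inbox.put(x)  # blocks while the inbox is full
        for _ in range(workers):
            inbox.put(_DONE)  # one sentinel per worker

    def work() -> None:
        while True:
            x = inbox.get()
            if x is _DONE:
                outbox.put(_DONE)  # tell the consumer this worker finished
                return
            if f(x):
                outbox.put(x)

    threads = [threading.Thread(target=feed, daemon=True)]
    threads += [threading.Thread(target=work, daemon=True) for _ in range(workers)]
    for t in threads:
        t.start()

    finished = 0
    while finished < workers:  # stop only after every worker has reported done
        item = outbox.get()
        if item is _DONE:
            finished += 1
        else:
            yield item


print(sorted(threaded_filter(lambda x: x > 3, range(10), workers=3, maxsize=4)))
# [4, 5, 6, 7, 8, 9]
```

Counting sentinels instead of checking whether the queue is empty avoids ending the stream early while a slow worker is still evaluating `f`.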