pl.process.filter
Creates a stage that filters the data given a predicate function `f`. It is intended to behave like Python's built-in `filter` function, but with added concurrency.
```python
import pypeln as pl
import time
from random import random

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.process.filter(slow_gt3, data, workers=3, maxsize=4)

data = list(stage)  # e.g. [5, 6, 3, 4, 7, 8, 9]
```
Note
Because of concurrency, order is not guaranteed.
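If a deterministic result order matters, the concurrent output can simply be sorted once the stage is consumed. A minimal sketch, reusing the `slow_gt3` predicate from the example above:

```python
import time
from random import random

import pypeln as pl

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

stage = pl.process.filter(slow_gt3, range(10), workers=3, maxsize=4)

print(sorted(stage))  # [4, 5, 6, 7, 8, 9] regardless of completion order
```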
Parameters:
Name | Type | Description | Default |
---|---|---|---|
f | FilterFn | A function with signature `f(x) -> bool`. `f` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). | required |
stage | Union[pypeln.process.stage.Stage[~A], Iterable[~A], pypeln.utils.Undefined] | A Stage or Iterable. | UNDEFINED |
workers | int | The number of workers the stage should contain. | 1 |
maxsize | int | The maximum number of objects the stage can hold simultaneously; if set to `0` (default) the stage can grow unbounded. | 0 |
timeout | float | Seconds before stopping a worker if its current task is not yet completed. Defaults to `0`, which means it is unbounded. | 0 |
on_start | Callable | A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). See the sketch after this table. | None |
on_done | Callable | A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). | None |
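A minimal sketch of how `on_start` and `on_done` can be combined with dependency injection; the `threshold` kwarg and the `gt_threshold` predicate are illustrative names, not part of pypeln:

```python
import pypeln as pl

def on_start():
    # Runs once per worker before it processes data; the returned dict
    # is injected by name into `f` and `on_done` (`threshold` is illustrative).
    return dict(threshold=3)

def gt_threshold(x, threshold):
    # `threshold` arrives via the kwargs returned by `on_start`
    return x > threshold

def on_done(threshold):
    # Runs once per worker when it finishes; receives the injected kwargs.
    print(f"worker done, threshold was {threshold}")

stage = pl.process.filter(
    gt_threshold, range(10), workers=2, on_start=on_start, on_done=on_done
)

print(sorted(stage))  # [4, 5, 6, 7, 8, 9]
```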
Returns:
Type | Description |
---|---|
Union[pypeln.process.stage.Stage[~B], pypeln.utils.Partial[pypeln.process.stage.Stage[~B]]] | Returns a `Stage` if the `stage` parameter is given, else it returns a `Partial` (see the sketch below). |
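When `stage` is omitted, the resulting `Partial` can be applied later, for example through pypeln's pipe (`|`) syntax. A minimal sketch reusing `slow_gt3`:

```python
import time
from random import random

import pypeln as pl

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

# With `stage` omitted, filter(...) returns a Partial that `|` applies to the iterable
stage = range(10) | pl.process.filter(slow_gt3, workers=3, maxsize=4)

print(sorted(stage))  # [4, 5, 6, 7, 8, 9]
```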
Source code in pypeln/process/api/filter.py
````python
def filter(
    f: FilterFn,
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.Iterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    workers: int = 1,
    maxsize: int = 0,
    timeout: float = 0,
    on_start: tp.Callable = None,
    on_done: tp.Callable = None,
) -> tp.Union[Stage[B], pypeln_utils.Partial[Stage[B]]]:
    """
    Creates a stage that filters the data given a predicate function `f`. It is intended to behave like python's built-in `filter` function but with added concurrency.

    ```python
    import pypeln as pl
    import time
    from random import random

    def slow_gt3(x):
        time.sleep(random()) # <= some slow computation
        return x > 3

    data = range(10) # [0, 1, 2, ..., 9]

    stage = pl.process.filter(slow_gt3, data, workers=3, maxsize=4)

    data = list(stage) # e.g. [5, 6, 3, 4, 7, 8, 9]
    ```

    !!! note
        Because of concurrency, order is not guaranteed.

    Arguments:
        f: A function with signature `f(x) -> bool`. `f` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        stage: A Stage or Iterable.
        workers: The number of workers the stage should contain.
        maxsize: The maximum number of objects the stage can hold simultaneously, if set to `0` (default) then the stage can grow unbounded.
        timeout: Seconds before stopping the worker if its current task is not yet completed. Defaults to `0` which means it is unbounded.
        on_start: A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        on_done: A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).

    Returns:
        Returns a `Stage` if the `stage` parameter is given, else it returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(
            lambda stage: filter(
                f,
                stage=stage,
                workers=workers,
                maxsize=maxsize,
                timeout=timeout,
                on_start=on_start,
                on_done=on_done,
            )
        )

    stage = to_stage(stage, maxsize=maxsize)

    return Stage(
        process_fn=Filter(f),
        workers=workers,
        maxsize=maxsize,
        timeout=timeout,
        total_sources=stage.workers,
        dependencies=[stage],
        on_start=on_start,
        on_done=on_done,
        use_threads=False,
        f_args=pypeln_utils.function_args(f),
    )
````