pl.task.each
Creates a stage that runs the function `f` for each element in the data, but the stage itself yields no elements. It's useful for sink stages that perform actions such as writing to disk or saving to a database and don't produce any results. For example:
import pypeln as pl

def process_image(image_path):
    image = load_image(image_path)
    image = transform_image(image)
    save_image(image_path, image)

file_paths = get_file_paths()
stage = pl.process.each(process_image, file_paths, workers=4)
pl.process.run(stage)
or alternatively
file_paths = get_file_paths()
pl.process.each(process_image, file_paths, workers=4, run=True)
Note
Because elements are processed concurrently, the order in which they are handled is not guaranteed.
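To make the ordering note concrete, here is a minimal sketch built only on the standard library's `asyncio`. It is not pypeln's implementation, and `each_sketch`, `record`, `delays`, and `completed` are hypothetical names chosen for illustration. Several workers pull from a shared queue and call a sink function on each element, so elements finish in whatever order their work completes.

```python
import asyncio

async def each_sketch(f, data, workers=1):
    """Apply coroutine function `f` to every element of `data`; produce nothing."""
    queue = asyncio.Queue()
    for item in data:
        queue.put_nowait(item)

    async def worker():
        # Each worker drains the shared queue until it is empty.
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            await f(item)

    await asyncio.gather(*(worker() for _ in range(workers)))

completed = []

async def record(delay):
    # Stand-in for a side effect such as writing to disk.
    await asyncio.sleep(delay)
    completed.append(delay)

delays = [0.05, 0.01, 0.03]
result = asyncio.run(each_sketch(record, delays, workers=3))
```

With three workers the shorter delays will usually finish first, so `completed` typically differs from the input order. Only the set of processed elements is dependable, and `result` is `None` because the sink yields nothing.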
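Parameters:
Name | Type | Description | Default |
---|---|---|---|
f | EachFn | A function with signature `f(x) -> None`. `f` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). | required |
workers | int | The number of workers the stage should contain. | 1 |
maxsize | int | The maximum number of objects the stage can hold simultaneously. If set to `0` (default), the stage can grow unbounded. | 0 |
timeout | float | Seconds before stopping the worker if its current task is not yet completed. Defaults to `0`, which means it is unbounded. | 0 |
on_start | Callable | A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). | None |
on_done | Callable | A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection). | None |
run | bool | Whether or not to execute the stage immediately. If `each` is running inside another coroutine / task, avoid using `run=True` since it will block the event loop; use `await pl.task.each(...)` instead. | False |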
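The relationship between `on_start`, `f`, and `on_done` can be pictured with a small standard-library sketch. This is an assumed, simplified model of a single worker's lifecycle, not pypeln's code: `run_worker`, `open_log`, `write_item`, and `close_log` are hypothetical, and the real library resolves arguments by name through its dependency-injection mechanism rather than passing `**kwargs` unconditionally.

```python
import asyncio

async def run_worker(f, items, on_start=None, on_done=None):
    # on_start runs once per worker and may return keyword arguments.
    kwargs = (on_start() if on_start else None) or {}
    try:
        for item in items:
            await f(item, **kwargs)
    finally:
        # on_done runs once per worker when the worker finishes.
        if on_done:
            on_done(**kwargs)

events = []

def open_log():
    events.append("start")
    return {"log": []}

async def write_item(x, log):
    log.append(x)
    events.append(f"item {x}")

def close_log(log):
    events.append(f"done after {len(log)} items")

asyncio.run(run_worker(write_item, [1, 2], on_start=open_log, on_done=close_log))
```

In this model the dictionary returned by `open_log` is shared by every call to `write_item` in the same worker and is handed to `close_log` at the end, which is the pattern the `on_start` description refers to.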
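Returns:
Type | Description |
---|---|
Union[pypeln.task.stage.Stage[~B], NoneType, pypeln.utils.Partial[Union[pypeln.task.stage.Stage[~B]]]] | If the `stage` parameter is not given, this function returns a `Partial`; otherwise, if `run=False` (default) it returns a new stage, and if `run=True` it runs the stage and returns `None`. |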
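The three return modes can be illustrated with a hypothetical, synchronous stand-in. `each_sketch`, `Partial`, and `_MISSING` below are assumptions made for illustration and do not reproduce pypeln's classes; in particular, the real function returns a `Stage` rather than a generator.

```python
_MISSING = object()

class Partial:
    """Holds a function that still needs its input data."""
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, data):
        return self.fn(data)

def each_sketch(f, data=_MISSING, run=False):
    if data is _MISSING:
        # No data yet: return a partial to be applied later.
        return Partial(lambda d: each_sketch(f, d))

    def stage():
        # Lazy sink: applies f when iterated and yields no elements.
        for item in data:
            f(item)
        return
        yield  # unreachable; only marks this function as a generator

    if not run:
        return stage()
    for _ in stage():
        pass  # run=True: consume immediately, then return None

seen = []
partial = each_sketch(seen.append)     # data omitted: a Partial
lazy = partial([1, 2])                 # data supplied: a lazy stage
before = list(seen)                    # nothing has run yet
leftover = list(lazy)                  # iterating triggers the side effects
eager = each_sketch(seen.append, [3], run=True)
```

Iterating the lazy stage performs the side effects but produces an empty list, while the `run=True` call performs them immediately and evaluates to `None`.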
Source code in pypeln/task/api/each.py
def each(
    f: EachFn,
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.AsyncIterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    workers: int = 1,
    maxsize: int = 0,
    timeout: float = 0,
    on_start: tp.Callable = None,
    on_done: tp.Callable = None,
    run: bool = False,
) -> tp.Union[tp.Optional[Stage[B]], pypeln_utils.Partial[tp.Optional[Stage[B]]]]:
    """
    Creates a stage that runs the function `f` for each element in the data, but the stage itself yields no elements. It's useful for sink stages that perform actions such as writing to disk or saving to a database and don't produce any results. For example:

    ```python
    import pypeln as pl

    def process_image(image_path):
        image = load_image(image_path)
        image = transform_image(image)
        save_image(image_path, image)

    file_paths = get_file_paths()
    stage = pl.process.each(process_image, file_paths, workers=4)
    pl.process.run(stage)
    ```

    or alternatively

    ```python
    file_paths = get_file_paths()
    pl.process.each(process_image, file_paths, workers=4, run=True)
    ```

    !!! note
        Because elements are processed concurrently, the order in which they are handled is not guaranteed.

    Arguments:
        f: A function with signature `f(x) -> None`. `f` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        workers: The number of workers the stage should contain.
        maxsize: The maximum number of objects the stage can hold simultaneously. If set to `0` (default), the stage can grow unbounded.
        timeout: Seconds before stopping the worker if its current task is not yet completed. Defaults to `0`, which means it is unbounded.
        on_start: A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        on_done: A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        run: Whether or not to execute the stage immediately. If `each` is running inside another coroutine / task, avoid using `run=True` since it will block the event loop; use `await pl.task.each(...)` instead.

    Returns:
        If the `stage` parameter is not given, this function returns a `Partial`; otherwise, if `run=False` (default) it returns a new stage, and if `run=True` it runs the stage and returns `None`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(
            lambda stage: each(
                f,
                stage=stage,
                workers=workers,
                maxsize=maxsize,
                timeout=timeout,
                on_start=on_start,
                on_done=on_done,
            )
        )

    stage = to_stage(stage, maxsize=maxsize)

    stage = Stage(
        process_fn=Each(f),
        workers=workers,
        maxsize=maxsize,
        timeout=timeout,
        total_sources=1,
        dependencies=[stage],
        on_start=on_start,
        on_done=on_done,
        f_args=pypeln_utils.function_args(f),
    )

    if not run:
        return stage

    for _ in stage:
        pass