pl.task.ordered
Creates a stage that sorts its elements based on their order of creation on the source iterable(s) of the pipeline.
import pypeln as pl
import random
import time

def slow_squared(x):
    time.sleep(random.random())
    return x ** 2

stage = range(5)
stage = pl.process.map(slow_squared, stage, workers = 2)
stage = pl.process.ordered(stage)

print(list(stage)) # [0, 1, 4, 9, 16]
Note
`ordered` will work even if the previous stages are from different `pypeln` modules, but it may not work if you introduce an intermediate external iterable stage.
Warning
This stage will not yield until it has accumulated all of the elements from the previous stage. Use it only if all elements fit in memory.
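The buffering behaviour described in the warning can be illustrated with a minimal standard-library sketch. This is not pypeln's implementation: `unordered_map`, `ordered_sketch`, and the `(index, value)` tagging are hypothetical names and assumptions made purely for illustration. Each element carries the index it had in the source, and nothing is yielded until the upstream is exhausted.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_squared(x):
    time.sleep(random.random() / 10)
    return x ** 2


def unordered_map(f, items, workers=2):
    """Yield (source_index, result) pairs in completion order (hypothetical helper)."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(f, item): index for index, item in enumerate(items)}
        for future in as_completed(futures):
            yield futures[future], future.result()


def ordered_sketch(indexed_items):
    """Buffer every element, then yield the values sorted by source index."""
    buffer = list(indexed_items)  # consumes the whole upstream before yielding
    buffer.sort(key=lambda pair: pair[0])
    for _, value in buffer:
        yield value


result = list(ordered_sketch(unordered_map(slow_squared, range(5))))
print(result)  # [0, 1, 4, 9, 16]
```

Because the whole upstream is held in `buffer` before the first value is produced, memory use grows with the number of elements, which is the reason for the warning above.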
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stage | Union[pypeln.task.stage.Stage[~A], Iterable[~A], AsyncIterable[~A], pypeln.utils.Undefined] | A Stage, Iterable, or AsyncIterable. | pypeln.utils.UNDEFINED |
maxsize | int | The maximum number of objects the stage can hold simultaneously. If set to 0 (the default), the stage can grow unbounded. | 0 |
Returns:
Type | Description |
---|---|
Union[pypeln.task.stage.Stage[~A], pypeln.utils.Partial[pypeln.task.stage.Stage[~A]]] | If the `stage` parameter is given, this function returns an iterable; otherwise it returns a `Partial`. |
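The two return modes (a stage when `stage` is given, a `Partial` otherwise) can be pictured with a small standard-library sketch. It assumes, for illustration only, that a `Partial` is a wrapper that waits for its missing `stage` argument and can receive it either by being called or through the `|` operator. `PartialSketch`, `ordered_sketch`, and `_UNDEFINED` are hypothetical names, not pypeln's classes.

```python
_UNDEFINED = object()  # stand-in for a "no stage given" sentinel


class PartialSketch:
    """Holds a function that still needs its `stage` argument (hypothetical)."""

    def __init__(self, f):
        self.f = f

    def __call__(self, stage):
        return self.f(stage)

    def __ror__(self, stage):
        # Enables the `iterable | partial` spelling.
        return self.f(stage)


def ordered_sketch(stage=_UNDEFINED):
    if stage is _UNDEFINED:
        # No stage yet: defer the call until one is supplied.
        return PartialSketch(lambda stage: ordered_sketch(stage))
    # A plain sort stands in for "ordering", only to give the sketch visible behaviour.
    return iter(sorted(stage))


direct = list(ordered_sketch([3, 1, 2]))
piped = list([3, 1, 2] | ordered_sketch())
print(direct, piped)  # [1, 2, 3] [1, 2, 3]
```

Both spellings end up calling the same function with the same argument; the deferred form simply delays the call until the left-hand operand is available.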
Source code in pypeln/task/api/ordered.py
def ordered(
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.AsyncIterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    maxsize: int = 0,
) -> tp.Union[Stage[A], pypeln_utils.Partial[Stage[A]]]:
    """
    Creates a stage that sorts its elements based on their order of creation on the source iterable(s) of the pipeline.

    ```python
    import pypeln as pl
    import random
    import time

    def slow_squared(x):
        time.sleep(random.random())
        return x ** 2

    stage = range(5)
    stage = pl.process.map(slow_squared, stage, workers = 2)
    stage = pl.process.ordered(stage)

    print(list(stage)) # [0, 1, 4, 9, 16]
    ```

    !!! note
        `ordered` will work even if the previous stages are from different `pypeln` modules, but it may not work if you introduce an intermediate external iterable stage.

    !!! warning
        This stage will not yield until it has accumulated all of the elements from the previous stage. Use it only if all elements fit in memory.

    Arguments:
        stage: A Stage, Iterable, or AsyncIterable.
        maxsize: The maximum number of objects the stage can hold simultaneously. If set to `0` (default), the stage can grow unbounded.

    Returns:
        If the `stage` parameter is given, this function returns an iterable; otherwise it returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(lambda stage: ordered(stage))

    stage = to_stage(stage, maxsize=maxsize)

    return Stage(
        process_fn=Ordered(),
        workers=1,
        maxsize=0,
        timeout=0,
        total_sources=1,
        dependencies=[stage],
        on_start=None,
        on_done=None,
        f_args=[],
    )