pl.task.to_iterable
Creates an iterable from a stage. Use this function when you want more control over how the output stage is consumed; specifically, setting the `maxsize` argument can help you avoid out-of-memory (OOM) errors if the consumer is slow.
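The effect of a bounded `maxsize` can be illustrated without pypeln at all. The following is a minimal sketch using only the standard library's `asyncio.Queue`; it is not pypeln's implementation, and the helper names (`produce`, `consume_slowly`, `run`) are hypothetical. It shows the general idea: when the buffer is bounded, a fast producer is suspended until the slow consumer catches up, so the number of buffered items never exceeds the limit.

```python
import asyncio


async def produce(queue: asyncio.Queue, n: int, peak: list) -> None:
    """Push n items as fast as the queue allows, tracking the largest size seen."""
    for i in range(n):
        await queue.put(i)  # suspends here while a bounded queue is full
        peak[0] = max(peak[0], queue.qsize())
    await queue.put(None)  # sentinel: no more items


async def consume_slowly(queue: asyncio.Queue) -> list:
    """Drain the queue, pausing briefly per item to simulate a slow consumer."""
    out = []
    while True:
        item = await queue.get()
        if item is None:
            return out
        await asyncio.sleep(0.001)
        out.append(item)


async def run(n: int, maxsize: int):
    # For asyncio.Queue, maxsize=0 means the queue is unbounded.
    queue = asyncio.Queue(maxsize=maxsize)
    peak = [0]
    producer = asyncio.create_task(produce(queue, n, peak))
    items = await consume_slowly(queue)
    await producer
    return items, peak[0]


# With maxsize=4 the buffer never holds more than 4 items at once.
items, peak = asyncio.run(run(n=50, maxsize=4))
print(len(items), peak)
```

Running the same sketch with `maxsize=0` lets the producer enqueue everything before the consumer makes progress, which is the unbounded growth that a nonzero `maxsize` is meant to prevent.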
Parameters:
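Name | Type | Description | Default |
---|---|---|---|
stage | Union[pypeln.task.stage.Stage[~A], Iterable[~A], AsyncIterable[~A], pypeln.utils.Undefined] | A Stage, Iterable, or AsyncIterable. | `UNDEFINED` |
maxsize | int | The maximum number of objects the stage can hold simultaneously; if set to `0` (default), the stage can grow unbounded. | `0` |
return_index | bool | When set to `True`, the resulting iterable yields `Element(index: Tuple[int, ...], value: Any)` objects, which contain both the resulting value and an index that records the order in which the elements were created at the source. | `False` |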
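To make the `return_index` description concrete, here is a small sketch of how an index of this shape could be used to restore source order after elements arrive out of order. The `Element` class below is a hypothetical stand-in that mirrors the documented shape `(index: Tuple[int, ...], value: Any)`; it is not imported from pypeln, and `restore_source_order` is an illustrative helper, not part of the library.

```python
from typing import Any, Iterable, List, NamedTuple, Tuple


class Element(NamedTuple):
    # Hypothetical stand-in mirroring the documented shape; not pypeln's class.
    index: Tuple[int, ...]
    value: Any


def restore_source_order(elements: Iterable[Element]) -> List[Any]:
    """Sort by index (tuples compare lexicographically) and keep the values."""
    return [e.value for e in sorted(elements, key=lambda e: e.index)]


# Pretend these were yielded out of order by concurrent workers.
arrived = [Element((2,), "c"), Element((0,), "a"), Element((1,), "b")]
ordered = restore_source_order(arrived)
print(ordered)
```

Sorting requires collecting all elements first, so this approach suits bounded outputs; a streaming consumer would need a reordering buffer instead.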
Returns:
Type | Description |
---|---|
Union[Iterable[~A], pypeln.utils.Partial[Iterable[~A]]] | If the `stage` parameter is given, this function returns an iterable; otherwise it returns a `Partial`. |
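The "iterable or `Partial`" return convention can be sketched in plain Python. Everything below is a hypothetical illustration of the pattern, not pypeln's code: `Partial`, `_UNDEFINED`, and `take` are stand-ins, and the support for the `|` operator is an assumption about how such a deferred call might be consumed.

```python
import itertools
from typing import Any, Callable, Iterable


class Partial:
    """Hypothetical stand-in: a deferred call that still needs its stage."""

    def __init__(self, f: Callable[[Any], Any]) -> None:
        self.f = f

    def __call__(self, stage: Any) -> Any:
        return self.f(stage)

    def __ror__(self, stage: Any) -> Any:
        # Enables `stage | partial` when `stage` does not handle `|` itself.
        return self.f(stage)


_UNDEFINED = object()  # sentinel meaning "no stage was passed"


def take(stage: Any = _UNDEFINED, n: int = 2) -> Any:
    """Return the first n items, or a Partial if the stage is omitted."""
    if stage is _UNDEFINED:
        # Capture every keyword argument so none is lost when the call resumes.
        return Partial(lambda s: take(s, n=n))
    return list(itertools.islice(stage, n))


direct = take(range(10), n=3)    # stage given: result returned immediately
deferred = take(n=3)             # stage omitted: a Partial is returned
piped = range(10) | take(n=3)    # the Partial is applied to the left operand
print(direct, piped)
```

The design choice is that a single function serves both call styles: passing the stage runs it immediately, while omitting the stage yields a reusable object that can be applied later.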
Source code in pypeln/task/api/to_iterable.py
def to_iterable(
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.AsyncIterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    maxsize: int = 0,
    return_index: bool = False,
) -> tp.Union[tp.Iterable[A], pypeln_utils.Partial[tp.Iterable[A]]]:
    """
    Creates an iterable from a stage. Use this function when you want more control over how the output stage is consumed; specifically, setting the `maxsize` argument can help you avoid out-of-memory (OOM) errors if the consumer is slow.

    Arguments:
        stage: A Stage, Iterable, or AsyncIterable.
        maxsize: The maximum number of objects the stage can hold simultaneously; if set to `0` (default), the stage can grow unbounded.
        return_index: When set to `True`, the resulting iterable yields `Element(index: Tuple[int, ...], value: Any)` objects, which contain both the resulting value and an index that records the order in which the elements were created at the source.

    Returns:
        If the `stage` parameter is given, this function returns an iterable; otherwise it returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(lambda stage: to_iterable(stage, maxsize=maxsize))

    if isinstance(stage, Stage):
        iterable = stage.to_iterable(maxsize=maxsize, return_index=return_index)
    elif isinstance(stage, tp.Iterable[A]):
        return stage
    else:
        iterable = from_iterable(stage).to_iterable(
            maxsize=maxsize, return_index=return_index
        )

    return iterable