pl.task.map

Creates a stage that maps a function f over the data. It is intended to behave like Python's built-in map function, but with added concurrency.

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

data = range(10) # [0, 1, 2, ..., 9]
stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)

data = list(stage) # e.g. [2, 1, 5, 6, 3, 4, 7, 8, 9, 10]

Note

Because items are processed concurrently, the order of the output is not guaranteed.
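
To make the ordering caveat concrete, here is a minimal standard-library sketch of a worker pool that maps an async function over some data. It is not pypeln's implementation: `concurrent_map` and its internals are hypothetical names used only for illustration, and the sleep durations are chosen so that later inputs tend to finish first.

```python
import asyncio

async def concurrent_map(f, data, workers=1):
    """Illustrative only: apply async `f` to `data` using `workers` concurrent tasks."""
    queue = asyncio.Queue()
    for x in data:
        queue.put_nowait(x)
    results = []

    async def worker():
        while True:
            try:
                x = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # no work left for this worker
            results.append(await f(x))  # appended in completion order

    await asyncio.gather(*(worker() for _ in range(workers)))
    return results

async def slow_add1(x):
    # Larger inputs sleep less, so they tend to complete before smaller ones.
    await asyncio.sleep(0.05 * (3 - x))
    return x + 1

results = asyncio.run(concurrent_map(slow_add1, range(3), workers=3))
print(results)          # completion order, typically different from input order
print(sorted(results))  # [1, 2, 3]
```

If the original order matters, one option is to carry an index alongside each item and sort on it after collecting the results.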

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `f` | `MapFn` | A function with the signature `f(x) -> y`. `f` can accept special additional arguments by name, as described in Advanced Usage. | required |
| `stage` | `Union[pypeln.task.stage.Stage[~A], Iterable[~A], AsyncIterable[~A], pypeln.utils.Undefined]` | A Stage, Iterable or AsyncIterable. | `UNDEFINED` (a `pypeln.utils.Undefined` sentinel) |
| `workers` | `int` | The number of workers the stage should contain. | `1` |
| `maxsize` | `int` | The maximum number of objects the stage can hold simultaneously. If set to `0` (the default), the stage can grow unbounded. | `0` |
| `timeout` | `float` | Seconds before stopping the worker if its current task has not yet completed. Defaults to `0`, which means there is no time limit. | `0` |
| `on_start` | `Callable` | A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name, as described in Advanced Usage. | `None` |
| `on_done` | `Callable` | A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name, as described in Advanced Usage. | `None` |
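
The way `on_start`, `f` and `on_done` share keyword arguments by name can be sketched with the standard library alone. This is a simplified model under stated assumptions, not pypeln's code: `run_worker` and this `function_args` are hypothetical helpers, a single worker is simulated, and the special `worker_info` and `stage_status` arguments are left out.

```python
import inspect

def function_args(f):
    """Hypothetical helper: the parameter names that `f` declares."""
    return list(inspect.signature(f).parameters)

def run_worker(f, items, on_start=None, on_done=None):
    """Illustrative single-worker lifecycle: on_start, then f per item, then on_done."""
    kwargs = (on_start() if on_start else None) or {}
    # Pass each function only the keyword arguments it asks for by name.
    f_kwargs = {k: v for k, v in kwargs.items() if k in function_args(f)}
    results = [f(x, **f_kwargs) for x in items]
    if on_done:
        done_kwargs = {k: v for k, v in kwargs.items() if k in function_args(on_done)}
        on_done(**done_kwargs)
    return results

log = []

def on_start():
    return dict(offset=10)  # made available to f and on_done by name

def add_offset(x, offset):
    return x + offset

def on_done(offset):
    log.append(f"worker finished with offset={offset}")

results = run_worker(add_offset, [1, 2, 3], on_start=on_start, on_done=on_done)
print(results)  # [11, 12, 13]
print(log)      # ['worker finished with offset=10']
```

A typical use of this pattern is to open a per-worker resource (for example a client session) in `on_start` and release it in `on_done`.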

Returns:

| Type | Description |
|---|---|
| `Union[pypeln.task.stage.Stage[~B], pypeln.utils.Partial[pypeln.task.stage.Stage[~B]]]` | Returns a `Stage` if the `stage` parameter is given; otherwise returns a `Partial`. |
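
The two return modes can be illustrated with a small stand-in. The `Partial` class, the `UNDEFINED` sentinel and `lazy_map` below are hypothetical simplifications written for this example; they only mimic the pattern visible in the source listing (return a deferred callable when no input is given) and assume the deferred object can be applied either by calling it or with the `|` operator.

```python
class Partial:
    """Hypothetical stand-in: wraps a function that is still waiting for its input."""

    def __init__(self, f):
        self.f = f

    def __call__(self, stage):
        return self.f(stage)

    def __ror__(self, stage):
        # Enables the `data | partial` spelling.
        return self.f(stage)

UNDEFINED = object()  # sentinel meaning "no input was supplied"

def lazy_map(f, stage=UNDEFINED):
    if stage is UNDEFINED:
        # No input yet: defer the call until an input is supplied.
        return Partial(lambda stage: lazy_map(f, stage))
    return map(f, stage)

def add1(x):
    return x + 1

direct = list(lazy_map(add1, range(3)))     # input given: result built immediately
partial = lazy_map(add1)                    # no input: a Partial is returned
deferred = list(partial(range(3)))          # apply it by calling
piped = list(range(3) | lazy_map(add1))     # or apply it with the pipe operator

print(direct, deferred, piped)  # [1, 2, 3] [1, 2, 3] [1, 2, 3]
```

The deferred form is what makes it possible to chain several stages in a pipeline without naming each intermediate result.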

Source code in pypeln/task/api/map.py
def map(
    f: MapFn,
    stage: tp.Union[
        Stage[A], tp.Iterable[A], tp.AsyncIterable[A], pypeln_utils.Undefined
    ] = pypeln_utils.UNDEFINED,
    workers: int = 1,
    maxsize: int = 0,
    timeout: float = 0,
    on_start: tp.Callable = None,
    on_done: tp.Callable = None,
) -> tp.Union[Stage[B], pypeln_utils.Partial[Stage[B]]]:
    """
    Creates a stage that maps a function `f` over the data. It is intended to behave like Python's built-in `map` function, but with added concurrency.

    ```python
    import pypeln as pl
    import asyncio
    from random import random

    async def slow_add1(x):
        await asyncio.sleep(random()) # <= some slow computation
        return x + 1

    data = range(10) # [0, 1, 2, ..., 9]
    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)

    data = list(stage) # e.g. [2, 1, 5, 6, 3, 4, 7, 8, 9, 10]
    ```

    !!! note
        Because items are processed concurrently, the order of the output is not guaranteed.

    Arguments:
        f: A function with the signature `f(x) -> y`. `f` can accept special additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        stage: A Stage, Iterable or AsyncIterable.
        workers: The number of workers the stage should contain.
        maxsize: The maximum number of objects the stage can hold simultaneously. If set to `0` (the default), the stage can grow unbounded.
        timeout: Seconds before stopping the worker if its current task has not yet completed. Defaults to `0`, which means there is no time limit.
        on_start: A function with signature `on_start(worker_info?) -> kwargs?`, where `kwargs` can be a `dict` of keyword arguments that can be consumed by `f` and `on_done`. `on_start` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
        on_done: A function with signature `on_done(stage_status?)`. This function is executed once per worker when the worker finishes. `on_done` can accept additional arguments by name as described in [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).

    Returns:
        Returns a `Stage` if the `stage` parameter is given; otherwise returns a `Partial`.
    """

    if isinstance(stage, pypeln_utils.Undefined):
        return pypeln_utils.Partial(
            lambda stage: map(
                f,
                stage=stage,
                workers=workers,
                maxsize=maxsize,
                timeout=timeout,
                on_start=on_start,
                on_done=on_done,
            )
        )

    stage = to_stage(stage)

    return Stage(
        process_fn=Map(f),
        workers=workers,
        maxsize=maxsize,
        timeout=timeout,
        total_sources=1,
        dependencies=[stage],
        on_start=on_start,
        on_done=on_done,
        f_args=pypeln_utils.function_args(f),
    )