asyncio_taskpool.pool module

Definitions of the task pool classes.

BaseTaskPool is a parent class and is not intended for direct use. TaskPool and SimpleTaskPool are subclasses intended for direct use. While the former allows for heterogeneous collections of tasks that can be entirely unrelated to one another, the latter requires the function and its arguments to be fixed upon initialization and only allows dynamic control over the number of tasks running at any given point in time.

For further details about the classes, see their respective documentation.

class asyncio_taskpool.pool.BaseTaskPool(pool_size: float = inf, name: str | None = None)

Bases: object

The base class for task pools. Not intended to be used directly.

cancel(*task_ids: int, msg: str | None = None) -> None

Cancels the tasks with the specified IDs.

Each task ID must belong to a task still running within the pool.

Note that once a pool has been flushed (see below), IDs of tasks that have ended previously will be forgotten.

Parameters:
  • *task_ids – Arbitrary number of integers. Each must be an ID of a task still running within the pool.

  • msg (optional) – Passed to the Task.cancel() method of every task specified by the task_ids.

Raises:
  • AlreadyCancelled – One of the task_ids belongs to a task that has been cancelled.

  • AlreadyEnded – One of the task_ids belongs to a task that has ended.

  • InvalidTaskID – One of the task_ids is not known to the pool.
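The following is a minimal sketch of what passing msg to Task.cancel() means, using only the standard asyncio module rather than the pool classes. The names worker, main and cancel_log are hypothetical, and the sketch assumes Python 3.9 or later, where Task.cancel() accepts a msg argument.

```python
import asyncio


async def worker(log: list) -> None:
    try:
        await asyncio.sleep(3600)  # stand-in for long-running work
    except asyncio.CancelledError as exc:
        # The msg given to Task.cancel() arrives as the exception argument.
        log.append(exc.args)
        raise


async def main() -> list:
    log: list = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start running
    task.cancel(msg="no longer needed")
    # Collect the cancellation instead of letting it propagate.
    await asyncio.gather(task, return_exceptions=True)
    return log


cancel_log = asyncio.run(main())
```

A task that catches asyncio.CancelledError can therefore inspect why it was cancelled before re-raising.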

cancel_all(msg: str | None = None) -> None

Cancels all tasks still running within the pool (including meta tasks).

If any method launched meta tasks, these meta tasks are cancelled before the actual tasks are cancelled. This means that any tasks “queued” to be started by a meta task will never even start.

Parameters:

msg (optional) – Passed to the Task.cancel() method of every task.

cancel_group(group_name: str, msg: str | None = None) -> None

Cancels an entire group of tasks.

The task group is subsequently forgotten by the pool.

If any method launched meta tasks belonging to that group, these meta tasks are cancelled before the actual tasks are cancelled. This means that any tasks “queued” to be started by a meta task will never even start.

Parameters:
  • group_name – The name of the group of tasks (and meta tasks) that shall be cancelled.

  • msg (optional) – Passed to the Task.cancel() method of every task in the group.

Raises:

TaskGroupNotFound – No task group named group_name exists in the pool.

async flush(return_exceptions: bool = False) -> None

Gathers (i.e. awaits) all ended/cancelled tasks in the pool.

The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded Task objects. It also gets rid of unneeded (ended/cancelled) meta tasks.

It blocks only if any of the tasks blocks while catching an asyncio.CancelledError or if any of the callbacks registered for the tasks blocks.

Parameters:

return_exceptions (optional) – Passed directly into gather.

async gather_and_close(return_exceptions: bool = False) -> None

Gathers (i.e. awaits) all tasks in the pool, then closes it.

Once this method is called, no more tasks can be started in the pool.

Note that this method may block indefinitely as long as any task in the pool is not done. This includes meta tasks launched by other methods, which may or may not even end by themselves. To avoid this, make sure to call cancel_all() first.

This method may also block if one of the tasks blocks while catching an asyncio.CancelledError or if any of the callbacks registered for a task blocks for whatever reason.

Parameters:

return_exceptions (optional) – Passed directly into gather.

Raises:

PoolStillUnlocked – The pool has not been locked yet.
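As an illustration of the return_exceptions flag, here is a small sketch that assumes "gather" refers to asyncio.gather from the standard library. It does not use the pool classes; ok, boom, main and results are hypothetical names.

```python
import asyncio


async def ok() -> str:
    return "done"


async def boom() -> None:
    raise ValueError("failed")


async def main() -> list:
    tasks = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    # With return_exceptions=True, an exception is returned in the position
    # of the task that raised it instead of propagating to the caller.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
```

With return_exceptions=False (the default), the ValueError would instead be raised from the awaited gather call.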

get_group_ids(*group_names: str) -> Set[int]

Returns the set of IDs of all tasks in the specified groups.

Parameters:

*group_names – Must be names of task groups that exist within the pool.

Returns:

Set of integers representing the task IDs belonging to the specified groups.

Raises:

TaskGroupNotFound – One of the specified group_names does not exist in the pool.

property is_full: bool

False if the number of running tasks is less than the pool size.

When the pool is full, any call to start a new task within it will block, until there is enough room for it.
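One way to picture this blocking behaviour is a semaphore with one slot per allowed task. The sketch below is only an assumption-laden illustration built on asyncio.Semaphore; TinyPool and its members are hypothetical and are not the library's implementation (for simplicity, the size here is a finite integer).

```python
import asyncio


class TinyPool:
    """Hypothetical stand-in for a size-limited pool (not the library's class)."""

    def __init__(self, pool_size: int) -> None:
        self._slots = asyncio.Semaphore(pool_size)
        self.tasks = []

    @property
    def is_full(self) -> bool:
        # The semaphore is "locked" once every slot has been taken.
        return self._slots.locked()

    async def start(self, coro_func, *args) -> asyncio.Task:
        await self._slots.acquire()  # waits here while the pool is full
        task = asyncio.create_task(coro_func(*args))
        # Free the slot again as soon as the task is done.
        task.add_done_callback(lambda _: self._slots.release())
        self.tasks.append(task)
        return task


async def main() -> tuple:
    pool = TinyPool(pool_size=2)
    await pool.start(asyncio.sleep, 0.05)
    full_after_one = pool.is_full
    await pool.start(asyncio.sleep, 0.05)
    full_after_two = pool.is_full
    await pool.start(asyncio.sleep, 0)  # resumes only after a slot is released
    await asyncio.gather(*pool.tasks)
    return full_after_one, full_after_two


fullness = asyncio.run(main())
```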

property is_locked: bool

True if the pool has been locked (see below).

lock() -> None

Prevents any more tasks from being started in the pool.

property num_cancelled: int

Number of tasks in the pool that have been cancelled.

At the moment a task’s cancel_callback is fired, it is considered to be cancelled and no longer running, until its end_callback is fired, at which point it is considered ended (instead of cancelled).

property num_ended: int

Number of tasks in the pool that have stopped running.

At the moment a task’s end_callback is fired, it is considered ended and no longer running (or cancelled). When a task is cancelled, it is not immediately considered ended; only after its cancel_callback has returned, does it then actually end.

property num_running: int

Number of tasks in the pool that are still running.

At the moment a task’s end_callback or cancel_callback is fired, it is no longer considered running.

property pool_size: int

Maximum number of concurrently running tasks allowed in the pool.

unlock() -> None

Allows new tasks to be started in the pool.

async until_closed() -> bool

Waits until the pool has been closed.

This method itself does not close the pool, but blocks until then.

Returns:

True once the pool is closed.
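The "wait, but do not close" semantics resemble waiting on an asyncio.Event that some other code sets. The following sketch is an analogy only; the closed flag and the local until_closed coroutine are hypothetical and say nothing about how the pool tracks its state internally.

```python
import asyncio


async def main() -> list:
    closed = asyncio.Event()  # hypothetical "pool is closed" flag
    order = []

    async def until_closed() -> bool:
        await closed.wait()  # only waits; it never sets the flag itself
        order.append("waiter resumed")
        return True

    waiter = asyncio.create_task(until_closed())
    await asyncio.sleep(0)  # the waiter is now suspended on the event
    order.append("closing")
    closed.set()  # something else has to perform the actual closing
    order.append(await waiter)
    return order


order = asyncio.run(main())
```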

class asyncio_taskpool.pool.SimpleTaskPool(func: Callable[_P, Coroutine[_R, Any, Any]], args: _P.args = (), kwargs: _P.kwargs = None, end_callback: EndCB | None = None, cancel_callback: CancelCB | None = None, pool_size: float = inf, name: str | None = None)

Bases: BaseTaskPool

Simplified task pool class.

A SimpleTaskPool instance can manage an arbitrary number of concurrent tasks, but they must come from a single coroutine function, called with the same arguments.

The coroutine function and its arguments are defined upon initialization.

As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.) Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment. However, since all tasks come from the same function-arguments-combination, the specificity of the cancel() method is probably unnecessary. Instead, a simpler stop() method is introduced.

Adding tasks blocks only if the pool is full at that moment.

property func_name: str

Name of the coroutine function used in the pool.

start(num: int) -> str

Starts specified number of new tasks in the pool as a new group.

Because this method delegates the spawning of the tasks to a meta task, it never blocks. However, just because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the BaseTaskPool.pool_size and num.

If the entire task group is cancelled before num tasks have spawned, since the meta task is cancelled first, the number of tasks spawned will end up being less than num.

Parameters:

num – The number of new tasks to start.

Returns:

The name of the newly created task group in the form 'start-group-{idx}' (with idx being an incrementing index).
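The interplay between a meta task, the pool size and cancellation can be sketched with plain asyncio. Everything below is a hypothetical model (a semaphore standing in for a pool size of 2, a meta coroutine standing in for the meta task), not the library's code.

```python
import asyncio


async def main() -> int:
    slots = asyncio.Semaphore(2)  # hypothetical pool size of 2
    spawned = []

    async def work() -> None:
        try:
            await asyncio.sleep(3600)  # stand-in for real work
        finally:
            slots.release()

    async def meta(num: int) -> None:
        # Spawns tasks one by one, waiting for a free slot each time.
        for _ in range(num):
            await slots.acquire()
            spawned.append(asyncio.create_task(work()))

    meta_task = asyncio.create_task(meta(5))  # returns immediately
    await asyncio.sleep(0.01)  # the meta task fills both slots, then waits
    meta_task.cancel()  # the meta task is cancelled first ...
    for task in spawned:  # ... and the actual tasks afterwards
        task.cancel()
    await asyncio.gather(meta_task, *spawned, return_exceptions=True)
    return len(spawned)


spawned_count = asyncio.run(main())
```

Although five tasks were requested, only two were ever spawned, because the meta task was cancelled while waiting for a third slot.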

stop(num: int) -> List[int]

Cancels specified number of tasks in the pool and returns their IDs.

The tasks are cancelled in LIFO order, meaning tasks started later will be stopped before those started earlier.

Parameters:

num – The number of tasks to cancel; if num >= BaseTaskPool.num_running, all tasks are cancelled.

Returns:

List of IDs of the tasks that have been cancelled (in the order they were cancelled).
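The LIFO order and the "num larger than the number of running tasks" case can be modelled in a few lines. This is a sketch under the assumption that running tasks are kept in start order; the running dictionary and the local stop function are hypothetical and independent of the pool classes.

```python
import asyncio
from typing import Dict, List


async def main() -> List[int]:
    running: Dict[int, asyncio.Task] = {}  # dicts keep insertion (start) order
    for task_id in range(4):
        running[task_id] = asyncio.create_task(asyncio.sleep(3600))
    all_tasks = list(running.values())

    def stop(num: int) -> List[int]:
        # Pick the most recently started tasks first (LIFO).
        ids = list(reversed(running))[:num]
        for task_id in ids:
            running.pop(task_id).cancel()
        return ids

    stopped = stop(2)  # cancels the two newest tasks
    stopped += stop(10)  # more than are running: cancels everything left
    await asyncio.gather(*all_tasks, return_exceptions=True)
    return stopped


stopped_ids = asyncio.run(main())
```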

stop_all() -> List[int]

Cancels all running tasks and returns their IDs.

class asyncio_taskpool.pool.TaskPool(pool_size: float = inf, name: str | None = None)

Bases: BaseTaskPool

General purpose task pool class.

Attempts to emulate part of the interface of multiprocessing.pool.Pool from the stdlib.

A TaskPool instance can manage an arbitrary number of concurrent tasks from any coroutine function. Tasks in the pool can all belong to the same coroutine function, but they can also come from any number of different and unrelated coroutine functions.

As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.) Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment.

Adding tasks blocks only if the pool is full at that moment.

apply(func: Callable[_P, Coroutine[_R, Any, Any]], args: _P.args = (), kwargs: _P.kwargs = None, num: int = 1, group_name: str | None = None, end_callback: EndCB | None = None, cancel_callback: CancelCB | None = None) -> str

Creates tasks with the supplied arguments to be run in the pool.

Each coroutine looks like func(*args, **kwargs), meaning the args and kwargs are unpacked and passed into func before creating each task, and this is done num times.

All the new tasks are added to the same task group.

Because this method delegates the spawning of the tasks to a meta task, it never blocks. However, just because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the BaseTaskPool.pool_size and num.

If the entire task group is cancelled before num tasks have spawned, since the meta task is cancelled first, the number of tasks spawned will end up being less than num.

Parameters:
  • func – The coroutine function to use for spawning the new tasks within the task pool.

  • args (optional) – The positional arguments to pass into each function call.

  • kwargs (optional) – The keyword-arguments to pass into each function call.

  • num (optional) – The number of tasks to spawn with the specified parameters. Defaults to 1.

  • group_name (optional) – Name of the task group to add the new tasks to. By default, a unique name is constructed in the form 'apply-{name}-group-{idx}' (with name being the name of the func and idx being an incrementing index).

  • end_callback (optional) – A callback to execute after a task has ended. It is run with the task’s ID as its only positional argument.

  • cancel_callback (optional) – A callback to execute after cancellation of a task. It is run with the task’s ID as its only positional argument.

Returns:

Name of the newly created group (see the group_name parameter).

Raises:

doublestarmap(func: Callable[..., AnyCoroutine], kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1, group_name: str | None = None, end_callback: EndCB | None = None, cancel_callback: CancelCB | None = None) -> str

Creates coroutines and runs them as new tasks in the pool.

Like map() except that the elements of kwargs_iter are expected to be mappings, each of which is unpacked as keyword arguments to the function. Each coroutine then looks like func(**kwargs), kwargs being an element from kwargs_iter.

Returns:

The name of the newly created group in the form 'doublestarmap-{name}-group-{idx}' (with name being the name of the func and idx being an incrementing index).
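The three mapping methods (map(), starmap() and doublestarmap()) differ only in how each element of the iterable is handed to func. The sketch below shows the three call shapes with plain asyncio.gather instead of a pool; scale and main are hypothetical names.

```python
import asyncio


async def scale(value: int, factor: int = 1) -> int:
    return value * factor


async def main() -> tuple:
    # map-style: each element is passed as the single argument.
    mapped = await asyncio.gather(*(scale(arg) for arg in [1, 2]))
    # starmap-style: each element is unpacked as positional arguments.
    starred = await asyncio.gather(
        *(scale(*args) for args in [(1, 10), (2, 10)])
    )
    # doublestarmap-style: each element is unpacked as keyword arguments.
    kwargs_iter = [{"value": 1, "factor": 100}, {"value": 2, "factor": 100}]
    doubled = await asyncio.gather(*(scale(**kwargs) for kwargs in kwargs_iter))
    return mapped, starred, doubled


mapped, starred, doubled = asyncio.run(main())
```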

map(func: Callable[[_T], AnyCoroutine], arg_iter: Iterable[_T], num_concurrent: int = 1, group_name: str | None = None, end_callback: EndCB | None = None, cancel_callback: CancelCB | None = None) -> str

Creates coroutines and runs them as new tasks in the pool.

Each coroutine looks like func(arg), arg being an element taken from arg_iter. The method is a task-based equivalent of the multiprocessing.pool.Pool.map method.

All the new tasks are added to the same task group.

num_concurrent determines the (maximum) number of tasks spawned this way that shall be running concurrently at any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task (assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is always equal to num_concurrent (except for when arg_iter is exhausted of course).

Because this method delegates the spawning of the tasks to a meta task, it never blocks. However, just because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the BaseTaskPool.pool_size and num_concurrent.

If the entire task group is cancelled, the meta task is cancelled first, which means that arg_iter may be abandoned before being fully consumed (if that is even possible).

Parameters:
  • func – The coroutine function to use for spawning the new tasks within the task pool.

  • arg_iter – The iterable of arguments; each argument is to be passed into a func call when spawning a new task.

  • num_concurrent (optional) – The number of new tasks spawned by this method that may run concurrently. Defaults to 1.

  • group_name (optional) – Name of the task group to add the new tasks to. If provided, it must be a name that doesn’t exist yet. By default, a unique name is constructed in the form 'map-{name}-group-{idx}' (with name being the name of the func and idx being an incrementing index).

  • end_callback (optional) – A callback to execute after a task has ended. It is run with the task’s ID as its only positional argument.

  • cancel_callback (optional) – A callback to execute after cancellation of a task. It is run with the task’s ID as its only positional argument.

Returns:

The name of the newly created group (see group_name parameter).

Raises:

starmap(func: Callable[[Unpack[_Ts]], AnyCoroutine], args_iter: Iterable[Tuple[Unpack[_Ts]]], num_concurrent: int = 1, group_name: str | None = None, end_callback: EndCB | None = None, cancel_callback: CancelCB | None = None) -> str

Creates coroutines and runs them as new tasks in the pool.

Like map() except that the elements of args_iter are expected to be iterables themselves, each of which is unpacked as positional arguments to the function. Each coroutine then looks like func(*args), args being an element from args_iter.

Returns:

The name of the newly created group in the form 'starmap-{name}-group-{idx}' (with name being the name of the func and idx being an incrementing index).
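The num_concurrent behaviour shared by the mapping methods (a finished task makes room for the next element of the iterable) can be approximated with a semaphore. This is a rough sketch in plain asyncio under that assumption; limiter, work and peak are hypothetical names, and the real methods additionally respect the pool size and run the spawning loop in a meta task.

```python
import asyncio


async def main() -> int:
    num_concurrent = 2
    limiter = asyncio.Semaphore(num_concurrent)
    active = 0
    peak = 0

    async def work(arg: int) -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # record the highest concurrency seen
        await asyncio.sleep(0.01)
        active -= 1
        limiter.release()  # a finished task frees room for the next one

    tasks = []
    for arg in range(6):  # the iterable is consumed one free slot at a time
        await limiter.acquire()
        tasks.append(asyncio.create_task(work(arg)))
    await asyncio.gather(*tasks)
    return peak


peak_concurrency = asyncio.run(main())
```

The recorded peak never exceeds num_concurrent, even though six elements are processed in total.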