dask 2021.10.0

dask.local

Asynchronous Shared-Memory Scheduler for Dask Graphs.

This scheduler coordinates several workers to execute tasks in a dask graph in parallel. It depends on a concurrent.futures.Executor and a corresponding Queue for worker-to-scheduler communication.

It tries to execute tasks in an order which maintains a small memory footprint throughout execution. It does this by running tasks that allow us to release data resources.
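The snippet below is a minimal, hand-rolled sketch of that worker-to-scheduler pattern, not dask's implementation: tasks are submitted to a concurrent.futures executor and completions flow back to the scheduling loop over a queue. The helper names and the restriction to dependency-free tasks are illustrative assumptions.

    from concurrent.futures import ThreadPoolExecutor
    from queue import Queue

    def execute_task(key, func, args, results_queue):
        # Run one task and report (key, result) back to the scheduling loop.
        results_queue.put((key, func(*args)))

    def run_independent_tasks(tasks, num_workers=4):
        # tasks: {key: (func, arg, ...)} with concrete arguments and no
        # inter-task dependencies, just to show the Executor/Queue round trip.
        results_queue = Queue()
        results = {}
        with ThreadPoolExecutor(num_workers) as pool:
            for key, (func, *args) in tasks.items():
                pool.submit(execute_task, key, func, args, results_queue)
            for _ in range(len(tasks)):
                key, value = results_queue.get()
                results[key] = value
        return results

    run_independent_tasks({'a': (sum, [1, 2, 3]), 'b': (len, 'abc')})
    # -> {'a': 6, 'b': 3}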

Task Selection Policy

When we complete a task we add more data into our set of available data; this new data makes new tasks available. We preferentially choose tasks that were just made available in a last-in-first-out fashion. We implement this as a simple stack. This results in more depth-first rather than breadth-first behavior, which encourages us to process batches of data to completion before starting on new data when possible.

When the addition of new data readies multiple tasks simultaneously, we add them to the stack in sorted order so that tasks with greater keynames run first. This breaks ties in a predictable fashion.
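The sketch below illustrates this policy in isolation (it is not dask's code): the ready "stack" is a plain list, newly available tasks are appended in sorted order, and the next task to run is popped off the end.

    ready = []  # plain list used as a stack

    def add_ready(new_keys):
        # Sorted insertion: among tasks readied at the same time, the one
        # with the greatest keyname ends up on top and therefore runs first.
        ready.extend(sorted(new_keys))

    def next_task():
        # Last in, first out: prefer the most recently readied task.
        return ready.pop()

    add_ready(['chunk-1', 'chunk-3', 'chunk-2'])
    next_task()  # -> 'chunk-3'
    next_task()  # -> 'chunk-2'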

State

Many functions pass around a state variable that holds the current state of the computation. This variable consists of several other dictionaries and sets, explained below.

Constant state

  1. dependencies: {x: [a, b, c]} means a, b, and c must be run before x

  2. dependents: {a: [x, y]} means a must run before x or y
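For the graph used in the Examples section below, these two mappings can be built by hand; the snippet is illustrative rather than dask's internals, and shows that dependents is just the reverse mapping of dependencies.

    # Graph: {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
    dependencies = {
        'x': set(),        # raw data, nothing must run first
        'y': set(),
        'z': {'x'},        # z needs x
        'w': {'z', 'y'},   # w needs z and y
    }

    # dependents is the reverse mapping of dependencies.
    dependents = {key: set() for key in dependencies}
    for key, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(key)

    assert dependents == {'x': {'z'}, 'y': {'w'}, 'z': {'w'}, 'w': set()}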

Changing state

Data

  1. cache: available concrete data. {key: actual-data}

  2. released: data that we've seen, used, and released because it is no longer needed

Jobs

  1. ready: A LIFO stack of ready-to-run tasks

  2. running: A set of tasks currently in execution

  3. finished: A set of finished tasks

  4. waiting: which tasks are still waiting on others :: {key: {keys}}. Real-time equivalent of dependencies

  5. waiting_data: which not-yet-run tasks still need each piece of data :: {key: {keys}}. Real-time equivalent of dependents
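The following hand-written sketch (not dask's own finish-task routine) shows the bookkeeping these structures imply when a running task completes: its result enters the cache, and any dependent whose last outstanding dependency it was moves onto the ready stack.

    def complete(key, value, state):
        # Record the result and move the task out of the running set.
        state['cache'][key] = value
        state['running'].remove(key)
        state['finished'].add(key)
        # Any dependent that was waiting only on `key` is now ready.
        for dep in state['waiting_data'].pop(key, ()):
            still_waiting_on = state['waiting'][dep]
            still_waiting_on.discard(key)
            if not still_waiting_on:
                del state['waiting'][dep]
                state['ready'].append(dep)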

Examples

>>> import pprint  # doctest: +SKIP
>>> from operator import add  # doctest: +SKIP
>>> inc = lambda x: x + 1  # doctest: +SKIP
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
>>> pprint.pprint(start_state_from_dask(dsk))  # doctest: +SKIP
{'cache': {'x': 1, 'y': 2},
 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
 'finished': set(),
 'ready': ['z'],
 'released': set(),
 'running': set(),
 'waiting': {'w': {'z'}},
 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
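Assuming dask 2021.10.0 is installed, the same graph can be run end to end through this module's synchronous entry point, get_sync; the inc helper is defined inline here since it is not part of the library.

    from operator import add
    from dask.local import get_sync

    def inc(x):
        return x + 1

    dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
    get_sync(dsk, 'w')  # inc(1) == 2, then add(2, 2) -> 4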

Optimizations

We build this scheduler with out-of-core array operations in mind. To this end we have encoded some particular optimizations.

Compute to release data

When we choose a new task to execute we often have many options. Policies at this stage are cheap and can significantly impact performance. One could imagine policies that expose parallelism, drive towards a particular output, and so on.

Our current policy is to run tasks that were most recently made available.

Inlining computations

We hold on to intermediate computations either in memory or on disk.

For very cheap computations that may emit new copies of the data, like np.transpose or possibly even x + 1, we choose not to store these as separate pieces of data or tasks. Instead we combine them with the computations that require them. This may result in repeated computation but saves significantly on space and computation complexity.

See the function inline_functions for more information.
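As a hedged illustration, in this release inline_functions can be imported from dask.optimization; the graph and the choice to treat inc as "fast" below are assumptions made for the example.

    from operator import add
    from dask.optimization import inline_functions

    def inc(x):
        return x + 1

    dsk = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}

    # Inline the cheap inc call into its consumer instead of storing 'y'
    # as a separate intermediate result.
    inlined = inline_functions(dsk, ['z'], fast_functions=[inc])
    # inlined == {'x': 1, 'z': (add, 'x', (inc, 'x'))}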



