# dask.local
Asynchronous Shared-Memory Scheduler for Dask Graphs.
This scheduler coordinates several workers to execute tasks in a dask graph in parallel. It depends on a `concurrent.futures.Executor` and a corresponding `Queue` for worker-to-scheduler communication.
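As a small, hedged illustration (the `inc` helper below is defined only for this example), the thread-based entry point built on this scheduler can execute a hand-written graph directly:

```python
from operator import add

from dask.threaded import get  # Executor-backed entry point to this scheduler


def inc(x):
    # Trivial helper used only for this example.
    return x + 1


# A dask graph is a plain dict mapping keys to values or task tuples.
dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}

# Execute the graph on a thread pool and pull out the final key.
print(get(dsk, 'w'))  # 4
```

`dask.local.get_sync` drives the same machinery on a single thread, which is often easier to step through when debugging.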
It tries to execute tasks in an order that maintains a small memory footprint throughout execution. It does this by preferring tasks that allow us to release data resources.
When we complete a task we add more data into our set of available data; this new data makes new tasks available. We preferentially choose tasks that were just made available, in last-in-first-out fashion, and implement this with a simple stack. This results in more depth-first than breadth-first behavior, which encourages us to process batches of data to completion before starting on new data when possible.
When the addition of new data readies multiple tasks simultaneously, we add them to the stack in sorted order so that tasks with greater keynames run first. This breaks ties in a predictable fashion.
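The following toy sketch (not the actual dask.local implementation, which tracks far more state) shows the intent of this ready-stack policy: newly runnable tasks are appended in sorted order, and the most recently added task is always chosen next.

```python
# A plain Python list used as a LIFO stack of ready-to-run task keys.
ready = []


def on_data_available(newly_ready):
    # Sort before appending so that, among tasks readied together, the one
    # with the greatest keyname ends up on top of the stack and runs first.
    ready.extend(sorted(newly_ready))


def next_task():
    # Last in, first out: prefer the most recently readied task.
    return ready.pop()


on_data_available(['z-1', 'z-3', 'z-2'])
print(next_task())  # 'z-3'
print(next_task())  # 'z-2'
```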
Many functions pass around a `state` variable that holds the current state of the computation. This variable consists of several other dictionaries and sets, explained below.
### Dependencies
- dependencies: {x: [a, b, c]} means that a, b, and c must be run before x
- dependents: {a: [x, y]} means that a must run before x and y
### Data
- cache: available concrete data, as {key: actual-data}
- released: data that we've seen, used, and released because it is no longer needed
### Jobs
- ready: a LIFO stack of ready-to-run tasks
- running: a set of tasks currently in execution
- finished: a set of finished tasks
- waiting: which tasks are still waiting on others, as {key: {keys}}; the real-time equivalent of dependencies
- waiting_data: which data is still needed by yet-to-be-run tasks, as {key: {keys}}; the real-time equivalent of dependents
For example, start_state_from_dask builds this state for a small graph:

```python
>>> import pprint  # doctest: +SKIP
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
>>> pprint.pprint(start_state_from_dask(dsk))  # doctest: +SKIP
{'cache': {'x': 1, 'y': 2},
 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
 'finished': set(),
 'ready': ['z'],
 'released': set(),
 'running': set(),
 'waiting': {'w': {'z'}},
 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
```
We build this scheduler with out-of-core array operations in mind. To this end we have encoded some particular optimizations.
When we choose a new task to execute we often have many options. Policies at this stage are cheap and can significantly impact performance. One could imagine policies that expose parallelism, drive towards a particular output, etc. Our current policy is to run tasks that were most recently made available.
We hold on to intermediate computations either in memory or on disk.
For very cheap computations that may emit new copies of the data, like `np.transpose` or possibly even `x + 1`, we choose not to store these as separate pieces of data/tasks. Instead we combine them with the computations that require them. This may result in repeated computation but saves significantly on space and computational complexity. See the function `inline_functions` for more information.
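As a hedged sketch of that inlining step (the graph and the `double` helper here are invented for illustration), `dask.optimization.inline_functions` can be told to treat `np.transpose` as one of these fast functions and fold it into its consumers:

```python
import numpy as np

from dask.optimization import inline_functions


def double(x):
    # Illustrative consumer of the transposed data.
    return x * 2


dsk = {
    'x': (np.arange, 5),
    'y': (np.transpose, 'x'),  # cheap: just re-describes 'x'
    'z': (double, 'y'),
}

# Inline calls to np.transpose into the tasks that consume them, so the
# intermediate 'y' never has to be stored as its own piece of data.
inlined = inline_functions(dsk, output=['z'], fast_functions=[np.transpose])

# The 'y' task is gone; 'z' now computes (double, (np.transpose, 'x')) directly.
print(inlined)
```

The trade-off is exactly the one described above: `np.transpose` may run more than once if several tasks consume `'y'`, but no extra intermediate result has to be kept in memory or spilled to disk.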