The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever-growing Dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask-scheduler executable:

$ dask-scheduler
Scheduler started at 127.0.0.1:8786
Or within the LocalCluster that a Client starts up when given no connection information:

>>> c = Client()  # doctest: +SKIP
>>> c.cluster.scheduler  # doctest: +SKIP
Scheduler(...)
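In the first case, a client can be pointed at the scheduler's address explicitly. A minimal sketch using the standard Client API (the address above is assumed):

>>> from dask.distributed import Client
>>> client = Client('127.0.0.1:8786')   # connect to the scheduler started above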
Users typically do not interact with the scheduler directly but rather with the client object, Client.
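For illustration, a minimal sketch of this indirect interaction through the public Client API; the scheduler assigns the submitted task to a worker, and the client only handles the resulting future:

>>> from dask.distributed import Client
>>> client = Client()                       # starts a LocalCluster and its scheduler
>>> future = client.submit(sum, [1, 2, 3])  # the scheduler routes this task to a worker
>>> future.result()                         # gather the result back through the scheduler
6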
State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
tasks: {task key: TaskState}
Tasks currently known to the scheduler
unrunnable: {TaskState}
Tasks in the "no-worker" state
workers: {worker key: WorkerState}
Workers currently connected to the scheduler
idle: {WorkerState}
Set of workers that are not fully utilized
saturated: {WorkerState}
Set of workers that are over-utilized (have more work queued than they can process promptly)
host_info: {hostname: dict}
Information about each worker host
clients: {client key: ClientState}
Clients currently connected to the scheduler
services: {str: port}
Other services running on this scheduler, like Bokeh
loop: IOLoop
The running Tornado IOLoop
client_comms: {client key: Comm}
For each client, a Comm object used to receive task requests and report task status updates.
stream_comms: {worker key: Comm}
For each worker, a Comm object from which we both accept stimuli and report results
task_duration: {key-prefix: time}
Time we expect certain functions to take, e.g. {'sum': 0.25}
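As an illustrative sketch, some of this state can be inspected directly with the attribute names listed above, assuming an in-process LocalCluster so that the Scheduler object is reachable from the client; the exact contents of these mappings vary by version:

>>> from dask.distributed import Client
>>> client = Client(processes=False)        # in-process cluster; the Scheduler object is local
>>> scheduler = client.cluster.scheduler
>>> future = client.submit(sum, [1, 2, 3])
>>> future.result()
6
>>> list(scheduler.tasks)        # task keys currently known to the scheduler
>>> list(scheduler.workers)      # addresses of connected workers
>>> list(scheduler.clients)      # connected clients, including this one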