rebalance(self, comm=None, keys: 'Iterable[Hashable]' = None, workers: 'Iterable[str]' = None) -> dict
Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).

This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.
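As a minimal, hedged usage sketch (assuming a running cluster reachable through a distributed.Client; the scheduler address is illustrative), rebalancing is normally triggered through the client rather than by calling this scheduler method directly:

    from distributed import Client

    client = Client("tcp://scheduler:8786")            # hypothetical scheduler address

    futures = client.map(lambda x: x ** 2, range(1000))
    client.gather(futures)                             # wait for computations to finish first

    # Ask the scheduler to even out memory across all workers.
    client.rebalance()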
Algorithm
Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (distributed.worker.memory.recent-to-old-time). This lets us ignore temporary spikes caused by task heap usage.
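A toy sketch of this step, assuming per-worker measurements are already split into managed memory plus old and recent unmanaged memory (all names and numbers here are illustrative, not the scheduler's internal API):

    # Per-worker memory measurements, in bytes (illustrative).
    measurements = {
        "tcp://w1:40001": {"managed": 6_000_000_000, "unmanaged_old": 1_000_000_000,
                           "unmanaged_recent": 2_000_000_000},
        "tcp://w2:40002": {"managed": 1_000_000_000, "unmanaged_old": 500_000_000,
                           "unmanaged_recent": 100_000_000},
    }

    def occupancy(m):
        # Unmanaged memory younger than recent-to-old-time (temporary task
        # heap spikes) is deliberately ignored.
        return m["managed"] + m["unmanaged_old"]

    mean_occupancy = sum(occupancy(m) for m in measurements.values()) / len(measurements)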
Alternatively, you may change how memory is measured, both for individual workers and for calculating the mean, through distributed.worker.memory.rebalance.measure. In particular, this can be useful to disregard inaccurate OS memory measurements.
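For instance, to have rebalancing look only at dask-managed memory and ignore OS-level process measurements, the key can be set through dask's configuration system; "managed" is used here as an example value, and the exact set of accepted measures depends on the distributed version:

    import dask

    # Sketch: base rebalancing decisions on managed memory only, ignoring
    # potentially inaccurate OS process-memory readings. Typically this is
    # set in the YAML config or environment before the cluster starts.
    dask.config.set({"distributed.worker.memory.rebalance.measure": "managed"})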
Discard workers whose occupancy is within 5% of the mean cluster occupancy (distributed.worker.memory.rebalance.sender-recipient-gap / 2). This helps prevent data from bouncing around the cluster repeatedly.
Workers above the mean are senders; those below are recipients.
Discard senders whose absolute occupancy is below 30% (distributed.worker.memory.rebalance.sender-min). In other words, no data is moved, regardless of imbalance, as long as all workers are below 30%.
Discard recipients whose absolute occupancy is above 60% (distributed.worker.memory.rebalance.recipient-max). Note that by default this threshold is the same as distributed.worker.memory.target, to prevent workers from accepting data and immediately spilling it out to disk.
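Putting the three thresholds above together, a hedged sketch of the sender/recipient classification, with the default config values hard-coded for illustration (the real scheduler reads them from the dask config and applies them per worker; treat this as an illustration of the logic, not its exact arithmetic):

    # Defaults of the relevant config keys, hard-coded for illustration.
    HALF_GAP = 0.10 / 2    # distributed.worker.memory.rebalance.sender-recipient-gap / 2
    SENDER_MIN = 0.30      # distributed.worker.memory.rebalance.sender-min
    RECIPIENT_MAX = 0.60   # distributed.worker.memory.rebalance.recipient-max

    def classify(occupancy: dict, memory_limit: dict, mean: float):
        """Split workers into senders and recipients; everyone else is left alone.

        occupancy: {addr: bytes in use}; memory_limit: {addr: total bytes};
        mean: mean cluster occupancy in bytes.
        """
        senders, recipients = set(), set()
        for addr, occ in occupancy.items():
            frac = occ / memory_limit[addr]            # "absolute occupancy"
            if occ > mean * (1 + HALF_GAP) and frac >= SENDER_MIN:
                senders.add(addr)
            elif occ < mean * (1 - HALF_GAP) and frac <= RECIPIENT_MAX:
                recipients.add(addr)
        return senders, recipients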
Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.
A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don't already hold a copy.
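A simplified sketch of this iteration (the data structures and the convergence check are illustrative; the real scheduler tracks more state and also verifies that a recipient has enough memory for the key). Here senders and recipients are sets of addresses, lri_keys maps each worker to its keys in insertion order, and replicas maps each key to the set of workers holding it:

    def plan_transfers(occupancy, senders, recipients, lri_keys, key_nbytes, replicas,
                       mean, half_gap=0.05):
        """Greedily move the least recently inserted key from the sender farthest
        above the mean to the viable recipient farthest below it, until one of the
        two groups has converged to within half_gap of the mean."""
        moves = []
        while senders and recipients:
            s = max(senders, key=occupancy.get)        # sender farthest above the mean
            moved = False
            for key in list(lri_keys[s]):              # least recently inserted first
                viable = [r for r in recipients if r not in replicas[key]]
                if not viable:
                    continue                           # skip key: no recipient without a copy
                r = min(viable, key=occupancy.get)     # recipient farthest below the mean
                moves.append((key, s, r))
                occupancy[s] -= key_nbytes[key]
                occupancy[r] += key_nbytes[key]
                lri_keys[s].remove(key)
                replicas[key] = (replicas[key] - {s}) | {r}
                moved = True
                break
            # Retire converged workers (and senders with nothing movable left).
            if not moved or occupancy[s] <= mean * (1 + half_gap):
                senders.discard(s)
            recipients -= {w for w in recipients if occupancy[w] >= mean * (1 - half_gap)}
        return moves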
The least recently inserted (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on Python dicts preserving insertion order; see the sketch after this list), and hopefully good enough in most cases. Discarded alternative policies were:
Largest first. It is O(n*log(n)) save for non-trivial additional data structures, and it risks causing the largest chunks of data to repeatedly move around the cluster like pinballs.
Least recently used (LRU). This information is currently available on the workers only and not trivial to replicate on the scheduler; transmitting it over the network would be very expensive. Also, note that dask will go out of its way to minimise the amount of time intermediate keys are held in memory, so in such a case LRI is a close approximation of LRU.
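The O(1) claim above relies on Python dicts preserving insertion order (guaranteed since Python 3.7): the least recently inserted key is simply the first key of the mapping. A minimal illustration:

    # Dicts iterate in insertion order, so the first key is the least recently
    # inserted one; retrieving it is O(1) and needs no extra data structure.
    data = {}                      # stand-in for a worker's {key: nbytes} mapping
    data["x-1"] = 80
    data["x-2"] = 120
    data["x-3"] = 64

    least_recently_inserted = next(iter(data))
    assert least_recently_inserted == "x-1"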
Parameters
keys: optional
    whitelist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).
workers: optional
    whitelist of worker addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the whitelisted workers.
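A hedged sketch of restricting the operation with these parameters, going through the client (the client-side method takes futures rather than raw keys; the scheduler and worker addresses below are illustrative):

    from distributed import Client

    client = Client("tcp://scheduler:8786")                  # hypothetical address
    futures = client.map(lambda i: b"x" * 2**20, range(500))
    client.gather(futures)

    # Only the data behind `futures` is considered for moving, and only the two
    # listed workers act as senders or recipients; the mean cluster occupancy is
    # computed over those two workers alone.
    client.rebalance(
        futures=futures,
        workers=["tcp://worker-a:40001", "tcp://worker-b:40002"],  # illustrative addresses
    )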
The following pages refer to this document explicitly or contain code examples that use it.
distributed.scheduler.Scheduler.replicate