distributed 2021.10.0


Adaptively allocate workers based on scheduler load. A superclass.

Contains logic to dynamically resize a Dask cluster based on current use. This class needs to be paired with a system that can create and destroy Dask workers using a cluster resource manager. Typically it is built into already existing solutions, rather than used directly by users. It is most commonly used from the .adapt(...) method of various Dask cluster classes.

Notes

Subclasses can override Adaptive.target and Adaptive.workers_to_close to control when the cluster should be resized. The default implementation checks if there are too many tasks per worker or too little memory available (see Scheduler.adaptive_target). The values for interval, min, max, wait_count and target_duration can be specified in the dask config under the distributed.adaptive key.
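As a minimal sketch of such an override (assuming, as in current releases, that Adaptive.target is a coroutine returning the desired number of workers and that Adaptive can be imported from distributed.deploy), a subclass could cap whatever the default heuristic requests:

>>> from distributed.deploy import Adaptive
>>> class CappedAdaptive(Adaptive):
...     async def target(self):
...         # Defer to the default heuristic, then cap the result at 20 workers
...         requested = await super().target()
...         return min(requested, 20)

The configuration values can also be set programmatically. The exact key names below are an assumption (check the distributed.yaml shipped with your installed version), but the pattern with dask.config.set is standard:

>>> import dask
>>> dask.config.set({"distributed.adaptive.interval": "2s",
...                  "distributed.adaptive.wait-count": 5})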

Parameters

cluster: object

Must have scale and scale_down methods/coroutines

interval: timedelta or str, default "1000 ms"

Milliseconds between checks

wait_count: int, default 3

Number of consecutive times that a worker should be suggested for removal before we remove it.

target_duration: timedelta or str, default "5s"

Amount of time we want a computation to take. This affects how aggressively we scale up.

worker_key: Callable[WorkerState]

Function to group workers together when scaling down. See Scheduler.workers_to_close for more information.

minimum: int

Minimum number of workers to keep around

maximum: int

Maximum number of workers to keep around

**kwargs:

Extra parameters to pass to Scheduler.workers_to_close

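Because most deployments reach this class through the .adapt(...) method shown in the examples below, which forwards its keyword arguments to the Adaptive constructor, these parameters are usually tuned there. A rough sketch (the LocalCluster and the specific values are illustrative assumptions, not recommendations):

>>> from distributed import LocalCluster
>>> cluster = LocalCluster(n_workers=0)
>>> cluster.adapt(minimum=1, maximum=8, interval="500ms",
...               wait_count=5, target_duration="10s")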

Examples

This is commonly used from existing Dask classes, like KubeCluster

This example is valid syntax, but we were not able to check execution
>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)

Alternatively you can use it from your own Cluster class by subclassing from Dask's Cluster superclass

This example is valid syntax, but we were not able to check execution
>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """Bring worker count up to n"""
...     def scale_down(self, workers):
...         """Remove worker addresses from cluster"""
This example is valid syntax, but we were not able to check execution
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)



File: /distributed/deploy/adaptive.py#16
type: <class 'type'>
Commit: