distributed 2021.10.0

run(self, function, *args, **kwargs)

This calls a function on all currently known workers immediately, blocks until those results come back, and returns the results as a dictionary keyed by worker address. This method is generally used for side effects, such as collecting diagnostic information or installing libraries.

If your function takes an input argument named dask_worker then that variable will be populated with the worker itself.
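
To make the dask_worker convention concrete, here is a minimal sketch of how a caller could detect that argument name and inject an object, using only the standard library. It illustrates the idea and is not necessarily how distributed implements it; `FakeWorker` and `call_with_worker` are hypothetical names invented for this example.

```python
import inspect


class FakeWorker:
    """Hypothetical stand-in for a worker object; not part of distributed."""

    def __init__(self, address, status="running"):
        self.address = address
        self.status = status


def call_with_worker(function, worker, *args, **kwargs):
    """Call ``function``, passing ``worker`` as ``dask_worker`` if it asks for it."""
    try:
        params = inspect.signature(function).parameters
    except (TypeError, ValueError):
        # Some builtins expose no signature; treat them as not requesting it.
        params = {}
    if "dask_worker" in params:
        kwargs["dask_worker"] = worker
    return function(*args, **kwargs)


def get_status(dask_worker):
    return dask_worker.status


def add(x, y):
    return x + y


worker = FakeWorker("192.168.0.100:9000")
status = call_with_worker(get_status, worker)  # worker is injected
total = call_with_worker(add, worker, 1, 2)    # worker is not passed
```

Functions that do not name dask_worker in their signature are called unchanged, so ordinary callables such as os.getpid need no special handling.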

Parameters

function : callable
*args : arguments for remote function
**kwargs : keyword arguments for remote function
workers : list

Workers on which to run the function. Defaults to all known workers.

wait : boolean (optional)

If the function is asynchronous, whether or not to wait until that function finishes.

nanny : bool, default False

Whether to run function on the nanny. By default, the function is run on the worker process. If specified, the addresses in workers should still be the worker addresses, not the nanny addresses.

Run a function on all workers outside of the task scheduling system.

Examples

This example is valid syntax, but we were not able to check execution
>>> c.run(os.getpid)  # doctest: +SKIP
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321,
 '192.168.0.102:9000': 5555}

Restrict computation to particular workers with the workers= keyword argument.

>>> c.run(os.getpid, workers=['192.168.0.100:9000',
...                           '192.168.0.101:9000'])  # doctest: +SKIP
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321}
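
Because the result is keyed by worker address, one way to build a workers= list is to filter the keys of an earlier result. The sketch below assumes addresses of the form host:port, optionally prefixed with a scheme such as tcp://. `addresses_on_host` is a hypothetical helper, not part of distributed, and the sample dictionary is made-up data.

```python
def addresses_on_host(results, host):
    """Return the worker addresses in ``results`` whose host part equals ``host``."""
    selected = []
    for address in results:
        # Drop an optional scheme such as "tcp://", then split off the port.
        hostport = address.split("://", 1)[-1]
        if hostport.rsplit(":", 1)[0] == host:
            selected.append(address)
    return sorted(selected)


# Made-up sample data shaped like a dictionary keyed by worker address.
pids = {
    "192.168.0.100:9000": 1234,
    "192.168.0.100:9001": 4321,
    "192.168.0.101:9000": 5555,
}
targets = addresses_on_host(pids, "192.168.0.100")
# targets could then be passed on, e.g. c.run(os.getpid, workers=targets)
```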
>>> def get_status(dask_worker):
...     return dask_worker.status

>>> c.run(get_status)  # doctest: +SKIP
{'192.168.0.100:9000': 'running',
 '192.168.0.101:9000': 'running'}

Run asynchronous functions in the background:

>>> async def print_state(dask_worker):  # doctest: +SKIP
...     while True:
...         print(dask_worker.status)
...         await asyncio.sleep(1)
>>> c.run(print_state, wait=False)  # doctest: +SKIP

Back References

The following pages refer to this document, either explicitly or through code examples that use it.

distributed.client.Client.run_on_scheduler



File: /distributed/client.py#2435
type: <class 'function'>