Advanced (multi) channel communication¶
MultiChannel: container for multiple channels¶
Use execnet.MultiChannel
to work with multiple channels:
>>> import execnet
>>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
>>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> len(mch)
2
>>> mch[0] is ch1 and mch[1] is ch2
True
>>> ch1 in mch and ch2 in mch
True
>>> sum(mch.receive_each())
3
Receive results from sub processes with a Queue¶
Use MultiChannel.make_receive_queue()
to get a queue
from which to obtain results:
>>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
>>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> chan1, res1 = queue.get()
>>> chan2, res2 = queue.get(timeout=3)
>>> res1 + res2
3
Working asynchronously/event-based with channels¶
Use channel callbacks if you want to process incoming data immediately and without blocking execution:
>>> import execnet
>>> gw = execnet.makegateway()
>>> ch = gw.remote_exec("channel.receive() ; channel.send(42)")
>>> l = []
>>> ch.setcallback(l.append)
>>> ch.send(1)
>>> ch.waitclose()
>>> assert l == [42]
Note that the callback function will be executed in the receiver thread and should not block or run for too long.
Robustly receive results and termination notification¶
Use MultiChannel.make_receive_queue(endmarker)
to specify
an object to be put to the queue when the remote side of a channel
is closed. The endmarker will also be put to the Queue if the gateway
is blocked in execution and is terminated/killed:
>>> group = execnet.Group(['popen'] * 3) # create three gateways
>>> mch = group.remote_exec("channel.send(channel.receive()+1)")
>>> queue = mch.make_receive_queue(endmarker=42)
>>> mch[0].send(1)
>>> chan1, res1 = queue.get()
>>> res1
2
>>> group.terminate(timeout=1) # kill processes waiting on receive
>>> for i in range(3):
... chan1, res1 = queue.get()
... assert res1 == 42
>>> group
<Group []>
Saturate multiple Hosts and CPUs with tasks to process¶
If you have multiple CPUs or hosts you can create as many gateways and then have a process sit on each CPU and wait for a task to proceed. One complication is that we want to ensure clean termination of all processes and loose no result. Here is an example that just uses local subprocesses and does the task:
from __future__ import annotations
import execnet
group = execnet.Group()
for i in range(4): # 4 CPUs
group.makegateway()
def process_item(channel):
# task processor, sits on each CPU
import time
import random
channel.send("ready")
for x in channel:
if x is None: # we can shutdown
break
# sleep random time, send result
time.sleep(random.randrange(3))
channel.send(x * 10)
# execute taskprocessor everywhere
mch = group.remote_exec(process_item)
# get a queue that gives us results
q = mch.make_receive_queue(endmarker=-1)
tasks: list[int] | None = list(range(10)) # a list of tasks, here just integers
terminated = 0
while 1:
channel, item = q.get()
if item == -1:
terminated += 1
print("terminated %s" % channel.gateway.id)
if terminated == len(mch):
print("got all results, terminating")
break
continue
if item != "ready":
print(f"other side {channel.gateway.id} returned {item!r}")
if not tasks and tasks is not None:
print("no tasks remain, sending termination request to all")
mch.send_each(None)
tasks = None
if tasks:
task = tasks.pop()
channel.send(task)
print(f"sent task {task!r} to {channel.gateway.id}")
group.terminate()