Source code for council.runners.parallel
from concurrent import futures
from council.contexts import ChainContext
from .runner_base import RunnerBase
from .runner_executor import RunnerExecutor
[docs]
class Parallel(RunnerBase):
"""
Runner that execution multiple :class:`.RunnerBase` in parallel
"""
def __init__(self, *runners: RunnerBase):
super().__init__("parallelRunner")
self._runners = self.new_monitors("parallel", runners)
def _run(
self,
context: ChainContext,
executor: RunnerExecutor,
) -> None:
contexts = [(runner.inner, context.fork_for(runner)) for runner in self._runners]
# Seems like it is a bad idea using lambda as the function in submit,
# which results into inconsistent invocation (wrong arguments)
fs = [executor.submit(runner.run, inner, executor) for (runner, inner) in contexts]
try:
dones, not_dones = futures.wait(fs, context.budget.remaining_duration, futures.FIRST_EXCEPTION)
self.rethrow_if_exception(dones)
finally:
context.merge([context for (_, context) in contexts])
[f.cancel() for f in fs]