ParallelFor

class council.runners.ParallelFor(generator: Callable[[ChainContext, Budget], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)

Bases: LoopRunnerBase

Invoke a given skill for each value returned by a given generator function. Multiple iterations can run in parallel. For each invocation, the current iteration is provided through the skill context SkillContext.iteration().

IterationContext.value() provides the value as returned by the generator function

IterationContext.index() provides the index of the iteration
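As a rough mental model only (this is not council's implementation), pairing each value with its index resembles applying Python's built-in enumerate to the generator's output. The names FakeIteration and iterate below are hypothetical and exist only for this illustration; the simplified generator also omits the ChainContext and Budget arguments that a real generator receives.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass(frozen=True)
class FakeIteration:
    """Hypothetical stand-in for an iteration context (not a council class)."""

    index: int
    value: Any


def iterate(values: Iterable[Any]) -> Iterator[FakeIteration]:
    # Pair each generated value with its zero-based position.
    for index, value in enumerate(values):
        yield FakeIteration(index=index, value=value)


def generator() -> Iterator[str]:
    # Simplified: a real generator also takes a ChainContext and a Budget.
    for i in range(3):
        yield f"hi {i}"


iterations = list(iterate(generator()))
for it in iterations:
    print(f"index {it.index}, {it.value}")
```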

Notes

Skill iterations are scheduled in the order given by the generator function. However, because multiple iterations can execute in parallel, no assumptions should be made about the order of results.
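The ordering caveat can be illustrated with the standard library alone. The sketch below does not use council and makes no claim about how ParallelFor is implemented; fake_skill is a hypothetical stand-in for a skill. It submits tasks in order, collects them as they complete, and then restores a deterministic order by sorting on the index carried with each result.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple


def fake_skill(index: int, value: str) -> Tuple[int, str]:
    # Hypothetical stand-in for a skill; sleeps briefly to simulate work.
    time.sleep(random.uniform(0.0, 0.05))
    return index, f"index {index}, {value}"


values = [f"hi {i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=5) as pool:
    # Tasks are submitted in the order the values were produced...
    futures = [pool.submit(fake_skill, i, v) for i, v in enumerate(values)]
    # ...but they complete in whatever order the work happens to finish.
    completed = [future.result() for future in as_completed(futures)]

# Sorting on the carried index restores a deterministic order.
ordered = [message for _, message in sorted(completed)]
print(ordered)
```

If a consumer depends on ordering, carrying the iteration index alongside each result, as done here, is one simple way to recover it after the fact.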

__init__(generator: Callable[[ChainContext, Budget], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)

Initialize a new instance

Parameters:

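generator (RunnerGenerator) – a generator function that yields the values to iterate over

skill (SkillRunnerBase) – the skill to invoke for each value

parallelism (int) – how many iterations can run in parallel; defaults to 5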

Example 1

The example below demonstrates how to use ParallelFor in a chain.

from council.chains import Chain
from council.contexts import ChainContext
from council.runners import Budget, ParallelFor
from council.mocks import MockSkill

def generator(context: ChainContext, budget: Budget):
    for i in range(0, 5):
        yield "hi"

chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MockSkill())])

Example 2

This example builds on the previous one and shows how to consume the current iteration inside a skill.

from council.chains import Chain
from council.contexts import ChatMessage, ChainContext, SkillContext
from council.runners import Budget, ParallelFor
from council.skills import SkillBase

def generator(context: ChainContext, budget: Budget):
    for i in range(0, 5):
        yield f"hi {i}"

class MySkill(SkillBase):
    def __init__(self):
        super().__init__("mySkill")

    def execute(self, context: SkillContext, budget: Budget) -> ChatMessage:
        it = context.iteration.unwrap()
        message = f"index {it.index}, {it.value}"
        print(message)
        return self.build_success_message(message=message)

chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MySkill(), parallelism=5)])
context = ChainContext.empty()
context.new_iteration()
chain.execute(context, Budget(1))

The output would look like the following, although the order of the lines may vary because iterations run in parallel:

index 0, hi 0
index 1, hi 1
index 2, hi 2
index 3, hi 3
index 4, hi 4