ParallelFor
- class council.runners.ParallelFor(generator: Callable[[ChainContext, Budget], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)
Bases: LoopRunnerBase
Invoke a given skill for each value returned by a given generator function. Multiple iterations can run in parallel. For each invocation, the current iteration context is provided through the skill context SkillContext.iteration().
- IterationContext.value() provides the value as returned by the generator function
- IterationContext.index() provides the index of the iteration
Notes
Skill iterations are scheduled in the order given by the generator function. However, because multiple iterations can execute in parallel, no assumptions should be made about the order of results.
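The ordering note above can be illustrated with a minimal sketch that uses only the Python standard library. It is not council's implementation: `skill`, `run_parallel_for`, and the use of `ThreadPoolExecutor` are assumptions made for illustration. It shows work being submitted in generator order while results are collected in completion order, and how carrying the iteration index with each result lets the caller restore the original order.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def generator():
    # Stand-in for a ParallelFor generator function (hypothetical, no council types)
    for i in range(5):
        yield f"hi {i}"


def skill(index, value):
    # Hypothetical skill: sleeps for a short random time to simulate uneven work
    time.sleep(random.uniform(0, 0.01))
    return index, f"index {index}, {value}"


def run_parallel_for(parallelism=5):
    # Tasks are submitted in generator order, but collected in completion order
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = [pool.submit(skill, i, v) for i, v in enumerate(generator())]
        return [future.result() for future in as_completed(futures)]


completed = run_parallel_for()
# The completion order may differ between runs; sorting by index restores it
ordered = [message for _, message in sorted(completed)]
print(ordered)
```

If downstream code depends on the generator order, keeping the index next to each result (as `IterationContext.index()` allows inside a skill) is one way to sort the results afterwards.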
- __init__(generator: Callable[[ChainContext, Budget], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)
Initialize a new instance
- Parameters:
generator (RunnerGenerator) – a generator function that yields results
Example 1
The example below demonstrates how to use ParallelFor in a chain.
from council.chains import Chain
from council.contexts import ChainContext
from council.runners import Budget, ParallelFor
from council.mocks import MockSkill


def generator(context: ChainContext, budget: Budget):
    for i in range(0, 5):
        yield "hi"


chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MockSkill())])
Example 2
This example builds on the previous one and shows how to consume the iteration in a skill.
from council.chains import Chain
from council.contexts import ChatMessage, ChainContext, SkillContext
from council.runners import Budget, ParallelFor
from council.skills import SkillBase


def generator(context: ChainContext, budget: Budget):
    for i in range(0, 5):
        yield f"hi {i}"


class MySkill(SkillBase):
    def __init__(self):
        super().__init__("mySkill")

    def execute(self, context: SkillContext, budget: Budget) -> ChatMessage:
        # Read the current iteration (index and value) from the skill context
        it = context.iteration.unwrap()
        message = f"index {it.index}, {it.value}"
        print(message)
        return self.build_success_message(message=message)


chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MySkill(), parallelism=5)])
context = ChainContext.empty()
context.new_iteration()
chain.execute(context, Budget(1))
The output would look like the following. Because iterations can run in parallel, the order of the lines may differ:
index 0, hi 0
index 1, hi 1
index 2, hi 2
index 3, hi 3
index 4, hi 4