Source code for council.runners.runner_base
import abc
from collections.abc import Set
from concurrent import futures
import logging
from council.contexts import ChainContext
from .runner_context import RunnerContext
from .budget import Budget
from .errrors import RunnerTimeoutError, RunnerError
from .runner_executor import RunnerExecutor
logger = logging.getLogger(__name__)
[docs]class RunnerBase(abc.ABC):
def run_from_chain_context(self, chain_context: ChainContext, budget: Budget, executor: RunnerExecutor):
context = RunnerContext(chain_context, budget)
try:
self.run(context, executor)
finally:
chain_context.current.extend(context.messages)
"""
Base runner class that handles common execution logic, including error management and timeout
"""
def run(
self,
context: RunnerContext,
executor: RunnerExecutor,
) -> None:
if context.should_stop():
return
logger.debug("start running %s", self.__class__.__name__)
try:
self._run(context, executor)
except futures.TimeoutError as e:
logger.debug("timeout running %s", self.__class__.__name__)
context.cancellation_token.cancel()
raise RunnerTimeoutError(self.__class__.__name__) from e
except RunnerError:
logger.debug("runner error running %s", self.__class__.__name__)
context.cancellation_token.cancel()
raise
except Exception as e:
logger.exception("an unexpected error occurred running %s", self.__class__.__name__)
context.cancellation_token.cancel()
raise RunnerError(f"an unexpected error occurred in {self.__class__.__name__}") from e
finally:
logger.debug("done running %s", self.__class__.__name__)
@staticmethod
def rethrow_if_exception(fs: Set[futures.Future]):
[f.result(timeout=0) for f in fs]
@abc.abstractmethod
def _run(
self,
context: RunnerContext,
executor: RunnerExecutor,
) -> None:
pass