Source code for council.runners.budget
from __future__ import annotations
import time
from council.utils import read_env_int
class BudgetExpiredException(Exception):
pass
[docs]class Budget:
"""
Represents the budget available for the execution
Attributes:
duration (float): The number of seconds
"""
[docs] def __init__(self, duration: float):
"""
Initialize the Budget object
Args:
duration (float): The number of seconds
"""
self.duration = duration
self.deadline = time.monotonic() + duration
[docs] def remaining(self) -> Budget:
"""
Create a new instance with the remaining budget
Returns:
a new instance with the remaining budget
"""
return Budget(self.deadline - time.monotonic())
[docs] def is_expired(self) -> bool:
"""
Check if the budget is expired
Returns:
True is the budget is expired. Otherwise False
"""
return self.deadline < time.monotonic()
def __repr__(self):
return f"Budget({self.duration})"
[docs] @staticmethod
def default() -> "Budget":
"""
Helper function that create a new Budget with a default value.
Returns:
Budget
"""
duration = read_env_int("COUNCIL_DEFAULT_BUDGET", required=False, default=30)
return Budget(duration=duration.unwrap())
class InfiniteBudget(Budget):
def __init__(self):
super().__init__(10000)
def remaining(self) -> Budget:
return Budget(10000)
def is_expired(self) -> bool:
return False