Source code for council.contexts._cancellation_token

from threading import Lock


class CancellationToken:
    """
    A cancellation token which is initially not set.

    The token is a one-way flag: it starts out not cancelled, and
    :meth:`cancel` switches it to cancelled. This class exposes no method
    to clear the flag again.
    """

    def __init__(self) -> None:
        # Flag holding the token state; `False` until `cancel()` is called.
        self._cancelled = False
        # Guards writes to `_cancelled` performed by `cancel()`.
        self._lock = Lock()

    def cancel(self) -> None:
        """
        Set the cancellation token.

        The flag is written while holding the internal lock. Calling this
        method more than once simply stores `True` again.
        """
        with self._lock:
            self._cancelled = True

    @property
    def cancelled(self) -> bool:
        """
        Return `True` if the cancellation token is set, otherwise `False`.
        """
        # Plain attribute read; the internal lock is not acquired here.
        return self._cancelled