Source code for council.contexts._cancellation_token
from threading import Lock
[docs]
class CancellationToken:
    """
    A cancellation token which is initially not set.

    The token is a one-way flag: :meth:`cancel` sets it and
    :attr:`cancelled` reports whether it has been set. The class as
    written here offers no method to clear the flag again.
    """

    def __init__(self) -> None:
        # The flag starts cleared; `cancel` is the only method that writes it.
        self._cancelled: bool = False
        # Held while `cancel` writes the flag.
        self._lock = Lock()

    def cancel(self) -> None:
        """
        Set the cancellation token.

        Calling this more than once is harmless: the flag simply stays set.
        """
        with self._lock:
            self._cancelled = True

    @property
    def cancelled(self) -> bool:
        """
        Return `True` if the cancellation token is set, otherwise `False`.
        """
        # The flag is read without taking the lock; only the write in
        # `cancel` is guarded.
        # NOTE(review): presumably relies on a plain attribute read being
        # safe to perform concurrently with that write - confirm for the
        # interpreters this package targets.
        return self._cancelled