dlt.common.runtime.signals
set_received_signal
def set_received_signal(sig: int) -> None
Called when a signal is received; sig is the signal number.
raise_if_signalled
def raise_if_signalled() -> None
Raises SignalReceivedException if a signal was received.
was_signal_received
def was_signal_received() -> bool
Checks whether a signal was received.
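The three functions above describe a record / check / raise pattern. The following is a minimal stdlib-only sketch of that pattern, not dlt's implementation: the names `SignalReceived`, `record_signal`, `signal_pending`, `raise_if_pending`, and `process` are hypothetical stand-ins, and using `None` as the "no signal" marker is an assumption.

```python
import signal
from typing import List, Optional


class SignalReceived(Exception):
    """Hypothetical stand-in for SignalReceivedException."""

    def __init__(self, signal_code: int) -> None:
        super().__init__(f"Signal {signal_code} received")
        self.signal_code = signal_code


# Last signal number recorded, or None if nothing arrived (assumed convention).
_received: Optional[int] = None


def record_signal(sig: int) -> None:
    """Remember that a signal arrived (role of set_received_signal)."""
    global _received
    _received = sig


def signal_pending() -> bool:
    """Report whether a signal was recorded (role of was_signal_received)."""
    return _received is not None


def raise_if_pending() -> None:
    """Raise if a signal was recorded (role of raise_if_signalled)."""
    if _received is not None:
        raise SignalReceived(_received)


def process(items: List[int]) -> List[int]:
    """Hypothetical work loop that checks for a signal before each item."""
    done = []
    for item in items:
        raise_if_pending()  # safe point: stop between items, never mid-item
        done.append(item * 2)
    return done
```

Checking at the top of each iteration lets a long-running loop finish the current unit of work before it stops, which is the usual reason to defer a signal rather than handle it immediately.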
sleep
def sleep(sleep_seconds: float) -> None
A signal-aware version of the sleep function. Wakes up early if a signal is received, but does not raise an exception.
wake_all
def wake_all() -> None
Wakes all threads sleeping on the event.
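One common way to build an interruptible sleep with a matching wake-up call is to wait on a shared `threading.Event` instead of calling `time.sleep`. The sketch below assumes that approach; `_wake`, `interruptible_sleep`, and `wake_sleepers` are hypothetical names, and the boolean return value is an addition for illustration (the documented `sleep` returns `None`).

```python
import threading
import time

# Hypothetical module-level event shared by all sleeping threads.
_wake = threading.Event()


def interruptible_sleep(seconds: float) -> bool:
    """Block for up to `seconds`; return True if woken before the timeout.

    Never raises on wake-up: the caller decides what to do next.
    """
    return _wake.wait(seconds)


def wake_sleepers() -> None:
    """Wake every thread currently blocked in interruptible_sleep.

    The event stays set, so later sleeps in this sketch return immediately.
    """
    _wake.set()
```

A signal handler or another thread can call `wake_sleepers()` so that workers leave their sleep promptly and reach their next explicit signal check.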
intercepted_signals
@contextmanager
def intercepted_signals() -> Iterator[None]
Intercepts SIGINT and SIGTERM and delays calling the signal handlers until
raise_if_signalled is called explicitly or a second signal with the same int value arrives.
A no-op when not called on the main thread.
Can be nested; nested calls are no-ops.
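The interception behavior can be illustrated with a small stdlib-only context manager. This is a simplified sketch under stated assumptions, not dlt's code: `deferred_signals`, `pending`, and `_active` are hypothetical names, the sketch only records signals in a list, and it omits the "second signal with the same value" escalation described above.

```python
import signal
import threading
from contextlib import contextmanager
from typing import Iterator, List

pending: List[int] = []  # hypothetical: signal numbers seen while intercepting
_active = False          # hypothetical: True while an outer context is open


@contextmanager
def deferred_signals() -> Iterator[None]:
    """Record SIGINT/SIGTERM instead of running their handlers right away."""
    global _active
    # Handlers can only be installed from the main thread, and a nested
    # call should leave the outer context's handlers untouched.
    if threading.current_thread() is not threading.main_thread() or _active:
        yield
        return

    def _record(signum, frame):
        pending.append(signum)

    # signal.signal returns the previously installed handler.
    previous = {s: signal.signal(s, _record) for s in (signal.SIGINT, signal.SIGTERM)}
    _active = True
    try:
        yield
    finally:
        _active = False
        for sig, handler in previous.items():
            # A handler installed outside Python is reported as None and
            # cannot be reinstalled; this sketch falls back to the default.
            signal.signal(sig, handler if handler is not None else signal.SIG_DFL)
```

Inside the `with` block, code would typically poll for recorded signals at safe points and stop cleanly; on exit the original handlers are back in place.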