Source code for remoulade.middleware.time_limit

# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import threading
import time
import warnings
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple

from ..errors import NoCancelBackend
from ..logging import get_logger
from .middleware import Middleware
from .threading import Interrupt, current_platform, raise_thread_exception, supported_platforms

if TYPE_CHECKING:
    from ..broker import Broker
    from ..message import Message


[docs]class TimeLimitExceeded(Interrupt): """Exception used to interrupt worker threads when actors exceed their time limits. """
[docs]class TimeLimit(Middleware): """Middleware that cancels actors that run for too long. Currently, this is only available on CPython. Note: This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can't cancel system calls. Parameters: time_limit(int): The maximum number of milliseconds actors may run for. interval(int): The interval (in milliseconds) with which to check for actors that have exceeded the limit. exit_delay(int): The delay (in milliseconds) after with we stop (SystemExit) to the worker if the exception failed to stop the message (ie. system calls). None to disable, disabled by default. """ def __init__(self, *, time_limit: int = 1800000, interval: int = 1000, exit_delay: Optional[int] = None) -> None: self.logger = get_logger(__name__, type(self)) self.time_limit = time_limit self.interval = interval self.deadlines: "Dict[int, Optional[Tuple[float, bool, Message]]]" = {} self.exit_delay = exit_delay self.lock = threading.Lock() def _handle(self, *_) -> None: with self.lock: current_time = time.monotonic() for thread_id, row in self.deadlines.items(): if row is None: continue deadline, exception_raised, message = row if exception_raised and self.exit_delay and current_time >= deadline + self.exit_delay / 1000: self.deadlines[thread_id] = None try: message.cancel() self.logger.critical( "Could not stop message %s execution with TimeLimitExceeded, " "cancel message and exit the worker", message.message_id, ) raise SystemExit(1) except NoCancelBackend: # Sending SIGKILL would requeue the message via RabbitMQ self.logger.error( "Could not stop message %s execution with TimeLimitExceeded, " "but do not stop the worker as the message would be requeue", message.message_id, ) elif not exception_raised and current_time >= deadline: self.logger.warning("Time limit exceeded. Raising exception in worker thread %r.", thread_id) self.deadlines[thread_id] = deadline, True, message raise_thread_exception(thread_id, TimeLimitExceeded) def _timer(self): while True: try: self._handle() except: # noqa self.logger.exception("Unhandled error while running the time limit handler.", exc_info=True) time.sleep(self.interval / 1000) @property def actor_options(self) -> Set[str]: return {"time_limit"} def after_process_boot(self, _) -> None: if current_platform in supported_platforms: thread = threading.Thread(target=self._timer, daemon=True) thread.start() else: msg = f"TimeLimit cannot kill threads on your current platform {current_platform}." warnings.warn(msg, category=RuntimeWarning, stacklevel=2) def before_process_message(self, broker: "Broker", message: "Message") -> None: limit = self.get_option("time_limit", broker=broker, message=message) deadline = time.monotonic() + limit / 1000 with self.lock: self.deadlines[threading.get_ident()] = deadline, False, message def after_process_message(self, broker: "Broker", message: "Message", *, result=None, exception=None) -> None: with self.lock: self.deadlines[threading.get_ident()] = None after_skip_message = after_process_message after_message_canceled = after_process_message