# Source code for remoulade.results.middleware

# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from typing import Any, Set

from ..logging import get_logger
from ..middleware import Middleware
from .backend import BackendResult
from .errors import ParentFailed

#: Default time-to-live of a stored result, in milliseconds (10 minutes).
#: Results older than this are allowed to disappear from the backend.
DEFAULT_RESULT_TTL = 10 * 60 * 1000


class Results(Middleware):
    """Middleware that automatically stores actor results.

    Example:

      >>> from remoulade.results import Results
      >>> from remoulade.results.backends import RedisBackend
      >>> backend = RedisBackend()
      >>> broker.add_middleware(Results(backend=backend))

      >>> @remoulade.actor(store_results=True)
      ... def add(x, y):
      ...     return x + y

      >>> broker.declare_actor(add)
      >>> message = add.send(1, 2)
      >>> message.result.get()
      3

    Parameters:
      backend(ResultBackend): The result backend to use when storing
        results.
      store_results(bool): Whether or not actor results should be
        stored.  Defaults to False and can be set on a per-actor
        basis.
      result_ttl(int): The maximum number of milliseconds results are
        allowed to exist in the backend.  Defaults to 10 minutes and
        can be set on a per-actor basis.
    """

    def __init__(self, *, backend=None, store_results=False, result_ttl=None):
        self.logger = get_logger(__name__, type(self))
        self.backend = backend
        self.store_results = store_results
        # A falsy ttl (None or 0) falls back to the module default.
        self.result_ttl = result_ttl or DEFAULT_RESULT_TTL

    @property
    def actor_options(self):
        """Names of the per-actor options this middleware understands."""
        return {"store_results", "result_ttl"}

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Persist the outcome of ``message`` in the result backend.

        What gets stored:

        * If ``store_results`` is enabled for the message and no exception
          occurred, the ``result`` is stored under the message id.
        * If ``store_results`` is enabled and the message is marked as
          failed (``message.failed``), the serialized exception is stored
          under the message id.
        * If the message is marked as failed and ``pipe_on_error`` is not
          enabled, a ``ParentFailed`` error is stored for every following
          message of the pipeline that has ``store_results`` enabled, even
          when this message itself does not store its result.

        Nothing is written to the backend when there is nothing to store.

        Parameters:
          broker: The broker the message was processed on.
          message: The message that has just been processed.
          result: The value returned by the actor, if any.
          exception: The exception raised by the actor, if any.
        """
        from ..middleware import Pipelines

        store_results = self.get_option("store_results", broker=broker, message=message)
        result_ttl = self.get_option("result_ttl", broker=broker, message=message)
        message_failed = getattr(message, "failed", False)

        # Best effort: if the option cannot be looked up (presumably because
        # the Pipelines middleware is not registered on this broker), behave
        # as if pipe_on_error were disabled.  Only ``Exception`` is caught so
        # that KeyboardInterrupt / SystemExit still propagate.
        try:
            pipe_on_error: bool = broker.get_middleware(Pipelines).get_option(
                "pipe_on_error", broker=broker, message=message
            )
        except Exception:
            pipe_on_error = False

        # (message_id, BackendResult) pairs to write in a single backend call.
        pending = []
        if store_results:
            if exception is None:
                pending.append(
                    (message.message_id, BackendResult(result=result, error=None, actor_name=message.actor_name))
                )
            elif message_failed:
                error_str = self._serialize_exception(exception)
                pending.append(
                    (message.message_id, BackendResult(result=None, error=error_str, actor_name=message.actor_name))
                )

        # Even if this actor does not have store_results, the following
        # messages of the pipeline that do have it must be invalidated.
        if message_failed and not pipe_on_error:
            error_str = self._serialize_exception(exception)
            parent_failure = ParentFailed(f"{message} failed because of {error_str}")
            children_result = BackendResult(
                result=None, error=self._serialize_exception(parent_failure), actor_name=message.actor_name
            )
            pipe_target = self.get_option("pipe_target", broker=broker, message=message)
            for child_id in self._get_children_message_ids(broker, pipe_target):
                pending.append((child_id, children_result))

        if pending:
            # Split the pairs into two parallel tuples for the backend.
            message_ids, backend_results = zip(*pending)
            with self.backend.retry(broker, message, self.logger):
                self.backend.store_results(message_ids, backend_results, result_ttl)

    @staticmethod
    def _serialize_exception(exception) -> str:
        """Return ``repr(exception)``, or a placeholder if ``repr`` itself raises."""
        try:
            return repr(exception)
        except Exception:
            return "Exception could not be serialized"

    def _get_children_message_ids(self, broker, pipe_target) -> Set[str]:
        """Get the ids of all the following messages in the pipeline which have store_results.

        Parameters:
          broker: The broker used to resolve message options.
          pipe_target: Either a list of pipe targets (each handled
            recursively), a mapping of keyword arguments from which a
            ``Message`` is built, or a falsy value when there is no
            following message.

        Returns:
          The set of message ids found by walking the ``pipe_target``
          chain; empty when ``pipe_target`` is falsy.
        """
        from ..message import Message

        message_ids: Set[str] = set()

        if isinstance(pipe_target, list):
            for message_data in pipe_target:
                message_ids |= self._get_children_message_ids(broker, message_data)
        elif pipe_target:
            message = Message[Any](**pipe_target)

            if self.get_option("store_results", broker=broker, message=message):
                message_ids.add(message.message_id)

            # Keep walking down the pipeline from this message's own target.
            message_ids |= self._get_children_message_ids(
                broker, self.get_option("pipe_target", broker=broker, message=message)
            )

        return message_ids

    def after_enqueue_pipe_target(self, _, group_info):
        """After a pipe target has been enqueued, we need to forget the result of the group (if it's a group)"""
        if group_info:
            message_ids = self.backend.get_group_message_ids(group_info.group_id)
            self.backend.forget_results(message_ids, self.result_ttl)
            self.backend.delete_group_message_ids(group_info.group_id)
            self.backend.delete_group_completion(group_info.group_id)

    def before_build_group_pipeline(self, _, group_id, message_ids):
        """Record the message ids of the group in the backend, using the middleware's result ttl."""
        self.backend.set_group_message_ids(group_id, message_ids, self.result_ttl)