# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Any, Set
from ..logging import get_logger
from ..middleware import Middleware
from .backend import BackendResult
from .errors import ParentFailed
#: The maximum amount of milliseconds results are allowed to exist in
#: the backend.
DEFAULT_RESULT_TTL = 600000
[docs]class Results(Middleware):
"""Middleware that automatically stores actor results.
Example:
>>> from remoulade.results import Results
>>> from remoulade.results.backends import RedisBackend
>>> backend = RedisBackend()
>>> broker.add_middleware(Results(backend=backend))
>>> @remoulade.actor(store_results=True)
... def add(x, y):
... return x + y
>>> broker.declare_actor(add)
>>> message = add.send(1, 2)
>>> message.result.get()
3
Parameters:
backend(ResultBackend): The result backend to use when storing
results.
store_results(bool): Whether or not actor results should be
stored. Defaults to False and can be set on a per-actor
basis.
result_ttl(int): The maximum number of milliseconds results are
allowed to exist in the backend. Defaults to 10 minutes and
can be set on a per-actor basis.
"""
def __init__(self, *, backend=None, store_results=False, result_ttl=None):
self.logger = get_logger(__name__, type(self))
self.backend = backend
self.store_results = store_results
self.result_ttl = result_ttl or DEFAULT_RESULT_TTL
@property
def actor_options(self):
return {"store_results", "result_ttl"}
def after_process_message(self, broker, message, *, result=None, exception=None):
from ..middleware import Pipelines
store_results = self.get_option("store_results", broker=broker, message=message)
result_ttl = self.get_option("result_ttl", broker=broker, message=message)
message_failed = getattr(message, "failed", False)
try:
pipe_on_error: bool = broker.get_middleware(Pipelines).get_option(
"pipe_on_error", broker=broker, message=message
)
except: # noqa
pipe_on_error = False
results = []
if store_results:
if exception is None:
results.append(
(message.message_id, BackendResult(result=result, error=None, actor_name=message.actor_name))
)
elif message_failed:
error_str = self._serialize_exception(exception)
results.append(
(message.message_id, BackendResult(result=None, error=error_str, actor_name=message.actor_name))
)
# even if the actor do not have store_results, we need to invalidate the messages in the pipeline that has it
if message_failed and not pipe_on_error:
error_str = self._serialize_exception(exception)
exception = ParentFailed(f"{message} failed because of {error_str}")
children_result = BackendResult(
result=None, error=self._serialize_exception(exception), actor_name=message.actor_name
)
for message_id in self._get_children_message_ids(
broker, self.get_option("pipe_target", broker=broker, message=message)
):
results.append((message_id, children_result))
if results:
message_ids, results = zip(*results)
with self.backend.retry(broker, message, self.logger):
self.backend.store_results(message_ids, results, result_ttl)
@staticmethod
def _serialize_exception(exception):
try:
return repr(exception)
except Exception as e: # noqa
return "Exception could not be serialized"
def _get_children_message_ids(self, broker, pipe_target):
"""Get the ids of all the following messages in the pipeline which have store_results"""
from ..message import Message
message_ids: Set[str] = set()
if isinstance(pipe_target, list):
for message_data in pipe_target:
message_ids |= self._get_children_message_ids(broker, message_data)
elif pipe_target:
message = Message[Any](**pipe_target)
if self.get_option("store_results", broker=broker, message=message):
message_ids.add(message.message_id)
message_ids |= self._get_children_message_ids(
broker, self.get_option("pipe_target", broker=broker, message=message)
)
return message_ids
def after_enqueue_pipe_target(self, _, group_info):
"""After a pipe target has been enqueued, we need to forget the result of the group (if it's a group)"""
if group_info:
message_ids = self.backend.get_group_message_ids(group_info.group_id)
self.backend.forget_results(message_ids, self.result_ttl)
self.backend.delete_group_message_ids(group_info.group_id)
self.backend.delete_group_completion(group_info.group_id)
def before_build_group_pipeline(self, _, group_id, message_ids):
self.backend.set_group_message_ids(group_id, message_ids, self.result_ttl)