# Source code for remoulade.middleware.pipelines

# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from typing import Any

from ..errors import NoResultBackend
from ..logging import get_logger
from ..results import Results
from .middleware import Middleware


class Pipelines(Middleware):
    """Middleware that lets you pipe actors together so that the output of
    one actor feeds into the input of another.

    Parameters:
      pipe_ignore(bool): When True, ignores the result of the previous
        actor in the pipeline.
      pipe_target(dict): A message representing the actor the current
        result should be fed into.
      pipe_on_error(bool): When True, pipe errors to next actor in line.
    """

    def __init__(self):
        self.logger = get_logger(__name__, type(self))
        # Not read anywhere inside this class; kept because it is part of
        # the attributes the instance has always exposed.
        self.pipe_on_error: bool = False

    @property
    def actor_options(self):
        """Return the set of option names declared by this middleware."""
        return {
            "pipe_ignore",
            "pipe_target",
            "pipe_on_error",
        }

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Enqueue the pipe target(s) of ``message`` after it was processed.

        Nothing is enqueued when the message has no ``pipe_target`` option,
        when it raised and the error must not be piped, or when it belongs
        to a group that is not completed yet.

        Parameters:
          broker: the broker used to read options and enqueue messages.
          message: the message that has just been processed.
          result: the value returned by the actor; forwarded when
            ``exception`` is None.
          exception: the exception raised by the actor, if any. It is
            serialized and forwarded only when ``pipe_on_error`` is truthy
            and ``message.failed`` is truthy.

        A ``NoResultBackend`` raised while handling the pipe is logged and
        the message is marked as failed via ``message.fail()``.
        """
        # Imported at call time, like the other project imports done inside
        # this class's methods.
        from ..composition import GroupInfo

        pipe_target = self.get_option("pipe_target", broker=broker, message=message)
        group_info = self.get_option("group_info", broker=broker, message=message)
        if group_info:
            group_info = GroupInfo(**group_info)

        if pipe_target is None:
            return

        pipe_on_error: bool = self.get_option("pipe_on_error", broker=broker, message=message)

        if exception is None:
            res = result
        # Error handling: we want to pipe only on message.failed == True (end of retry)
        elif pipe_on_error and getattr(message, "failed", False):
            res = Results._serialize_exception(exception)
        else:
            # There is an exception and it must not be piped: either
            # pipe_on_error is falsy or the message is not marked as failed.
            return

        try:
            if group_info and not self._group_completed(message, group_info, broker):
                return

            if "trace_ctx" in message.options:
                # Propagate the trace context of this message to every target.
                trace_ctx = message.options["trace_ctx"]
                for target in pipe_target:
                    target["options"]["trace_ctx"] = trace_ctx

            self._send_next_message(pipe_target, broker, res, group_info, raise_on_error=not pipe_on_error)
            broker.emit_after("enqueue_pipe_target", group_info)
        except NoResultBackend as e:
            self.logger.error(str(e))
            message.fail()

    def _send_next_message(self, pipe_target, broker, result, group_info, raise_on_error: bool):
        """Build and enqueue a message for every entry of ``pipe_target``.

        Unless the target sets ``pipe_ignore``, one extra positional argument
        is appended to it: ``result``, or, when ``group_info`` is truthy, the
        list of the group's results read from the result backend (they are
        not carried by the message itself).

        Parameters:
          pipe_target(List[dict]): the message dicts to enqueue; each one is
            unpacked into ``Message``.
          broker(Broker): the broker
          result(any): the result of the actor (to be sent with the message
            if pipe_ignore=False)
          group_info(GroupInfo|None): the info of the group if it's a group
            else None
          raise_on_error(bool): forwarded to ``_group_results``.
        """
        # Since Pipelines is a default middleware, this import has to
        # happen at runtime in order to avoid a cyclic dependency
        # from broker -> pipelines -> messages -> broker.
        from ..message import Message

        # The group results depend only on loop-invariant inputs, so they
        # are fetched at most once, and only if some target needs them.
        group_results = None

        for message_data in pipe_target:
            next_message = Message[Any](**message_data)
            pipe_ignore = self.get_option("pipe_ignore", broker=broker, message=next_message, default=False)

            if not pipe_ignore:
                if group_info:
                    if group_results is None:
                        group_results = self._group_results(group_info, broker, raise_on_error)
                    # Each message gets its own list so targets never share
                    # one mutable argument.
                    result = list(group_results)
                next_message = next_message.copy(args=next_message.args + (result,))

            broker.enqueue(next_message)

    @staticmethod
    def _group_results(group_info, broker, raise_on_error: bool = True):
        """Get the result of a group (fetch the group members message_ids
        then all the results).

        Parameters:
          group_info(GroupInfo): the info of the group; only ``group_id`` is
            read here.
          broker(Broker): the broker providing the result backend.
          raise_on_error(bool): forwarded to ``CollectionResults.get``.

        Returns:
          list: the results, where every ``ErrorStored`` item is replaced by
          its ``str()`` representation.

        Raises:
          NoResultBackend: if there is no result backend set
        """
        from ..collection_results import CollectionResults
        from ..results.errors import ErrorStored

        try:
            result_backend = broker.get_result_backend()
        except NoResultBackend as e:
            raise NoResultBackend("Pipeline with groups are ony supported with a result backend") from e

        message_ids = result_backend.get_group_message_ids(group_id=group_info.group_id)
        results = CollectionResults.from_message_ids(message_ids)
        return [str(r) if isinstance(r, ErrorStored) else r for r in results.get(raise_on_error=raise_on_error)]

    def _group_completed(self, message, group_info, broker):
        """Returns true if a group is completed, and increment the completion
        count of the group.

        Parameters:
          message: the group member that has just been processed; its
            ``message_id`` is recorded in the completion count.
          group_info(GroupInfo): the info of the group to get the completion from
          broker(Broker): the broker to use

        Raises:
          NoResultBackend: if there is no result backend set
        """
        try:
            results = broker.get_middleware(Results)
        except NoResultBackend as e:
            raise NoResultBackend("Pipeline with groups are ony supported with a result backend") from e

        # Pre-initialised so the name is bound even if the block below does
        # not complete (presumably ``retry`` may handle the error itself;
        # TODO confirm against the result backend).
        group_completion = 0  # noqa
        with results.backend.retry(broker, message, self.logger):
            ttl = results.get_option("result_ttl", broker=broker, message=message)
            group_completion = results.backend.increment_group_completion(group_info.group_id, message.message_id, ttl)

        is_completed = group_completion >= group_info.children_count
        if is_completed:
            self.logger.info("Finished group %s.", group_info.group_id)
        if group_completion == 1:
            self.logger.info("First message of Group %s has been completed.", group_info.group_id)
        return is_completed