# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
from collections import namedtuple
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast, overload
from typing_extensions import Self, TypedDict, Unpack
from .broker import get_broker
from .collection_results import CollectionResults
from .common import flatten, generate_unique_id
if TYPE_CHECKING:
from .message import Message # noqa
from .result import Result
# Type variable for the result type a composition is parameterized with
# (``pipeline[ResultsT]`` / ``group[ResultsT]``); it is bound to either a single
# ``Result`` or a ``CollectionResults``.
ResultsT = TypeVar("ResultsT", bound="Union[Result[Any], CollectionResults[Any]]")
# Second type variable with the same bound. It is used by the ``pipeline.__init__``
# overload whose last child is a ``group[ResultsT_1]``, so that the pipeline is
# typed as ``pipeline[CollectionResults[ResultsT_1]]``.
ResultsT_1 = TypeVar("ResultsT_1", bound="Union[Result[Any], CollectionResults[Any]]")
class GroupInfoDict(TypedDict):
    """Dictionary form of :class:`GroupInfo`, as returned by ``GroupInfo.asdict()``.

    Keys:
      group_id(str): The id of the group.
      children_count(int): The number of children in the group.
    """

    # Same field names and order as the GroupInfo namedtuple.
    group_id: str
    children_count: int
class GroupInfo(namedtuple("GroupInfo", ("group_id", "children_count"))):
    """Immutable metadata describing a group sent to multiple actors.

    Both fields must be given as keyword arguments.

    Parameters:
      group_id(str): The id of the group
      children_count(int): The number of children of the group
    """

    def __new__(cls, *, group_id: str, children_count: int):
        # Keyword-only on purpose: positional construction is rejected with a
        # TypeError before reaching the namedtuple constructor.
        return super().__new__(cls, group_id=group_id, children_count=children_count)

    def asdict(self) -> GroupInfoDict:
        """Return the fields as a plain ``{"group_id": ..., "children_count": ...}`` mapping."""
        fields = self._asdict()
        return cast(GroupInfoDict, fields)
class pipeline(Generic[ResultsT]):
    """Chain actors together, passing the result of one actor to the
    next one in line.

    Parameters:
      children(Iterable[Message|pipeline|group]): A sequence of messages or
        pipelines or groups.  Child pipelines are flattened into the resulting
        pipeline, groups are kept as they are and messages are copied.
      cancel_on_error(boolean): True if you want to cancel all messages of a composition if one of
        the actors fails, this is only possible with a Cancel middleware.

    Attributes:
      children(List[Message|group]): The sequence of messages or groups to execute as a pipeline
      broker(Broker): The broker the pipeline runs on; this is the global broker
        returned by ``get_broker()`` when the pipeline is created.
      cancel_on_error(bool): The value given at construction.
    """

    @overload
    def __init__(
        self,
        # we should actually not use ResultTs here but define a new type var that is only bound to Result
        # but then mypy gets lost, so reusing ResultsT and ignoring the error
        children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], Union[Message[ResultsT], pipeline[ResultsT]]],  # type: ignore
        cancel_on_error: bool = False,
    ):
        ...

    @overload
    def __init__(
        self: pipeline[CollectionResults[ResultsT_1]],
        children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], group[ResultsT_1]],  # type: ignore [misc]
        cancel_on_error: bool = False,
    ):
        ...

    @overload
    def __init__(
        self,
        children: Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...],
        cancel_on_error: bool = False,
    ):
        ...

    def __init__(
        self,
        children: Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...],
        cancel_on_error: bool = False,
    ):
        self.broker = get_broker()
        self.children: List[Union[Message[Any], group[Any]]] = []

        for child in children:
            if isinstance(child, pipeline):
                # Flatten nested pipelines: their children become ours.
                self.children += child.children
            elif isinstance(child, group):
                self.children.append(child)
            else:
                # Messages are copied so the pipeline owns its own instances.
                self.children.append(child.copy())

        self.cancel_on_error = cancel_on_error
        if cancel_on_error:
            # Looked up only for its side effect; presumably this raises when no
            # cancel backend is configured, so the error surfaces at construction.
            self.broker.get_cancel_backend()

    def build(self, *, last_options=None, composition_id: Optional[str] = None, cancel_on_error: bool = False):
        """Build the pipeline, return the first message(s) to be enqueued or integrated in another pipeline

        Build the pipeline by starting at the end. We build a message with all its options in one step and
        we serialize it (asdict) as the previous message pipe_target in the next step.

        We need to know what the options (pipe_target) of the pipeline are before building it because we
        cannot edit the pipeline after it has been built.

        Parameters:
          cancel_on_error(bool): Whether the whole composition should be canceled when one of its messages fails.
            The pipeline's own ``cancel_on_error`` is OR-ed with this value.
          composition_id(str): The composition id to pass to messages; a new unique id is generated when omitted.
          last_options(dict): options to be assigned to the last actor of the pipeline (ex: pipe_target).
            The mapping is copied, it is never modified.

        Returns:
          A list with the first message of the pipeline (or every message of the group when the
          pipeline starts with a group), or None if the pipeline has no children.
        """
        composition_id = composition_id or generate_unique_id()
        cancel_on_error = cancel_on_error or self.cancel_on_error

        next_child = None
        for child in reversed(self.children):
            if next_child:
                options = {"pipe_target": [m.asdict() for m in next_child]}
            else:
                # Work on a copy: the caller may share this mapping between several
                # builds (group.build does), and writing composition_id/cancel_on_error
                # into it would leak this pipeline's settings to the caller.
                options = dict(last_options) if last_options else {}

            options["composition_id"] = composition_id
            options["cancel_on_error"] = cancel_on_error

            if isinstance(child, group):
                next_child = child.build(options)
            else:
                next_child = [child.build(options)]

        return next_child

    def __len__(self):
        """Returns the length of the pipeline."""
        return len(self.children)

    def __or__(self, other: "Union[Message, group]"):
        """Returns a new pipeline with "other" added to the end."""
        # NOTE(review): the new pipeline is created with the default
        # cancel_on_error, not this pipeline's value - confirm this is intended.
        return type(self)(tuple(self.children) + (other,))

    def __str__(self):  # pragma: no cover
        return f"pipeline([{', '.join(str(m) for m in self.children)}])"

    @property
    def message_ids(self):
        """Yield the message id of each child; a group child yields the list of its message ids."""
        for child in self.children:
            if isinstance(child, group):
                yield list(child.message_ids)
            else:
                yield child.message_id

    def run(self, *, delay: Optional[int] = None, transaction: Optional[bool] = None) -> Self:
        """Run this pipeline.

        Parameters:
          delay(int): The minimum amount of time, in milliseconds, the
            pipeline should be delayed by.
          transaction(bool): Whether to enqueue inside ``broker.tx()``.
            When None, ``broker.group_transaction`` decides.

        Returns:
          pipeline: Itself.
        """
        transaction = transaction if transaction is not None else self.broker.group_transaction

        with self.broker.tx() if transaction else nullcontext():
            first = self.build()
            if isinstance(first, list):
                for message in first:
                    self.broker.enqueue(message, delay=delay)
            else:
                self.broker.enqueue(first, delay=delay)

        return self

    @property
    def results(self) -> CollectionResults[Any]:
        """CollectionResults created from this pipeline, used for result related methods"""
        results: List[Union[Result, CollectionResults]] = []
        for element in self.children:
            results += [element.results if isinstance(element, group) else element.result]
        return CollectionResults(results)

    @property
    def result(self) -> ResultsT:
        """Result of the last message/group of the pipeline"""
        last_child = self.children[-1]
        return cast(ResultsT, last_child.results if isinstance(last_child, group) else last_child.result)

    def cancel(self) -> None:
        """Mark all the children as cancelled"""
        broker = get_broker()
        backend = broker.get_cancel_backend()
        # message_ids is nested (groups yield lists), hence the flatten.
        backend.cancel(list(flatten(self.message_ids)))
class group(Generic[ResultsT]):
    """Run a group of actors in parallel.

    Parameters:
      children(Iterable[Message|pipeline]): A sequence of messages or pipelines.
      group_id(str): The id of the group; a unique id is generated when omitted.
      cancel_on_error(bool): True if the messages of the composition should be
        cancelled when one of them fails; requires a cancel backend on the broker.

    Attributes:
      children(List[Message|pipeline]): The sequence to execute as a group
      broker(Broker): The global broker returned by ``get_broker()`` when the group is created.
      group_id(str): The id of the group.

    Raise:
      ValueError: if one of the children is itself a group
      NoCancelBackend: if no cancel middleware is set
    """

    def __init__(
        self,
        # we should actually not use ResultTs here but define a new type var that is only bound to Result
        # but then mypy gets lost, so reusing ResultsT and ignoring the error
        children: "Iterable[Union[pipeline[ResultsT], Message[ResultsT]]]",  # type: ignore
        group_id: Optional[str] = None,
        cancel_on_error: bool = False,
    ) -> None:
        self.children: "List[Union[Message[Any], pipeline[Any]]]" = []
        for child in children:
            if isinstance(child, group):
                raise ValueError("Groups of groups are not supported")
            self.children.append(child)

        self.broker = get_broker()
        self.group_id: str = generate_unique_id() if group_id is None else group_id
        self.cancel_on_error = cancel_on_error
        if cancel_on_error:
            # Looked up only for its side effect, so a missing cancel backend is
            # reported when the group is created.
            self.broker.get_cancel_backend()

    def __or__(self, other: "Union[Message, group, pipeline]") -> pipeline:
        """Combine this group into a pipeline with "other"."""
        return pipeline((self, other))

    def __len__(self) -> int:
        """Returns the size of the group."""
        return len(self.children)

    def __str__(self) -> str:  # pragma: no cover
        return f"group({', '.join(str(child) for child in self.children)})"

    def build(self, options=None) -> "List[Message]":
        """Build group for pipeline

        Parameters:
          options(dict): Options to apply to every child. They take precedence over
            the group's own ``group_info``, ``composition_id`` and ``cancel_on_error``.
            None means the group is built on its own (as ``run`` does).

        Returns:
          The list of messages to enqueue, one entry per message child plus the
          first message(s) of each pipeline child.
        """
        if options is None:
            options = {}
        else:
            # Options are only passed when the group is built by an enclosing
            # composition, which is when the "build_group_pipeline" hook is emitted.
            self.broker.emit_before("build_group_pipeline", group_id=self.group_id, message_ids=list(self.message_ids))

        composition_id = options.get("composition_id", self.group_id)
        cancel_on_error = options.get("cancel_on_error", self.cancel_on_error)
        options = {
            "group_info": self.info.asdict(),
            "composition_id": composition_id,
            "cancel_on_error": self.cancel_on_error,
            **options,
        }

        messages: "List[Message]" = []
        for group_child in self.children:
            if isinstance(group_child, pipeline):
                # Give each child pipeline its own copy of the options so that
                # nothing it writes into them can affect the siblings built after it.
                messages += group_child.build(
                    last_options=dict(options), composition_id=composition_id, cancel_on_error=cancel_on_error
                )
            else:
                messages += [group_child.build(options)]
        return messages

    @property
    def info(self) -> GroupInfo:
        """Info used for group completion and cancel"""
        return GroupInfo(group_id=self.group_id, children_count=len(self.children))

    @property
    def message_ids(self):
        """Yield the message id of each child; a pipeline child yields the list of its message ids."""
        for child in self.children:
            if isinstance(child, pipeline):
                yield list(child.message_ids)
            else:
                yield child.message_id

    def run(self, *, delay: Optional[int] = None, transaction: Optional[bool] = None) -> Self:
        """Run the actors in this group.

        Parameters:
          delay(int): The minimum amount of time, in milliseconds,
            each message in the group should be delayed by.
          transaction(bool): Whether to enqueue inside ``broker.tx()``.
            When None, ``broker.group_transaction`` decides.

        Returns:
          group: Itself.
        """
        transaction = transaction if transaction is not None else self.broker.group_transaction

        with self.broker.tx() if transaction else nullcontext():
            for message in self.build():
                self.broker.enqueue(message, delay=delay)

        return self

    @property
    def results(self) -> CollectionResults[ResultsT]:
        """CollectionResults created from this group, used for result related methods"""
        return cast(CollectionResults[ResultsT], CollectionResults(children=[child.result for child in self.children]))

    def cancel(self) -> None:
        """Mark all the children as cancelled"""
        broker = get_broker()
        backend = broker.get_cancel_backend()
        # message_ids is nested (pipelines yield lists), hence the flatten.
        backend.cancel(list(flatten(self.message_ids)))