# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar, Union, overload
from typing_extensions import Literal, ParamSpec, TypedDict
from .helpers.actor_arguments import get_actor_arguments
from .logging import get_logger
from .message import Message
from .result import Result
if TYPE_CHECKING:
from .broker import Broker
#: The regular expression that represents valid queue names.
_queue_name_re = re.compile(r"[a-zA-Z_][a-zA-Z0-9._-]*")
P = ParamSpec("P")
R = TypeVar("R")
class ActorDict(TypedDict):
name: str
queue_name: str
alternative_queues: Optional[List[str]]
priority: int
args: list
@overload
def actor(
fn: Literal[None] = None,
*,
actor_name: Optional[str] = None,
queue_name: str = "default",
alternative_queues: Optional[List[str]] = None,
priority: int = 0,
**options: Any,
) -> "Callable[[Callable[P, R]], Actor[P, R]]":
...
@overload
def actor(
fn: Callable[P, R],
*,
actor_name: Optional[str] = None,
queue_name: str = "default",
alternative_queues: Optional[List[str]] = None,
priority: int = 0,
**options: Any,
) -> "Actor[P, R]":
...
[docs]def actor(
fn: Optional[Callable[..., Any]] = None,
*,
actor_name: Optional[str] = None,
queue_name: str = "default",
alternative_queues: Optional[List[str]] = None,
priority: int = 0,
**options: Any,
) -> "Union[Actor[..., Any], Callable[..., Actor[..., Any]]]":
"""Declare an actor.
Examples:
>>> import remoulade
>>> @remoulade.actor
... def add(x, y):
... print(x + y)
...
>>> add
Actor(<function add at 0x106c6d488>, queue_name='default', actor_name='add')
You need to declare an actor before using it
>>> remoulade.declare_actors([add])
None
>>> add(1, 2)
3
>>> add.send(1, 2)
Message(
queue_name='default',
actor_name='add',
args=(1, 2), kwargs={}, options={},
message_id='e0d27b45-7900-41da-bb97-553b8a081206',
message_timestamp=1497862448685)
Parameters:
fn(callable): The function to wrap.
actor_name(str): The name of the actor.
queue_name(str): The name of the default queue to use.
alternative_queues(List[str]): The names of other queues this actor may be enqueued into.
priority(int): The actor's global priority. If two tasks have
been pulled on a worker concurrently and one has a higher
priority than the other then it will be processed first.
Higher numbers represent higher priorities.
**options(dict): Arbitrary options that vary with the set of
middleware that you use. See ``get_broker().actor_options``.
Returns:
Actor: The decorated function.
"""
def decorator(fn: Callable[P, R]) -> Actor[P, R]:
nonlocal actor_name
actor_name = actor_name or fn.__name__
queues_names = [queue_name]
if alternative_queues is not None:
queues_names += alternative_queues
if any(not _queue_name_re.fullmatch(name) for name in queues_names):
raise ValueError(
"Queue names must start with a letter or an underscore followed "
"by any number of letters, digits, dashes or underscores."
)
return Actor(
fn,
actor_name=actor_name,
queue_name=queue_name,
alternative_queues=alternative_queues,
priority=priority,
options=options,
)
if fn is None:
return decorator
return decorator(fn)
[docs]class Actor(Generic[P, R]):
"""Thin wrapper around callables that stores metadata about how
they should be executed asynchronously. Actors are callable.
Attributes:
logger(Logger): The actor's logger.
fn(callable): The underlying callable.
broker(Broker): The broker this actor is bound to.
actor_name(str): The actor's name.
queue_name(str): The actor's default queue.
alternative_queues(List[str]): The names of other queues this actor may be enqueued into.
priority(int): The actor's priority.
options(dict): Arbitrary options that are passed to the broker
and middleware.
"""
def __init__(
self,
fn: Callable[P, R],
*,
actor_name: str,
queue_name: str,
alternative_queues: Optional[List[str]],
priority: int,
options: Any,
) -> None:
self.logger = get_logger(fn.__module__, actor_name)
self.fn = fn
self.broker: "Optional[Broker]" = None
self.actor_name = actor_name
self.queue_name = queue_name
self.alternative_queues = alternative_queues
self.priority = priority
self.options = options
def set_broker(self, broker: "Broker") -> None:
invalid_options = set(self.options) - broker.actor_options
if invalid_options:
invalid_options_list = ", ".join(invalid_options)
message = "The following actor options are undefined: %s. " % invalid_options_list
message += "Did you forget to add a middleware to your Broker?"
raise ValueError(message)
self.broker = broker
@property
def queue_names(self):
return [self.queue_name] + (self.alternative_queues or [])
[docs] def message(self, *args: Any, **kwargs: Any) -> Message[Result[R]]:
"""Build a message. This method is useful if you want to
compose actors. See the actor composition documentation for
details.
Parameters:
*args(tuple): Positional arguments to send to the actor.
**kwargs(dict): Keyword arguments to send to the actor.
Examples:
>>> (add.message(1, 2) | add.message(3))
pipeline([add(1, 2), add(3)])
Returns:
Message: A message that can be enqueued on a broker.
"""
return self.message_with_options(args=args, kwargs=kwargs)
[docs] def message_with_options(
self,
*,
args: Optional[Tuple[Any, ...]] = None,
kwargs: Optional[Dict[str, Any]] = None,
queue_name: Optional[str] = None,
**options: Any,
) -> Message[Result[R]]:
"""Build a message with an arbitrary set of processing options.
This method is useful if you want to compose actors. See the
actor composition documentation for details.
Parameters:
args(tuple): Positional arguments that are passed to the actor.
kwargs(dict): Keyword arguments that are passed to the actor.
queue_name(str): Name of the queue to put this message into when enqueued.
**options(dict): Arbitrary options that are passed to the
broker and any registered middleware.
Returns:
Message: A message that can be enqueued on a broker.
"""
for middleware in self.broker.middleware:
options = middleware.update_options_before_create_message(options, self.broker, self.actor_name)
if queue_name is not None:
if queue_name not in self.queue_names:
raise ValueError(f"{queue_name} is not a valid queue name for actor {self.actor_name}.")
else:
queue_name = self.queue_name
return Message(
queue_name=queue_name,
actor_name=self.actor_name,
args=args or (),
kwargs=kwargs or {},
options=options,
)
[docs] def send(self, *args: P.args, **kwargs: P.kwargs) -> Message[Result[R]]:
"""Asynchronously send a message to this actor.
Parameters:
*args(tuple): Positional arguments to send to the actor.
**kwargs(dict): Keyword arguments to send to the actor.
Returns:
Message: The enqueued message.
"""
return self.send_with_options(args=args, kwargs=kwargs)
[docs] def send_with_options(
self,
*,
args: Optional[Tuple[Any, ...]] = None,
kwargs: Optional[Dict[str, Any]] = None,
queue_name: Optional[str] = None,
delay: Optional[int] = None,
**options: Any,
) -> Message[Result[R]]:
"""Asynchronously send a message to this actor, along with an
arbitrary set of processing options for the broker and
middleware.
Parameters:
args(tuple): Positional arguments that are passed to the actor.
kwargs(dict): Keyword arguments that are passed to the actor.
queue_name(str): Name of the queue to enqueue this message into.
delay(int): The minimum amount of time, in milliseconds, the
message should be delayed by.
**options(dict): Arbitrary options that are passed to the
broker and any registered middleware.
Returns:
Message: The enqueued message.
"""
if not self.broker:
raise ValueError("No broker is set, did you forget to call set_broker ?")
message = self.message_with_options(args=args, kwargs=kwargs, queue_name=queue_name, **options)
return self.broker.enqueue(message, delay=delay)
def as_dict(self) -> ActorDict:
return {
"name": self.actor_name,
"queue_name": self.queue_name,
"alternative_queues": self.alternative_queues,
"priority": self.priority,
"args": get_actor_arguments(self),
}
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Synchronously call this actor.
Parameters:
*args: Positional arguments to send to the actor.
**kwargs: Keyword arguments to send to the actor.
Returns:
Whatever the underlying function backing this actor returns.
"""
return self.fn(*args, **kwargs)
def __repr__(self) -> str:
return f"Actor({repr(self.fn)}, queue_name={repr(self.queue_name)}, actor_name={repr(self.actor_name)})"
def __str__(self) -> str:
return f"Actor({self.actor_name})"