API Reference¶
Functions¶
- remoulade.get_broker() remoulade.broker.Broker [source]¶
Get the global broker instance. If no global broker is set, this initializes a RabbitmqBroker and returns it.
- Returns
The default Broker.
- Return type
- Raises
ValueError if no broker is set –
- remoulade.set_broker(broker: remoulade.broker.Broker) None [source]¶
Configure the global broker instance.
- Parameters
broker (Broker) – The broker instance to use by default.
- remoulade.get_encoder() remoulade.encoder.Encoder [source]¶
Get the global encoder object.
- Returns
Encoder
- remoulade.set_encoder(encoder: remoulade.encoder.Encoder) None [source]¶
Set the global encoder object.
- Parameters
encoder (Encoder) – The encoder instance to use when serializing messages.
Actors & Messages¶
- remoulade.actor(fn: typing_extensions.Literal[None] = None, *, actor_name: Optional[str] = 'None', queue_name: str = "'default'", alternative_queues: Optional[List[str]] = 'None', priority: int = '0', **options: Any) Callable[[Callable[P, R]], Actor[P, R]] [source]¶
- remoulade.actor(fn: Callable[[remoulade.actor.P], remoulade.actor.R], *, actor_name: Optional[str] = 'None', queue_name: str = "'default'", alternative_queues: Optional[List[str]] = 'None', priority: int = '0', **options: Any) Actor[P, R]
Declare an actor.
Examples
>>> import remoulade
>>> @remoulade.actor ... def add(x, y): ... print(x + y) ... >>> add Actor(<function add at 0x106c6d488>, queue_name='default', actor_name='add')
You need to declare an actor before using it >>> remoulade.declare_actors([add]) None
>>> add(1, 2) 3
>>> add.send(1, 2) Message( queue_name='default', actor_name='add', args=(1, 2), kwargs={}, options={}, message_id='e0d27b45-7900-41da-bb97-553b8a081206', message_timestamp=1497862448685)
- Parameters
fn (callable) – The function to wrap.
actor_name (str) – The name of the actor.
queue_name (str) – The name of the default queue to use.
alternative_queues (List[str]) – The names of other queues this actor may be enqueued into.
priority (int) – The actor’s global priority. If two tasks have been pulled on a worker concurrently and one has a higher priority than the other then it will be processed first. Higher numbers represent higher priorities.
**options (dict) – Arbitrary options that vary with the set of middleware that you use. See
get_broker().actor_options
.
- Returns
The decorated function.
- Return type
- class remoulade.Actor(fn: Callable[[remoulade.actor.P], remoulade.actor.R], *, actor_name: str, queue_name: str, alternative_queues: Optional[List[str]], priority: int, options: Any)[source]¶
Thin wrapper around callables that stores metadata about how they should be executed asynchronously. Actors are callable.
- logger¶
The actor’s logger.
- Type
Logger
- fn¶
The underlying callable.
- Type
callable
- message(*args: Any, **kwargs: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]] [source]¶
Build a message. This method is useful if you want to compose actors. See the actor composition documentation for details.
- Parameters
Examples
>>> (add.message(1, 2) | add.message(3)) pipeline([add(1, 2), add(3)])
- Returns
A message that can be enqueued on a broker.
- Return type
- message_with_options(*, args: Optional[Tuple[Any, ...]] = None, kwargs: Optional[Dict[str, Any]] = None, queue_name: Optional[str] = None, **options: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]] [source]¶
Build a message with an arbitrary set of processing options. This method is useful if you want to compose actors. See the actor composition documentation for details.
- Parameters
args (tuple) – Positional arguments that are passed to the actor.
kwargs (dict) – Keyword arguments that are passed to the actor.
queue_name (str) – Name of the queue to put this message into when enqueued.
**options (dict) – Arbitrary options that are passed to the broker and any registered middleware.
- Returns
A message that can be enqueued on a broker.
- Return type
- send_with_options(*, args: Optional[Tuple[Any, ...]] = None, kwargs: Optional[Dict[str, Any]] = None, queue_name: Optional[str] = None, delay: Optional[int] = None, **options: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]] [source]¶
Asynchronously send a message to this actor, along with an arbitrary set of processing options for the broker and middleware.
- Parameters
args (tuple) – Positional arguments that are passed to the actor.
kwargs (dict) – Keyword arguments that are passed to the actor.
queue_name (str) – Name of the queue to enqueue this message into.
delay (int) – The minimum amount of time, in milliseconds, the message should be delayed by.
**options (dict) – Arbitrary options that are passed to the broker and any registered middleware.
- Returns
The enqueued message.
- Return type
- class remoulade.Message(*, queue_name: str, actor_name: str, args, kwargs: Dict, options: Dict[str, Any], message_id: str = _Nothing.NOTHING, message_timestamp: int = _Nothing.NOTHING)[source]¶
Encapsulates metadata about messages being sent to individual actors.
- Parameters
queue_name (str) – The name of the queue the message belogns to.
actor_name (str) – The name of the actor that will receive the message.
args (tuple) – Positional arguments that are passed to the actor.
kwargs (dict) – Keyword arguments that are passed to the actor.
options (dict) – Arbitrary options passed to the broker and middleware.
message_id (str) – A globally-unique id assigned to the actor.
message_timestamp (int) – The UNIX timestamp in milliseconds representing when the message was first enqueued.
Class-based Actors¶
- class remoulade.GenericActor[source]¶
Base-class for class-based actors.
Each subclass may define an inner class named
Meta
. You can use the meta class to provide broker options for the actor.Classes that have
abstract = True
in their meta class are considered abstract base classes and are not converted into actors. You can’t send these classes messages, you can only inherit from them. Actors that subclass abstract base classes inherit their parents’ meta classes. You can also override meta in subclass using inheritance.Example
>>> class BaseTask(GenericActor): ... class Meta: ... abstract = True ... queue_name = "tasks" ... max_retries = 20 ... ... def get_task_name(self): ... raise NotImplementedError ... ... def perform(self): ... print(f"Hello from {self.get_task_name()}!")
>>> class FooTask(BaseTask): ... def get_task_name(self): ... return "Foo"
>>> class BarTask(BaseTask): ... class Meta(BaseTask.Meta): ... max_retries = 10 ... ... def get_task_name(self): ... return "Bar"
>>> FooTask.send() >>> BarTask.send()
- logger¶
The actor’s logger.
- Type
Logger
Message Composition¶
- class remoulade.group(children: Iterable[Union[pipeline[ResultsT], Message[ResultsT]]], group_id: Optional[str] = None, cancel_on_error: bool = False)[source]¶
Run a group of actors in parallel.
- Raises
NoCancelBackend – if no cancel middleware is set
- property info: remoulade.composition.GroupInfo¶
Info used for group completion and cancel
- property results: remoulade.collection_results.CollectionResults[remoulade.composition.ResultsT]¶
CollectionResults created from this group, used for result related methods
- class remoulade.pipeline(children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], Union[Message[ResultsT], pipeline[ResultsT]]], cancel_on_error: bool = False)[source]¶
- class remoulade.pipeline(children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], group[ResultsT_1]], cancel_on_error: bool = False)
- class remoulade.pipeline(children: Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...], cancel_on_error: bool = False)
Chain actors together, passing the result of one actor to the next one in line.
- Parameters
children (Iterable[Message|pipeline|group]) – A sequence of messages or pipelines or groups. Child pipelines are flattened into the resulting pipeline.
broker (Broker) – The broker to run the pipeline on. Defaults to the current global broker.
cancel_on_error (boolean) – True if you want to cancel all messages of a composition if on of
fails (the actor) –
middleware. (this is only possible with a Cancel) –
- build(*, last_options=None, composition_id: Optional[str] = None, cancel_on_error: bool = False)[source]¶
Build the pipeline, return the first message to be enqueued or integrated in another pipeline
Build the pipeline by starting at the end. We build a message with all it’s options in one step and we serialize it (asdict) as the previous message pipe_target in the next step.
We need to know what is the options (pipe_target) of the pipeline before building it because we cannot edit the pipeline after it has been built.
- Parameters
- Returns
the first message of the pipeline
- property result: remoulade.composition.ResultsT¶
Result of the last message/group of the pipeline
- property results: remoulade.collection_results.CollectionResults[Any]¶
CollectionResults created from this pipeline, used for result related methods
Message Encoders¶
Encoders are used to serialize and deserialize messages over the wire.
Brokers¶
- class remoulade.Broker(middleware: Optional[Iterable[Middleware]] = None)[source]¶
Base class for broker implementations.
- Parameters
middleware (list[Middleware]) – The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare all middleware. Most of the time, you’ll want to use
add_middleware()
instead.
- actor_options¶
The names of all the options actors may overwrite when they are declared.
- add_middleware(middleware: Middleware) None [source]¶
- Add a middleware object to this broker. The middleware is
added to his default position.
- Parameters
middleware (Middleware) – The middleware.
- consume(queue_name: str, prefetch: int = 1, timeout: int = 30000) remoulade.broker.Consumer [source]¶
Get an iterator that consumes messages off of the queue.
- Raises
QueueNotFound – If the given queue was never declared.
- Parameters
- Returns
A message iterator.
- Return type
- declare_actor(actor: Actor) None [source]¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters
actor (Actor) – The actor being declared.
- declare_queue(queue_name: str) None [source]¶
Declare a queue on this broker. This method must be idempotent.
- Parameters
queue_name (str) – The name of the queue being declared.
- enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any] [source]¶
Enqueue a message on this broker.
- flush(queue_name: str) None [source]¶
Drop all the messages from a queue.
- Parameters
queue_name (str) – The name of the queue to flush.
- get_actor(actor_name: str) Actor [source]¶
Look up an actor by its name.
- Parameters
actor_name (str) – The name to look up.
- Raises
ActorNotFound – If the actor was never declared.
- Returns
The actor.
- Return type
- get_cancel_backend() remoulade.cancel.backend.CancelBackend [source]¶
Get the CancelBackend associated with the broker
- Raises
NoCancelBackend – if there is no CancelBackend
- Returns
the cancel backend
- Return type
CancelBackend
- get_result_backend() remoulade.results.backend.ResultBackend [source]¶
Get the ResultBackend associated with the broker
- Raises
NoResultBackend – if there is no ResultBackend
- Returns
the result backend
- Return type
- get_state_backend() remoulade.state.backend.StateBackend [source]¶
Get the StateBackend associated with the broker
- Raises
NoStateBackend – if there is no StateBackend
- Returns
the state backend
- Return type
StateBackend
- join(queue_name: str, *, timeout: Optional[int] = None) None [source]¶
Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.
- remove_middleware(middleware_class: Type[Middleware])[source]¶
Removes a middleware object from this broker.
- Parameters
middleware_class (Type[Middleware]) – The middleware class.
- class remoulade.Consumer[source]¶
Consumers iterate over messages on a queue.
Consumers and their MessageProxies are not thread-safe.
- __next__()[source]¶
Retrieve the next message off of the queue. This method blocks until a message becomes available.
- Returns
A transparent proxy around a Message that can be used to acknowledge or reject it once it’s done being processed.
- Return type
- ack(message)[source]¶
Acknowledge that a message has been processed, removing it from the broker.
- Parameters
message (MessageProxy) – The message to acknowledge.
- nack(message)[source]¶
Move a message to the dead-letter queue.
- Parameters
message (MessageProxy) – The message to reject.
- class remoulade.MessageProxy(message)[source]¶
Base class for messages returned by
Broker.consume()
.
- class remoulade.brokers.rabbitmq.RabbitmqBroker(*, confirm_delivery: bool = False, url: Optional[str] = None, middleware: Optional[List[Middleware]] = None, max_priority: Optional[int] = None, channel_pool_size: int = 200, dead_queue_max_length: Optional[int] = None, delivery_mode: int = 2, group_transaction: bool = False)[source]¶
A broker that can be used with RabbitMQ.
Examples
>>> RabbitmqBroker(url="amqp://guest:guest@127.0.0.1:5672")
- Parameters
confirm_delivery (bool) – Wait for RabbitMQ to confirm that messages have been committed on every call to enqueue. Defaults to False.
url (str) – The optional connection URL to use to determine which Rabbit server to connect to. If None is provided, connection is made with ‘amqp://guest:guest@localhost:5672’
middleware (list[Middleware]) – The set of middleware that apply to this broker.
max_priority (int) – Configure the queues with x-max-priority to support priority queue in RabbitMQ itself
channel_pool_size (int) – Size of the channel pool
dead_queue_max_length (int|None) – Max size of the dead queue. If None, no max size.
delivery_mode (int) – 2 (persistent) to wait for message to be flushed to disk for confirmation (safer) or 1 (transient) which don’t (faster)
group_transaction (bool) – If true, use transactions by default when running group and pipelines
- add_middleware(middleware: Middleware) None ¶
- Add a middleware object to this broker. The middleware is
added to his default position.
- Parameters
middleware (Middleware) – The middleware.
- property connection¶
amqpstorm.Connection` for the current proccess. This property may change without notice.
- Type
The
- Type
class
- consume(queue_name: str, prefetch: int = 1, timeout: int = 5000) remoulade.brokers.rabbitmq._RabbitmqConsumer [source]¶
Create a new consumer for a queue.
- declare_actor(actor: Actor) None ¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters
actor (Actor) – The actor being declared.
- declare_queue(queue_name: str) None [source]¶
Declare a queue. Has no effect if a queue with the given name already exists.
- Parameters
queue_name (str) – The name of the new queue.
- Raises
ConnectionClosed – If the underlying channel or connection has been closed.
- enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any] ¶
Enqueue a message on this broker.
- flush(queue_name: str) None [source]¶
Drop all the messages from a queue.
- Parameters
queue_name (str) – The queue to flush.
- get_actor(actor_name: str) Actor ¶
Look up an actor by its name.
- Parameters
actor_name (str) – The name to look up.
- Raises
ActorNotFound – If the actor was never declared.
- Returns
The actor.
- Return type
- get_cancel_backend() remoulade.cancel.backend.CancelBackend ¶
Get the CancelBackend associated with the broker
- Raises
NoCancelBackend – if there is no CancelBackend
- Returns
the cancel backend
- Return type
CancelBackend
- get_queue_message_counts(queue_name: str)[source]¶
Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.
- get_result_backend() remoulade.results.backend.ResultBackend ¶
Get the ResultBackend associated with the broker
- Raises
NoResultBackend – if there is no ResultBackend
- Returns
the result backend
- Return type
- get_state_backend() remoulade.state.backend.StateBackend ¶
Get the StateBackend associated with the broker
- Raises
NoStateBackend – if there is no StateBackend
- Returns
the state backend
- Return type
StateBackend
- join(queue_name: str, min_successes: int = 10, idle_time: int = 100, *, timeout: Optional[int] = None) None [source]¶
Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.
Warning
This method doesn’t wait for unacked messages so it may not be completely reliable. Use the stub broker in your unit tests and only use this for simple integration tests.
- remove_middleware(middleware_class: Type[Middleware])¶
Removes a middleware object from this broker.
- Parameters
middleware_class (Type[Middleware]) – The middleware class.
- class remoulade.brokers.stub.StubBroker(middleware=None)[source]¶
A broker that can be used within unit tests.
- add_middleware(middleware: Middleware) None ¶
- Add a middleware object to this broker. The middleware is
added to his default position.
- Parameters
middleware (Middleware) – The middleware.
- close()¶
Close this broker and perform any necessary cleanup actions.
- consume(queue_name, prefetch=1, timeout=100)[source]¶
Create a new consumer for a queue.
- Parameters
- Raises
QueueNotFound – If the queue hasn’t been declared.
- Returns
A consumer that retrieves messages from Redis.
- Return type
- declare_actor(actor: Actor) None ¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters
actor (Actor) – The actor being declared.
- declare_queue(queue_name)[source]¶
Declare a queue. Has no effect if a queue with the given name has already been declared.
- Parameters
queue_name (str) – The name of the new queue.
- enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any] ¶
Enqueue a message on this broker.
- flush(queue_name)[source]¶
Drop all the messages from a queue.
- Parameters
queue_name (str) – The queue to flush.
- get_actor(actor_name: str) Actor ¶
Look up an actor by its name.
- Parameters
actor_name (str) – The name to look up.
- Raises
ActorNotFound – If the actor was never declared.
- Returns
The actor.
- Return type
- get_cancel_backend() remoulade.cancel.backend.CancelBackend ¶
Get the CancelBackend associated with the broker
- Raises
NoCancelBackend – if there is no CancelBackend
- Returns
the cancel backend
- Return type
CancelBackend
- get_result_backend() remoulade.results.backend.ResultBackend ¶
Get the ResultBackend associated with the broker
- Raises
NoResultBackend – if there is no ResultBackend
- Returns
the result backend
- Return type
- get_state_backend() remoulade.state.backend.StateBackend ¶
Get the StateBackend associated with the broker
- Raises
NoStateBackend – if there is no StateBackend
- Returns
the state backend
- Return type
StateBackend
- join(queue_name, *, timeout=None)[source]¶
Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.
- Raises
QueueJoinTimeout – When the timeout elapses.
QueueNotFound – If the given queue was never declared.
- Parameters
- remove_middleware(middleware_class: Type[Middleware])¶
Removes a middleware object from this broker.
- Parameters
middleware_class (Type[Middleware]) – The middleware class.
Middleware¶
The following middleware are all enabled by default.
- class remoulade.Middleware[source]¶
Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.
- property actor_options¶
The set of options that may be configured on each actor.
- after_declare_delay_queue(broker, queue_name)[source]¶
Called after a delay queue has been declared.
- after_enqueue(broker, message, delay, exception=None)[source]¶
Called after a message has been enqueued.
- before_delay_message(broker, message)[source]¶
Called before a message has been delayed in worker memory.
- before_process_message(broker, message)[source]¶
Called before a message is processed.
- Raises
SkipMessage – If the current message should be skipped. When this is raised,
after_skip_message
is emitted instead ofafter_process_message
.
- after_process_message(broker, message, *, result=None, exception=None)[source]¶
Called after a message has been processed.
- after_worker_thread_process_message(broker, thread)[source]¶
Called after a worker thread has finished processing a message
- after_skip_message(broker, message)[source]¶
Called instead of
after_process_message
after a message has been skippped.
- after_message_canceled(broker, message)[source]¶
Called instead of
after_process_message
after a message has been canceled.
- before_consumer_thread_shutdown(broker, thread)[source]¶
Called before a consumer thread shuts down. This may be used to clean up thread-local resources (such as Django database connections).
There is no
after_consumer_thread_boot
.
- before_worker_thread_shutdown(broker, thread)[source]¶
Called before a worker thread shuts down. This may be used to clean up thread-local resources (such as Django database connections).
There is no
after_worker_thread_boot
.
- after_enqueue_pipe_target(broker, group_info)[source]¶
Called after the pipe target of a message has been enqueued
- before_build_group_pipeline(broker, group_id, message_ids)[source]¶
Called before a group in a group pipeline is enqueued
- class remoulade.middleware.AgeLimit(*, max_age=None)[source]¶
Middleware that drops messages that have been in the queue for too long.
- Parameters
max_age (int) – The default message age limit in millseconds. Defaults to
None
, meaning that messages can exist indefinitely.
- class remoulade.middleware.CatchError[source]¶
Middleware that lets you enqueue another actor or message on message failure.
- class remoulade.middleware.Pipelines[source]¶
Middleware that lets you pipe actors together so that the output of one actor feeds into the input of another.
- class remoulade.middleware.Prometheus(*, http_host='127.0.0.1', http_port=9191, registry=None, use_default_label=False)[source]¶
A middleware that exports stats via Prometheus.
- Parameters
http_host (str) – The host to bind the Prometheus exposition server on. This parameter can also be configured via the
remoulade_prom_host
environment variable.http_port (int) – The port on which the server should listen. This parameter can also be configured via the
remoulade_prom_port
environment variable.registry (CollectorRegistry) – the prometheus registry to use, if None, use a new registry.
- class remoulade.middleware.Retries(*, max_retries: Optional[int] = None, min_backoff: Optional[int] = None, max_backoff: Optional[int] = None, retry_when: Optional[Callable[[int, Exception], bool]] = None, backoff_strategy: typing_extensions.Literal[constant, linear, spread_linear, exponential, spread_exponential] = 'exponential', jitter: bool = True, increase_priority_on_retry: bool = False)[source]¶
Middleware that automatically retries failed tasks with exponential backoff.
- Parameters
max_retries (int) – The maximum number of times tasks can be retried.
min_backoff (int) – The minimum amount of backoff milliseconds to apply to retried tasks. Defaults to 15 seconds.
max_backoff (int) – The maximum amount of backoff milliseconds to apply to retried tasks. Defaults to 7 days.
retry_when (Callable[[int, Exception], bool]) – An optional predicate that can be used to programmatically determine whether a task should be retried or not. This takes precedence over max_retries when set.
increase_priority_on_retry (bool) – specifies wether to increase priority of message when retried. Default is False.
- class remoulade.middleware.ShutdownNotifications(notify_shutdown=False)[source]¶
Middleware that interrupts actors whose worker process has been signaled for termination. Currently, this is only available on CPython.
Note
This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters
notify_shutdown (bool) – When true, the actor will be interrupted if the worker process was terminated.
- class remoulade.middleware.TimeLimit(*, time_limit: int = 1800000, interval: int = 1000, exit_delay: Optional[int] = None)[source]¶
Middleware that cancels actors that run for too long. Currently, this is only available on CPython.
Note
This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters
time_limit (int) – The maximum number of milliseconds actors may run for.
interval (int) – The interval (in milliseconds) with which to check for actors that have exceeded the limit.
exit_delay (int) – The delay (in milliseconds) after with we stop (SystemExit) to the worker if the exception failed to stop the message (ie. system calls). None to disable, disabled by default.
Middleware Errors¶
The class hierarchy for middleware exceptions:
BaseException
+-- Exception
| +-- remoulade.middleware.MiddlewareError
| +-- remoulade.middleware.SkipMessage
+-- remoulade.middleware.Interrupt
+-- remoulade.middleware.Shutdown
+-- remoulade.middleware.TimeLimitExceeded
- class remoulade.middleware.SkipMessage[source]¶
An exception that may be raised by Middleware inside the
before_process_message
hook in order to skip a message.
- class remoulade.middleware.Interrupt[source]¶
Base class for exceptions used to asynchronously interrupt a thread’s execution. An actor may catch these exceptions in order to respond gracefully, such as performing any necessary cleanup.
This is not a subclass of
RemouladeError
to avoid it being caught unintentionally.
Results¶
Actor results can be stored and retrieved by leveraging result backends and the results middleware. Results and result backends are not enabled by default and you should avoid using them until you have a really good use case. Most of the time you can get by with actors simply updating data in your database instead of using results.
Result objects¶
- class remoulade.result.Result(*, message_id: str)[source]¶
Encapsulates metadata needed to retrieve the result of a message
- Parameters
message_id (str) – The id of the message sent to the broker.
- class remoulade.collection_results.CollectionResults(children: Iterable[ResultT])[source]¶
Result of a group or pipeline, having result related methods
- Parameters
children (List[Result|CollectionResults]) – A sequence of results of messages, groups or pipelines.
Result Middleware¶
- class remoulade.results.Results(*, backend=None, store_results=False, result_ttl=None)[source]¶
Middleware that automatically stores actor results.
Example
>>> from remoulade.results import Results >>> from remoulade.results.backends import RedisBackend >>> backend = RedisBackend() >>> broker.add_middleware(Results(backend=backend))
>>> @remoulade.actor(store_results=True) ... def add(x, y): ... return x + y
>>> broker.declare_actor(add) >>> message = add.send(1, 2) >>> message.result.get() 3
- Parameters
backend (ResultBackend) – The result backend to use when storing results.
store_results (bool) – Whether or not actor results should be stored. Defaults to False and can be set on a per-actor basis.
result_ttl (int) – The maximum number of milliseconds results are allowed to exist in the backend. Defaults to 10 minutes and can be set on a per-actor basis.
Backends¶
- class remoulade.results.ResultBackend(*, namespace: str = 'remoulade-results', encoder: Optional[remoulade.encoder.Encoder] = None, default_timeout: Optional[int] = None)[source]¶
ABC for result backends.
- Parameters
namespace (str) – The logical namespace under which the data should be stored.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.
- class remoulade.results.backends.RedisBackend(*, namespace='remoulade-results', encoder=None, client=None, url=None, default_timeout=None, max_retries=3, min_backoff=500, max_backoff=5000, backoff_strategy: typing_extensions.Literal[constant, linear, spread_linear, exponential, spread_exponential] = 'spread_exponential', **parameters)[source]¶
A result backend for Redis. This is the recommended result backend as waiting for a result is resource efficient.
- Parameters
namespace (str) – A string with which to prefix result keys.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.client (Redis) – An optional client. If this is passed, then all other parameters are ignored.
url (str) – An optional connection URL. If both a URL and connection paramters are provided, the URL is used.
**parameters (dict) – Connection parameters are passed directly to
redis.Redis
.
- class remoulade.results.backends.StubBackend(*, namespace: str = 'remoulade-results', encoder: Optional[remoulade.encoder.Encoder] = None, default_timeout: Optional[int] = None)[source]¶
An in-memory result backend. For use in unit tests.
- Parameters
namespace (str) – A string with which to prefix result keys.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.
Rate Limiters¶
Rate limiters can be used to determine whether or not an operation can be run at the current time across many processes and machines by using a shared storage backend.
Rate Limiter Backends¶
Rate limiter backends are used to store metadata about rate limits.
Limiters¶
- class remoulade.rate_limits.RateLimiter(backend, key)[source]¶
ABC for rate limiters.
Examples
>>> from remoulade.rate_limits.backends import RedisBackend
>>> backend = RedisBackend() >>> limiter = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)
>>> with limiter.acquire(raise_on_failure=False) as acquired: ... if not acquired: ... print("Mutex not acquired.") ... return ... ... print("Mutex acquired.")
- Parameters
backend (RateLimiterBackend) – The rate limiting backend to use.
key (str) – The key to rate limit on.
- class remoulade.rate_limits.BucketRateLimiter(backend, key, *, limit=1, bucket=1000)[source]¶
A rate limiter that ensures that only up to limit operations may happen over some time interval.
Examples
Up to 10 operations every second:
>>> BucketRateLimiter(backend, "some-key", limit=10, bucket=1_000)
Up to 1 operation every minute:
>>> BucketRateLimiter(backend, "some-key", limit=1, bucket=60_000)
Warning
Bucket rate limits are cheap to maintain but are susceptible to burst “attacks”. Given a bucket rate limit of 100 per minute, an attacker could make a burst of 100 calls in the last second of a minute and then another 100 calls in the first second of the subsequent minute.
For a rate limiter that doesn’t have this problem (but is more expensive to maintain), see
WindowRateLimiter
.- Parameters
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of operations per bucket per key.
bucket (int) – The bucket interval in milliseconds.
- class remoulade.rate_limits.ConcurrentRateLimiter(backend, key, *, limit=1, ttl=900000)[source]¶
A rate limiter that ensures that only limit concurrent operations may happen at the same time.
Note
You can use a concurrent rate limiter of size 1 to get a distributed mutex.
- Parameters
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of concurrent operations per key.
ttl (int) – The time in milliseconds that keys may live for.
- class remoulade.rate_limits.WindowRateLimiter(backend, key, *, limit=1, window=1)[source]¶
A rate limiter that ensures that only limit operations may happen over some sliding window.
Note
Windows are in seconds rather that milliseconds. This is different from most durations and intervals used in Remoulade, because keeping metadata at the millisecond level is far too expensive for most use cases.
- Parameters
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of operations per window per key.
window (int) – The window size in seconds. The wider the window, the more expensive it is to maintain.
Workers¶
- class remoulade.Worker(broker, *, queues=None, worker_timeout=1000, worker_threads=8, prefetch_multiplier=2)[source]¶
Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don’t block the current thread so it’s up to the caller to keep it alive.
Don’t run more than one Worker per process.
- Parameters
broker (Broker) –
queues (Set[str]) – An optional subset of queues to listen on. By default, if this is not provided, the worker will listen on all declared queues.
worker_timeout (int) – The number of milliseconds workers should wake up after if the queue is idle.
worker_threads (int) – The number of worker threads to spawn.
prefetch_multiplier (int) – The number of message to prefetch at a time, to be multiplied with the number of threads
Errors¶
- class remoulade.ActorNotFound(message: str)[source]¶
Raised when a message is sent to an actor that hasn’t been declared.
- class remoulade.QueueNotFound(message: str)[source]¶
Raised when a message is sent to an queue that hasn’t been declared.
- class remoulade.ConnectionError(message: str)[source]¶
Base class for broker connection-related errors.
- class remoulade.ConnectionClosed(message: str)[source]¶
Raised when a broker connection is suddenly closed.
- class remoulade.ConnectionFailed(message: str)[source]¶
Raised when a broker connection could not be opened.
- class remoulade.RateLimitExceeded(message: str)[source]¶
Raised when a rate limit has been exceeded.