API Reference

Functions

remoulade.get_broker() remoulade.broker.Broker[source]

Get the global broker instance. If no global broker is set, this initializes a RabbitmqBroker and returns it.

Returns

The default Broker.

Return type

Broker

Raises

ValueError if no broker is set

remoulade.set_broker(broker: remoulade.broker.Broker) None[source]

Configure the global broker instance.

Parameters

broker (Broker) – The broker instance to use by default.

remoulade.get_encoder() remoulade.encoder.Encoder[source]

Get the global encoder object.

Returns

Encoder

remoulade.set_encoder(encoder: remoulade.encoder.Encoder) None[source]

Set the global encoder object.

Parameters

encoder (Encoder) – The encoder instance to use when serializing messages.

Actors & Messages

remoulade.actor(fn: typing_extensions.Literal[None] = None, *, actor_name: Optional[str] = 'None', queue_name: str = "'default'", alternative_queues: Optional[List[str]] = 'None', priority: int = '0', **options: Any) Callable[[Callable[P, R]], Actor[P, R]][source]
remoulade.actor(fn: Callable[[remoulade.actor.P], remoulade.actor.R], *, actor_name: Optional[str] = 'None', queue_name: str = "'default'", alternative_queues: Optional[List[str]] = 'None', priority: int = '0', **options: Any) Actor[P, R]

Declare an actor.

Examples

>>> import remoulade
>>> @remoulade.actor
... def add(x, y):
...   print(x + y)
...
>>> add
Actor(<function add at 0x106c6d488>, queue_name='default', actor_name='add')

You need to declare an actor before using it >>> remoulade.declare_actors([add]) None

>>> add(1, 2)
3
>>> add.send(1, 2)
Message(
  queue_name='default',
  actor_name='add',
  args=(1, 2), kwargs={}, options={},
  message_id='e0d27b45-7900-41da-bb97-553b8a081206',
  message_timestamp=1497862448685)
Parameters
  • fn (callable) – The function to wrap.

  • actor_name (str) – The name of the actor.

  • queue_name (str) – The name of the default queue to use.

  • alternative_queues (List[str]) – The names of other queues this actor may be enqueued into.

  • priority (int) – The actor’s global priority. If two tasks have been pulled on a worker concurrently and one has a higher priority than the other then it will be processed first. Higher numbers represent higher priorities.

  • **options (dict) – Arbitrary options that vary with the set of middleware that you use. See get_broker().actor_options.

Returns

The decorated function.

Return type

Actor

class remoulade.Actor(fn: Callable[[remoulade.actor.P], remoulade.actor.R], *, actor_name: str, queue_name: str, alternative_queues: Optional[List[str]], priority: int, options: Any)[source]

Thin wrapper around callables that stores metadata about how they should be executed asynchronously. Actors are callable.

logger

The actor’s logger.

Type

Logger

fn

The underlying callable.

Type

callable

broker

The broker this actor is bound to.

Type

Broker

actor_name

The actor’s name.

Type

str

queue_name

The actor’s default queue.

Type

str

alternative_queues

The names of other queues this actor may be enqueued into.

Type

List[str]

priority

The actor’s priority.

Type

int

options

Arbitrary options that are passed to the broker and middleware.

Type

dict

message(*args: Any, **kwargs: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]][source]

Build a message. This method is useful if you want to compose actors. See the actor composition documentation for details.

Parameters
  • *args (tuple) – Positional arguments to send to the actor.

  • **kwargs (dict) – Keyword arguments to send to the actor.

Examples

>>> (add.message(1, 2) | add.message(3))
pipeline([add(1, 2), add(3)])
Returns

A message that can be enqueued on a broker.

Return type

Message

message_with_options(*, args: Optional[Tuple[Any, ...]] = None, kwargs: Optional[Dict[str, Any]] = None, queue_name: Optional[str] = None, **options: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]][source]

Build a message with an arbitrary set of processing options. This method is useful if you want to compose actors. See the actor composition documentation for details.

Parameters
  • args (tuple) – Positional arguments that are passed to the actor.

  • kwargs (dict) – Keyword arguments that are passed to the actor.

  • queue_name (str) – Name of the queue to put this message into when enqueued.

  • **options (dict) – Arbitrary options that are passed to the broker and any registered middleware.

Returns

A message that can be enqueued on a broker.

Return type

Message

send()[source]

Asynchronously send a message to this actor.

Parameters
  • *args (tuple) – Positional arguments to send to the actor.

  • **kwargs (dict) – Keyword arguments to send to the actor.

Returns

The enqueued message.

Return type

Message

send_with_options(*, args: Optional[Tuple[Any, ...]] = None, kwargs: Optional[Dict[str, Any]] = None, queue_name: Optional[str] = None, delay: Optional[int] = None, **options: Any) remoulade.message.Message[remoulade.result.Result[remoulade.actor.R]][source]

Asynchronously send a message to this actor, along with an arbitrary set of processing options for the broker and middleware.

Parameters
  • args (tuple) – Positional arguments that are passed to the actor.

  • kwargs (dict) – Keyword arguments that are passed to the actor.

  • queue_name (str) – Name of the queue to enqueue this message into.

  • delay (int) – The minimum amount of time, in milliseconds, the message should be delayed by.

  • **options (dict) – Arbitrary options that are passed to the broker and any registered middleware.

Returns

The enqueued message.

Return type

Message

class remoulade.Message(*, queue_name: str, actor_name: str, args, kwargs: Dict, options: Dict[str, Any], message_id: str = _Nothing.NOTHING, message_timestamp: int = _Nothing.NOTHING)[source]

Encapsulates metadata about messages being sent to individual actors.

Parameters
  • queue_name (str) – The name of the queue the message belogns to.

  • actor_name (str) – The name of the actor that will receive the message.

  • args (tuple) – Positional arguments that are passed to the actor.

  • kwargs (dict) – Keyword arguments that are passed to the actor.

  • options (dict) – Arbitrary options passed to the broker and middleware.

  • message_id (str) – A globally-unique id assigned to the actor.

  • message_timestamp (int) – The UNIX timestamp in milliseconds representing when the message was first enqueued.

asdict()[source]

Convert this message to a dictionary.

build(options: Dict[str, Any])[source]

Build message for pipeline

cancel() None[source]

Mark a message as canceled

copy(**attributes)[source]

Create a copy of this message.

classmethod decode(data)[source]

Convert a bytestring to a message.

encode()[source]

Convert this message to a bytestring.

set_progress(progress: float) None[source]

Set the progress of the message. progress(float) number between 0 and 1 inclusive

Raises

InvalidProgress: when not( 0 <= progress <= 1)

Class-based Actors

class remoulade.GenericActor[source]

Base-class for class-based actors.

Each subclass may define an inner class named Meta. You can use the meta class to provide broker options for the actor.

Classes that have abstract = True in their meta class are considered abstract base classes and are not converted into actors. You can’t send these classes messages, you can only inherit from them. Actors that subclass abstract base classes inherit their parents’ meta classes. You can also override meta in subclass using inheritance.

Example

>>> class BaseTask(GenericActor):
...   class Meta:
...     abstract = True
...     queue_name = "tasks"
...     max_retries = 20
...
...   def get_task_name(self):
...     raise NotImplementedError
...
...   def perform(self):
...     print(f"Hello from {self.get_task_name()}!")
>>> class FooTask(BaseTask):
...   def get_task_name(self):
...     return "Foo"
>>> class BarTask(BaseTask):
...   class Meta(BaseTask.Meta):
...     max_retries = 10
...
...   def get_task_name(self):
...     return "Bar"
>>> FooTask.send()
>>> BarTask.send()
logger

The actor’s logger.

Type

Logger

broker

The broker this actor is bound to.

Type

Broker

actor_name

The actor’s name.

Type

str

queue_name

The actor’s queue.

Type

str

priority

The actor’s priority.

Type

int

options

Arbitrary options that are passed to the broker and middleware.

Type

dict

perform(*args, **kwargs)[source]

This is the method that gets called when the actor receives a message. All non-abstract subclasses must implement this method.

Message Composition

class remoulade.group(children: Iterable[Union[pipeline[ResultsT], Message[ResultsT]]], group_id: Optional[str] = None, cancel_on_error: bool = False)[source]

Run a group of actors in parallel.

Parameters

children (Iterable[Message|pipeline]) – A sequence of messages or pipelines.

children
Type

List[Message|pipeline]

Raises

NoCancelBackend – if no cancel middleware is set

build(options=None) List[Message][source]

Build group for pipeline

cancel() None[source]

Mark all the children as cancelled

property info: remoulade.composition.GroupInfo

Info used for group completion and cancel

property results: remoulade.collection_results.CollectionResults[remoulade.composition.ResultsT]

CollectionResults created from this group, used for result related methods

run(*, delay: Optional[int] = None, transaction: Optional[bool] = None) typing_extensions.Self[source]

Run the actors in this group.

Parameters
  • delay (int) – The minimum amount of time, in milliseconds, each message in the group should be delayed by.

  • transaction (bool) –

class remoulade.pipeline(children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], Union[Message[ResultsT], pipeline[ResultsT]]], cancel_on_error: bool = False)[source]
class remoulade.pipeline(children: Tuple[Unpack[Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...]], group[ResultsT_1]], cancel_on_error: bool = False)
class remoulade.pipeline(children: Tuple[Union[Message[Any], pipeline[Any], group[Any]], ...], cancel_on_error: bool = False)

Chain actors together, passing the result of one actor to the next one in line.

Parameters
  • children (Iterable[Message|pipeline|group]) – A sequence of messages or pipelines or groups. Child pipelines are flattened into the resulting pipeline.

  • broker (Broker) – The broker to run the pipeline on. Defaults to the current global broker.

  • cancel_on_error (boolean) – True if you want to cancel all messages of a composition if on of

  • fails (the actor) –

  • middleware. (this is only possible with a Cancel) –

children
Type

List[Message|group]

build(*, last_options=None, composition_id: Optional[str] = None, cancel_on_error: bool = False)[source]

Build the pipeline, return the first message to be enqueued or integrated in another pipeline

Build the pipeline by starting at the end. We build a message with all it’s options in one step and we serialize it (asdict) as the previous message pipe_target in the next step.

We need to know what is the options (pipe_target) of the pipeline before building it because we cannot edit the pipeline after it has been built.

Parameters
  • cancel_on_error (bool) – Whether the whole composition should be canceled when one of its messages fails

  • composition_id (str) – The composition id to pass to messages

  • last_options (dict) – options to be assigned to the last actor of the pipeline (ex: pipe_target)

Returns

the first message of the pipeline

cancel() None[source]

Mark all the children as cancelled

property result: remoulade.composition.ResultsT

Result of the last message/group of the pipeline

property results: remoulade.collection_results.CollectionResults[Any]

CollectionResults created from this pipeline, used for result related methods

run(*, delay: Optional[int] = None, transaction: Optional[bool] = None) typing_extensions.Self[source]

Run this pipeline.

Parameters

delay (int) – The minimum amount of time, in milliseconds, the pipeline should be delayed by.

Returns

Itself.

Return type

pipeline

Message Encoders

Encoders are used to serialize and deserialize messages over the wire.

class remoulade.Encoder[source]

Base class for message encoders.

abstract decode(data: bytes) Dict[str, Any][source]

Convert a bytestring into message metadata.

abstract encode(data: Dict[str, Any]) bytes[source]

Convert message metadata into a bytestring.

class remoulade.JSONEncoder[source]

Encodes messages as JSON. This is the default encoder.

class remoulade.PickleEncoder[source]

Pickles messages.

Warning

This encoder is not secure against maliciously-constructed data. Use it at your own risk.

Brokers

class remoulade.Broker(middleware: Optional[Iterable[Middleware]] = None)[source]

Base class for broker implementations.

Parameters

middleware (list[Middleware]) – The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare all middleware. Most of the time, you’ll want to use add_middleware() instead.

actor_options

The names of all the options actors may overwrite when they are declared.

Type

set[str]

add_middleware(middleware: Middleware) None[source]
Add a middleware object to this broker. The middleware is

added to his default position.

Parameters

middleware (Middleware) – The middleware.

close()[source]

Close this broker and perform any necessary cleanup actions.

consume(queue_name: str, prefetch: int = 1, timeout: int = 30000) remoulade.broker.Consumer[source]

Get an iterator that consumes messages off of the queue.

Raises

QueueNotFound – If the given queue was never declared.

Parameters
  • queue_name (str) – The name of the queue to consume messages off of.

  • prefetch (int) – The number of messages to prefetch per consumer.

  • timeout (int) – The amount of time in milliseconds to idle for.

Returns

A message iterator.

Return type

Consumer

declare_actor(actor: Actor) None[source]

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters

actor (Actor) – The actor being declared.

declare_queue(queue_name: str) None[source]

Declare a queue on this broker. This method must be idempotent.

Parameters

queue_name (str) – The name of the queue being declared.

enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any][source]

Enqueue a message on this broker.

Parameters
  • message (Message) – The message to enqueue.

  • delay (int) – The number of milliseconds to delay the message for.

Returns

Either the original message or a copy of it.

Return type

Message

flush(queue_name: str) None[source]

Drop all the messages from a queue.

Parameters

queue_name (str) – The name of the queue to flush.

flush_all() None[source]

Drop all messages from all declared queues.

get_actor(actor_name: str) Actor[source]

Look up an actor by its name.

Parameters

actor_name (str) – The name to look up.

Raises

ActorNotFound – If the actor was never declared.

Returns

The actor.

Return type

Actor

get_cancel_backend() remoulade.cancel.backend.CancelBackend[source]

Get the CancelBackend associated with the broker

Raises

NoCancelBackend – if there is no CancelBackend

Returns

the cancel backend

Return type

CancelBackend

get_declared_actors() Set[str][source]

Get all declared actors.

Returns

The names of all the actors declared so far on this Broker.

Return type

set[str]

get_declared_delay_queues() Set[str][source]

Get all declared delay queues.

Returns

The names of all the delay queues declared so far on this Broker.

Return type

set[str]

get_declared_queues() Set[str][source]

Get all declared queues.

Returns

The names of all the queues declared so far on this Broker.

Return type

set[str]

get_result_backend() remoulade.results.backend.ResultBackend[source]

Get the ResultBackend associated with the broker

Raises

NoResultBackend – if there is no ResultBackend

Returns

the result backend

Return type

ResultBackend

get_state_backend() remoulade.state.backend.StateBackend[source]

Get the StateBackend associated with the broker

Raises

NoStateBackend – if there is no StateBackend

Returns

the state backend

Return type

StateBackend

join(queue_name: str, *, timeout: Optional[int] = None) None[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

remove_middleware(middleware_class: Type[Middleware])[source]

Removes a middleware object from this broker.

Parameters

middleware_class (Type[Middleware]) – The middleware class.

class remoulade.Consumer[source]

Consumers iterate over messages on a queue.

Consumers and their MessageProxies are not thread-safe.

__iter__()[source]

Returns this instance as a Message iterator.

__next__()[source]

Retrieve the next message off of the queue. This method blocks until a message becomes available.

Returns

A transparent proxy around a Message that can be used to acknowledge or reject it once it’s done being processed.

Return type

MessageProxy

ack(message)[source]

Acknowledge that a message has been processed, removing it from the broker.

Parameters

message (MessageProxy) – The message to acknowledge.

close()[source]

Close this consumer and perform any necessary cleanup actions.

nack(message)[source]

Move a message to the dead-letter queue.

Parameters

message (MessageProxy) – The message to reject.

class remoulade.MessageProxy(message)[source]

Base class for messages returned by Broker.consume().

fail()[source]

Mark this message for rejection.

class remoulade.brokers.rabbitmq.RabbitmqBroker(*, confirm_delivery: bool = False, url: Optional[str] = None, middleware: Optional[List[Middleware]] = None, max_priority: Optional[int] = None, channel_pool_size: int = 200, dead_queue_max_length: Optional[int] = None, delivery_mode: int = 2, group_transaction: bool = False)[source]

A broker that can be used with RabbitMQ.

Examples

>>> RabbitmqBroker(url="amqp://guest:guest@127.0.0.1:5672")
Parameters
  • confirm_delivery (bool) – Wait for RabbitMQ to confirm that messages have been committed on every call to enqueue. Defaults to False.

  • url (str) – The optional connection URL to use to determine which Rabbit server to connect to. If None is provided, connection is made with ‘amqp://guest:guest@localhost:5672’

  • middleware (list[Middleware]) – The set of middleware that apply to this broker.

  • max_priority (int) – Configure the queues with x-max-priority to support priority queue in RabbitMQ itself

  • channel_pool_size (int) – Size of the channel pool

  • dead_queue_max_length (int|None) – Max size of the dead queue. If None, no max size.

  • delivery_mode (int) – 2 (persistent) to wait for message to be flushed to disk for confirmation (safer) or 1 (transient) which don’t (faster)

  • group_transaction (bool) – If true, use transactions by default when running group and pipelines

add_middleware(middleware: Middleware) None
Add a middleware object to this broker. The middleware is

added to his default position.

Parameters

middleware (Middleware) – The middleware.

close() None[source]

Close all open RabbitMQ connections.

property connection

amqpstorm.Connection` for the current proccess. This property may change without notice.

Type

The

Type

class

consume(queue_name: str, prefetch: int = 1, timeout: int = 5000) remoulade.brokers.rabbitmq._RabbitmqConsumer[source]

Create a new consumer for a queue.

Parameters
  • queue_name (str) – The queue to consume.

  • prefetch (int) – The number of messages to prefetch.

  • timeout (int) – The idle timeout in milliseconds.

Returns

A consumer that retrieves messages from RabbitMQ.

Return type

Consumer

declare_actor(actor: Actor) None

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters

actor (Actor) – The actor being declared.

declare_queue(queue_name: str) None[source]

Declare a queue. Has no effect if a queue with the given name already exists.

Parameters

queue_name (str) – The name of the new queue.

Raises

ConnectionClosed – If the underlying channel or connection has been closed.

enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any]

Enqueue a message on this broker.

Parameters
  • message (Message) – The message to enqueue.

  • delay (int) – The number of milliseconds to delay the message for.

Returns

Either the original message or a copy of it.

Return type

Message

flush(queue_name: str) None[source]

Drop all the messages from a queue.

Parameters

queue_name (str) – The queue to flush.

flush_all() None[source]

Drop all messages from all declared queues.

get_actor(actor_name: str) Actor

Look up an actor by its name.

Parameters

actor_name (str) – The name to look up.

Raises

ActorNotFound – If the actor was never declared.

Returns

The actor.

Return type

Actor

get_cancel_backend() remoulade.cancel.backend.CancelBackend

Get the CancelBackend associated with the broker

Raises

NoCancelBackend – if there is no CancelBackend

Returns

the cancel backend

Return type

CancelBackend

get_declared_actors() Set[str]

Get all declared actors.

Returns

The names of all the actors declared so far on this Broker.

Return type

set[str]

get_declared_delay_queues() Set[str]

Get all declared delay queues.

Returns

The names of all the delay queues declared so far on this Broker.

Return type

set[str]

get_declared_queues() Set[str]

Get all declared queues.

Returns

The names of all the queues declared so far on this Broker.

Return type

set[str]

get_queue_message_counts(queue_name: str)[source]

Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.

Parameters

queue_name (str) – The queue whose message counts to get.

Returns

A triple representing the number of messages in the queue, its delayed queue and its dead letter queue.

Return type

tuple

get_result_backend() remoulade.results.backend.ResultBackend

Get the ResultBackend associated with the broker

Raises

NoResultBackend – if there is no ResultBackend

Returns

the result backend

Return type

ResultBackend

get_state_backend() remoulade.state.backend.StateBackend

Get the StateBackend associated with the broker

Raises

NoStateBackend – if there is no StateBackend

Returns

the state backend

Return type

StateBackend

join(queue_name: str, min_successes: int = 10, idle_time: int = 100, *, timeout: Optional[int] = None) None[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

Warning

This method doesn’t wait for unacked messages so it may not be completely reliable. Use the stub broker in your unit tests and only use this for simple integration tests.

Parameters
  • queue_name (str) – The queue to wait on.

  • min_successes (int) – The minimum number of times all the polled queues should be empty.

  • idle_time (int) – The number of milliseconds to wait between counts.

  • timeout (Optional[int]) – The max amount of time, in milliseconds, to wait on this queue.

remove_middleware(middleware_class: Type[Middleware])

Removes a middleware object from this broker.

Parameters

middleware_class (Type[Middleware]) – The middleware class.

class remoulade.brokers.stub.StubBroker(middleware=None)[source]

A broker that can be used within unit tests.

dead_letters

Contains the dead-lettered messages for all defined queues.

Type

list[Message]

add_middleware(middleware: Middleware) None
Add a middleware object to this broker. The middleware is

added to his default position.

Parameters

middleware (Middleware) – The middleware.

close()

Close this broker and perform any necessary cleanup actions.

consume(queue_name, prefetch=1, timeout=100)[source]

Create a new consumer for a queue.

Parameters
  • queue_name (str) – The queue to consume.

  • prefetch (int) – The number of messages to prefetch.

  • timeout (int) – The idle timeout in milliseconds.

Raises

QueueNotFound – If the queue hasn’t been declared.

Returns

A consumer that retrieves messages from Redis.

Return type

Consumer

declare_actor(actor: Actor) None

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters

actor (Actor) – The actor being declared.

declare_queue(queue_name)[source]

Declare a queue. Has no effect if a queue with the given name has already been declared.

Parameters

queue_name (str) – The name of the new queue.

enqueue(message: Message[Any], *, delay: Optional[int] = None) Message[Any]

Enqueue a message on this broker.

Parameters
  • message (Message) – The message to enqueue.

  • delay (int) – The number of milliseconds to delay the message for.

Returns

Either the original message or a copy of it.

Return type

Message

flush(queue_name)[source]

Drop all the messages from a queue.

Parameters

queue_name (str) – The queue to flush.

flush_all()[source]

Drop all messages from all declared queues.

get_actor(actor_name: str) Actor

Look up an actor by its name.

Parameters

actor_name (str) – The name to look up.

Raises

ActorNotFound – If the actor was never declared.

Returns

The actor.

Return type

Actor

get_cancel_backend() remoulade.cancel.backend.CancelBackend

Get the CancelBackend associated with the broker

Raises

NoCancelBackend – if there is no CancelBackend

Returns

the cancel backend

Return type

CancelBackend

get_declared_actors() Set[str]

Get all declared actors.

Returns

The names of all the actors declared so far on this Broker.

Return type

set[str]

get_declared_delay_queues() Set[str]

Get all declared delay queues.

Returns

The names of all the delay queues declared so far on this Broker.

Return type

set[str]

get_declared_queues() Set[str]

Get all declared queues.

Returns

The names of all the queues declared so far on this Broker.

Return type

set[str]

get_result_backend() remoulade.results.backend.ResultBackend

Get the ResultBackend associated with the broker

Raises

NoResultBackend – if there is no ResultBackend

Returns

the result backend

Return type

ResultBackend

get_state_backend() remoulade.state.backend.StateBackend

Get the StateBackend associated with the broker

Raises

NoStateBackend – if there is no StateBackend

Returns

the state backend

Return type

StateBackend

join(queue_name, *, timeout=None)[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

Raises
  • QueueJoinTimeout – When the timeout elapses.

  • QueueNotFound – If the given queue was never declared.

Parameters
  • queue_name (str) – The queue to wait on.

  • timeout (Optional[int]) – The max amount of time, in milliseconds, to wait on this queue.

remove_middleware(middleware_class: Type[Middleware])

Removes a middleware object from this broker.

Parameters

middleware_class (Type[Middleware]) – The middleware class.

Middleware

The following middleware are all enabled by default.

class remoulade.Middleware[source]

Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.

property actor_options

The set of options that may be configured on each actor.

before_ack(broker, message)[source]

Called before a message is acknowledged.

after_ack(broker, message)[source]

Called after a message has been acknowledged.

before_nack(broker, message)[source]

Called before a message is rejected.

after_nack(broker, message)[source]

Called after a message has been rejected.

before_declare_actor(broker, actor)[source]

Called before an actor is declared.

after_declare_actor(broker, actor)[source]

Called after an actor has been declared.

before_declare_queue(broker, queue_name)[source]

Called before a queue is declared.

after_declare_queue(broker, queue_name)[source]

Called after a queue has been declared.

after_declare_delay_queue(broker, queue_name)[source]

Called after a delay queue has been declared.

before_enqueue(broker, message, delay)[source]

Called before a message is enqueued.

after_enqueue(broker, message, delay, exception=None)[source]

Called after a message has been enqueued.

before_delay_message(broker, message)[source]

Called before a message has been delayed in worker memory.

before_process_message(broker, message)[source]

Called before a message is processed.

Raises

SkipMessage – If the current message should be skipped. When this is raised, after_skip_message is emitted instead of after_process_message.

after_process_message(broker, message, *, result=None, exception=None)[source]

Called after a message has been processed.

after_worker_thread_process_message(broker, thread)[source]

Called after a worker thread has finished processing a message

after_skip_message(broker, message)[source]

Called instead of after_process_message after a message has been skippped.

after_message_canceled(broker, message)[source]

Called instead of after_process_message after a message has been canceled.

after_process_boot(broker)[source]

Called immediately after subprocess start up.

before_process_stop(broker)[source]

Called before after subprocess stop.

before_worker_boot(broker, worker)[source]

Called before the worker process starts up.

after_worker_boot(broker, worker)[source]

Called after the worker process has started up.

before_worker_shutdown(broker, worker)[source]

Called before the worker process shuts down.

after_worker_shutdown(broker, worker)[source]

Called after the worker process shuts down.

before_consumer_thread_shutdown(broker, thread)[source]

Called before a consumer thread shuts down. This may be used to clean up thread-local resources (such as Django database connections).

There is no after_consumer_thread_boot.

before_worker_thread_shutdown(broker, thread)[source]

Called before a worker thread shuts down. This may be used to clean up thread-local resources (such as Django database connections).

There is no after_worker_thread_boot.

after_enqueue_pipe_target(broker, group_info)[source]

Called after the pipe target of a message has been enqueued

before_build_group_pipeline(broker, group_id, message_ids)[source]

Called before a group in a group pipeline is enqueued

update_options_before_create_message(options, broker, actor_name)[source]

Called when a message is being built. The message options is set to this function’s return value

before_actor_execution(broker, message)[source]

Called before the actor is called

after_actor_execution(broker, message, *, runtime=0)[source]

Called after the actor is called

class remoulade.middleware.AgeLimit(*, max_age=None)[source]

Middleware that drops messages that have been in the queue for too long.

Parameters

max_age (int) – The default message age limit in millseconds. Defaults to None, meaning that messages can exist indefinitely.

class remoulade.middleware.CatchError[source]

Middleware that lets you enqueue another actor or message on message failure.

Parameters

on_failure (Message|Actor|str) – A Message, Actor or Actor name to enqueue on failure.

class remoulade.middleware.Pipelines[source]

Middleware that lets you pipe actors together so that the output of one actor feeds into the input of another.

Parameters
  • pipe_ignore (bool) – When True, ignores the result of the previous actor in the pipeline.

  • pipe_target (dict) – A message representing the actor the current result should be fed into.

  • pipe_on_error (bool) – When True, pipe errors to next actor in line.

class remoulade.middleware.Prometheus(*, http_host='127.0.0.1', http_port=9191, registry=None, use_default_label=False)[source]

A middleware that exports stats via Prometheus.

Parameters
  • http_host (str) – The host to bind the Prometheus exposition server on. This parameter can also be configured via the remoulade_prom_host environment variable.

  • http_port (int) – The port on which the server should listen. This parameter can also be configured via the remoulade_prom_port environment variable.

  • registry (CollectorRegistry) – the prometheus registry to use, if None, use a new registry.

class remoulade.middleware.Retries(*, max_retries: Optional[int] = None, min_backoff: Optional[int] = None, max_backoff: Optional[int] = None, retry_when: Optional[Callable[[int, Exception], bool]] = None, backoff_strategy: typing_extensions.Literal[constant, linear, spread_linear, exponential, spread_exponential] = 'exponential', jitter: bool = True, increase_priority_on_retry: bool = False)[source]

Middleware that automatically retries failed tasks with exponential backoff.

Parameters
  • max_retries (int) – The maximum number of times tasks can be retried.

  • min_backoff (int) – The minimum amount of backoff milliseconds to apply to retried tasks. Defaults to 15 seconds.

  • max_backoff (int) – The maximum amount of backoff milliseconds to apply to retried tasks. Defaults to 7 days.

  • retry_when (Callable[[int, Exception], bool]) – An optional predicate that can be used to programmatically determine whether a task should be retried or not. This takes precedence over max_retries when set.

  • increase_priority_on_retry (bool) – specifies wether to increase priority of message when retried. Default is False.

class remoulade.middleware.ShutdownNotifications(notify_shutdown=False)[source]

Middleware that interrupts actors whose worker process has been signaled for termination. Currently, this is only available on CPython.

Note

This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.

Parameters

notify_shutdown (bool) – When true, the actor will be interrupted if the worker process was terminated.

class remoulade.middleware.TimeLimit(*, time_limit: int = 1800000, interval: int = 1000, exit_delay: Optional[int] = None)[source]

Middleware that cancels actors that run for too long. Currently, this is only available on CPython.

Note

This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.

Parameters
  • time_limit (int) – The maximum number of milliseconds actors may run for.

  • interval (int) – The interval (in milliseconds) with which to check for actors that have exceeded the limit.

  • exit_delay (int) – The delay (in milliseconds) after with we stop (SystemExit) to the worker if the exception failed to stop the message (ie. system calls). None to disable, disabled by default.

Middleware Errors

The class hierarchy for middleware exceptions:

BaseException
+-- Exception
|   +-- remoulade.middleware.MiddlewareError
|       +-- remoulade.middleware.SkipMessage
+-- remoulade.middleware.Interrupt
    +-- remoulade.middleware.Shutdown
    +-- remoulade.middleware.TimeLimitExceeded
class remoulade.middleware.MiddlewareError[source]

Base class for middleware errors.

class remoulade.middleware.SkipMessage[source]

An exception that may be raised by Middleware inside the before_process_message hook in order to skip a message.

class remoulade.middleware.Interrupt[source]

Base class for exceptions used to asynchronously interrupt a thread’s execution. An actor may catch these exceptions in order to respond gracefully, such as performing any necessary cleanup.

This is not a subclass of RemouladeError to avoid it being caught unintentionally.

class remoulade.middleware.TimeLimitExceeded[source]

Exception used to interrupt worker threads when actors exceed their time limits.

class remoulade.middleware.Shutdown[source]

Exception used to interrupt worker threads when their worker processes have been signaled for termination.

Results

Actor results can be stored and retrieved by leveraging result backends and the results middleware. Results and result backends are not enabled by default and you should avoid using them until you have a really good use case. Most of the time you can get by with actors simply updating data in your database instead of using results.

Result objects

class remoulade.result.Result(*, message_id: str)[source]

Encapsulates metadata needed to retrieve the result of a message

Parameters

message_id (str) – The id of the message sent to the broker.

class remoulade.collection_results.CollectionResults(children: Iterable[ResultT])[source]

Result of a group or pipeline, having result related methods

Parameters

children (List[Result|CollectionResults]) – A sequence of results of messages, groups or pipelines.

Result Middleware

class remoulade.results.Results(*, backend=None, store_results=False, result_ttl=None)[source]

Middleware that automatically stores actor results.

Example

>>> from remoulade.results import Results
>>> from remoulade.results.backends import RedisBackend
>>> backend = RedisBackend()
>>> broker.add_middleware(Results(backend=backend))
>>> @remoulade.actor(store_results=True)
... def add(x, y):
...   return x + y
>>> broker.declare_actor(add)
>>> message = add.send(1, 2)
>>> message.result.get()
3
Parameters
  • backend (ResultBackend) – The result backend to use when storing results.

  • store_results (bool) – Whether or not actor results should be stored. Defaults to False and can be set on a per-actor basis.

  • result_ttl (int) – The maximum number of milliseconds results are allowed to exist in the backend. Defaults to 10 minutes and can be set on a per-actor basis.

Backends

class remoulade.results.ResultBackend(*, namespace: str = 'remoulade-results', encoder: Optional[remoulade.encoder.Encoder] = None, default_timeout: Optional[int] = None)[source]

ABC for result backends.

Parameters
  • namespace (str) – The logical namespace under which the data should be stored.

  • encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to JSONEncoder.

class remoulade.results.backends.RedisBackend(*, namespace='remoulade-results', encoder=None, client=None, url=None, default_timeout=None, max_retries=3, min_backoff=500, max_backoff=5000, backoff_strategy: typing_extensions.Literal[constant, linear, spread_linear, exponential, spread_exponential] = 'spread_exponential', **parameters)[source]

A result backend for Redis. This is the recommended result backend as waiting for a result is resource efficient.

Parameters
  • namespace (str) – A string with which to prefix result keys.

  • encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to JSONEncoder.

  • client (Redis) – An optional client. If this is passed, then all other parameters are ignored.

  • url (str) – An optional connection URL. If both a URL and connection paramters are provided, the URL is used.

  • **parameters (dict) – Connection parameters are passed directly to redis.Redis.

class remoulade.results.backends.StubBackend(*, namespace: str = 'remoulade-results', encoder: Optional[remoulade.encoder.Encoder] = None, default_timeout: Optional[int] = None)[source]

An in-memory result backend. For use in unit tests.

Parameters
  • namespace (str) – A string with which to prefix result keys.

  • encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to JSONEncoder.

Rate Limiters

Rate limiters can be used to determine whether or not an operation can be run at the current time across many processes and machines by using a shared storage backend.

Rate Limiter Backends

Rate limiter backends are used to store metadata about rate limits.

class remoulade.rate_limits.RateLimiterBackend[source]

ABC for rate limiter backends.

class remoulade.rate_limits.backends.RedisBackend(*, client=None, url=None, **parameters)[source]

A rate limiter backend for Redis.

Parameters
  • client (Redis) – An optional client. If this is passed, then all other parameters are ignored.

  • url (str) – An optional connection URL. If both a URL and connection paramters are provided, the URL is used.

  • **parameters (dict) – Connection parameters are passed directly to redis.Redis.

class remoulade.rate_limits.backends.StubBackend[source]

An in-memory rate limiter backend. For use in unit tests.

Limiters

class remoulade.rate_limits.RateLimiter(backend, key)[source]

ABC for rate limiters.

Examples

>>> from remoulade.rate_limits.backends import RedisBackend
>>> backend = RedisBackend()
>>> limiter = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)
>>> with limiter.acquire(raise_on_failure=False) as acquired:
...   if not acquired:
...     print("Mutex not acquired.")
...     return
...
...   print("Mutex acquired.")
Parameters
  • backend (RateLimiterBackend) – The rate limiting backend to use.

  • key (str) – The key to rate limit on.

acquire(*, raise_on_failure=True)[source]

Attempt to acquire a slot under this rate limiter.

Parameters

raise_on_failure (bool) – Whether or not failures should raise an exception. If this is false, the context manager will instead return a boolean value representing whether or not the rate limit slot was acquired.

Returns

Whether or not the slot could be acquired.

Return type

bool

class remoulade.rate_limits.BucketRateLimiter(backend, key, *, limit=1, bucket=1000)[source]

A rate limiter that ensures that only up to limit operations may happen over some time interval.

Examples

Up to 10 operations every second:

>>> BucketRateLimiter(backend, "some-key", limit=10, bucket=1_000)

Up to 1 operation every minute:

>>> BucketRateLimiter(backend, "some-key", limit=1, bucket=60_000)

Warning

Bucket rate limits are cheap to maintain but are susceptible to burst “attacks”. Given a bucket rate limit of 100 per minute, an attacker could make a burst of 100 calls in the last second of a minute and then another 100 calls in the first second of the subsequent minute.

For a rate limiter that doesn’t have this problem (but is more expensive to maintain), see WindowRateLimiter.

Parameters
  • backend (RateLimiterBackend) – The backend to use.

  • key (str) – The key to rate limit on.

  • limit (int) – The maximum number of operations per bucket per key.

  • bucket (int) – The bucket interval in milliseconds.

class remoulade.rate_limits.ConcurrentRateLimiter(backend, key, *, limit=1, ttl=900000)[source]

A rate limiter that ensures that only limit concurrent operations may happen at the same time.

Note

You can use a concurrent rate limiter of size 1 to get a distributed mutex.

Parameters
  • backend (RateLimiterBackend) – The backend to use.

  • key (str) – The key to rate limit on.

  • limit (int) – The maximum number of concurrent operations per key.

  • ttl (int) – The time in milliseconds that keys may live for.

class remoulade.rate_limits.WindowRateLimiter(backend, key, *, limit=1, window=1)[source]

A rate limiter that ensures that only limit operations may happen over some sliding window.

Note

Windows are in seconds rather that milliseconds. This is different from most durations and intervals used in Remoulade, because keeping metadata at the millisecond level is far too expensive for most use cases.

Parameters
  • backend (RateLimiterBackend) – The backend to use.

  • key (str) – The key to rate limit on.

  • limit (int) – The maximum number of operations per window per key.

  • window (int) – The window size in seconds. The wider the window, the more expensive it is to maintain.

Workers

class remoulade.Worker(broker, *, queues=None, worker_timeout=1000, worker_threads=8, prefetch_multiplier=2)[source]

Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don’t block the current thread so it’s up to the caller to keep it alive.

Don’t run more than one Worker per process.

Parameters
  • broker (Broker) –

  • queues (Set[str]) – An optional subset of queues to listen on. By default, if this is not provided, the worker will listen on all declared queues.

  • worker_timeout (int) – The number of milliseconds workers should wake up after if the queue is idle.

  • worker_threads (int) – The number of worker threads to spawn.

  • prefetch_multiplier (int) – The number of message to prefetch at a time, to be multiplied with the number of threads

join()[source]

Wait for this worker to complete its work in progress. This method is useful when testing code.

pause()[source]

Pauses all the worker threads.

resume()[source]

Resumes all the worker threads.

start()[source]

Initialize the worker boot sequence and start up all the worker threads.

stop(timeout=600000)[source]

Gracefully stop the Worker and all of its consumers and workers.

Parameters

timeout (int) – The number of milliseconds to wait for everything to shut down.

Errors

class remoulade.RemouladeError(message: str)[source]

Base class for all remoulade errors.

class remoulade.BrokerError(message: str)[source]

Base class for broker-related errors.

class remoulade.ActorNotFound(message: str)[source]

Raised when a message is sent to an actor that hasn’t been declared.

class remoulade.QueueNotFound(message: str)[source]

Raised when a message is sent to an queue that hasn’t been declared.

class remoulade.ConnectionError(message: str)[source]

Base class for broker connection-related errors.

class remoulade.ConnectionClosed(message: str)[source]

Raised when a broker connection is suddenly closed.

class remoulade.ConnectionFailed(message: str)[source]

Raised when a broker connection could not be opened.

class remoulade.RateLimitExceeded(message: str)[source]

Raised when a rate limit has been exceeded.

class remoulade.NoResultBackend(message: str)[source]

Raised when trying to access the result backend on a broker without it

class remoulade.results.ResultError(message: str)[source]

Base class for result errors.

class remoulade.results.ResultMissing(message: str)[source]

Raised when a result can’t be found.

class remoulade.results.ResultTimeout(message: str)[source]

Raised when waiting for a result times out.