Cookbook

This part of the docs contains recipes for various things you might want to do using Remoulade. Each section will be light on prose and code-heavy, so if you have any questions about one of the recipes, open an issue on GitHub.

Send Actor on Fail

Remoulade has built-in support for sending messages when other messages fail. The on_failure option can either be an Actor, or a Message if passed in the message options. The on_failure message or actor is enqueued every time an actor fails or times out and is not going to be retried. It receives 4 arguments : the name of the actor that failed, the name of the exception, and the args and kwargs of the message that failed.

import remoulade

@remoulade.actor
def identity(x):
  return x


@remoulade.actor
def print_error(actor_name, exception_name, message_args, message_kwargs):
  print(f"Actor {actor_name} failed:")
  print(f"Exception type: {exception_name}")
  print(f"Message args: {message_args}")
  print(f"Message kwargs: {message_kwargs}")

if __name__ == "__main__":
  identity.send_with_options(
    args=(42,)
    on_failure=print_error,
  )

Composition

Remoulade has built-in support for a couple of high-level composition constructs. You can use these to combine generalized tasks that don’t know about one another into complex workflows.

In order to take advantage of group and pipeline result management, you need to enable result storage and your actors need to store results. Check out the Results section for more information on result storage.

Groups

Groups run actors in parallel and let you gather their results or wait for all of them to finish. Assuming you have a computationally intensive actor called frobnicate, you can group multiple messages together as follows:

g = group([
  frobnicate.message(1, 2),
  frobnicate.message(2, 3),
  frobnicate.message(3, 4),
]).run()

This will enqueue 3 separate messages and, assuming there are enough resources available, execute them in parallel. You can then wait for the whole group to finish:

g.results.wait(timeout=10_000)  # 10s expressed in millis

Or you can iterate over the results:

for res in g.results.get(block=True, timeout=5_000):
  ...

Results are returned in the same order that the messages were added to the group. If you don’t pass the timeout argument in get, the timeout will have a default value of 10 seconds. To set a custom default timeout, pass a default_timeout argument when instantiating your result backend.

Pipelines

Actors can be chained together using the pipeline function. For example, if you have an actor that gets the text contents of a website and one that counts the number of “words” in a piece of text:

@remoulade.actor
def get_uri_contents(uri):
  return requests.get(uri).text

@remoulade.actor
def count_words(uri, text):
  count = len(text.split(" "))
  print(f"There are {count} words at {uri}.")

You can chain them together like so:

uri = "http://example.com"
pipe = pipeline([
  get_uri_contents.message(uri),
  count_words.message(uri),
]).run()

Or you can use pipe notation to achieve the same thing:

pipe = get_uri_contents.message(uri) | count_words.message(uri)

In both cases, the result of running get_uri_contents(uri) is passed as the last positional argument to count_words. If you would like to avoid passing the result of an actor to the next one in line, set the pipe_ignore option to True when you create the “receiving” message:

(
  bust_caches.message() |
  prepare_codes.message_with_options(pipe_ignore=True) |
  launch_missiles.message()
)

Here, the result of bust_caches() will not be passed to prepare_codes(), but the result of prepare_codes() will be passed to launch_missiles(codes). To get the end result of a pipeline – that is, the result of the last actor in the pipeline – you can call get:

pipe.result.get(block=True, timeout=5_000)

To get the intermediate results of each step in the pipeline, you can call get:

for res in pipe.results.get(block=True):
  ...

Logging

If you want to track your messages, you can use the LoggingMetadata middleware.

This middleware enables you to pass metadata to your messages, either by using the logging_metadata option:

message = actor.message_with_options(logging_metadata={"id":"value"})

Or by passing a callback that returns the metadata to the message using the logging_metadata_getter option:

def callback():
    return {"id":"value}

message = actor.message_with_options(logging_metadata_getter=callback)

Either way, the logging_metadata will be sent in all remoulade logs concerning this message, and can also be accessed like this:

message.options['logging_metadata']

As with most options, you can pass these options at every level : message, actor and middleware.

When a message is created, the value of the logging_metadata and return value of logging_metadata from every level are merged and passed to the message. Same fields in multiple levels or options are overwritten following the standard option priority : message level having higher priority than actor level, which has higher priority that middleware level. For each level, logging_metadata_getter has higher priority that logging_metadata.

Note

Because they are already used in logging, “message_id” and “input” cannot be used as fields in logging_metadata.

Error Reporting

Reporting errors with Rollbar

Rollbar provides an easy-to-use Python client. Add it to your project with pipenv:

$ pipenv install rollbar

Save the following middleware to a module inside your project:

import remoulade
import rollbar

class RollbarMiddleware(remoulade.Middleware):
  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:
      rollbar.report_exc_info()

Finally, instantiate and add it to your broker:

rollbar.init(YOUR_ROLLBAR_KEY)
broker.add_middleware(path.to.RollbarMiddleware())

Reporting errors with Sentry

Install Sentry’s raven client with pipenv:

$ pipenv install raven

Save the following middleware to a module inside your project:

import remoulade

class SentryMiddleware(remoulade.Middleware):
  def __init__(self, raven_client):
    self.raven_client = raven_client

  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:
      self.raven_client.captureException()

Finally, instantiate and add it to your broker:

from raven import Client

raven_client = Client(YOUR_DSN)
broker.add_middleware(path.to.SentryMiddleware(raven_client))

Operations

Binding Worker Groups to Queues

By default, Remoulade workers consume all declared queues, but it’s common to want to bind worker groups to specific queues in order to have better control over throughput. For example, given the following actors:

@remoulade.actor
def very_slow():
  ...

@remoulade.actor(queue_name="ui-blocking")
def very_important():
  ...

You may want to run one group of workers that only processes messages on the default queue and another that only processes messages off of the ui-blocking queue. To do that, you have to pass each group the appropriate queue on the command line:

# Only consume the "default" queue
$ remoulade an_app --queues default

# Only consume the "ui-blocking" queue
$ remoulade an_app --queues ui-blocking

Messages sent to very_slow will always be delievered to those workers that consume the default queue and messages sent to very_important will always be delievered to the ones that consume the ui-blocking queue.

Rate Limiting

Rate limiting work

You can use Remoulade’s RateLimiters to constrain actor concurrency.

import remoulade
import time

from remoulade.rate_limits import ConcurrentRateLimiter
from remoulade.rate_limits.backends import RedisBackend

backend = RedisBackend()
DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)

@remoulade.actor
def one_at_a_time():
  with DISTRIBUTED_MUTEX.acquire():
    time.sleep(1)
    print("Done.")

Whenever two one_at_a_time actors run at the same time, one of them will be retried with exponential backoff. This works by raising an exception and relying on the built-in Retries middleware to do the work of re-enqueueing the task.

If you want rate limiters not to raise an exception when they can’t be acquired, you should pass raise_on_failure=False to acquire:

with DISTRIBUTED_MUTEX.acquire(raise_on_failure=False) as acquired:
  if not acquired:
    print("Lock could not be acquired.")
  else:
    print("Lock was acquired.")

Results

Storing message results

You can use Remoulade’s result backends to store and retrieve message return values. To enable result storage, you need to instantiate and add the Results middleware to your broker.

import remoulade

from remoulade.brokers.rabbitmq import RabbitmqBroker
from remoulade.results.backends import RedisBackend
from remoulade.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
remoulade.set_broker(broker)

@remoulade.actor(store_results=True)
def add(x, y):
  return x + y

broker.declare_actor(add)

if __name__ == "__main__":
  message = add.send(1, 2)
  print(message.result.get(block=True, raise_on_error=True, forget=False))

Getting a result raises ResultMissing when a result hasn’t been stored yet or if it has already expired (results expire after 10 minutes by default). When the block parameter is True, ResultTimeout is raised instead. When the forget parameter is True the result will be deleted from the backend when retrieved.

If an exception is raised during message execution, a serialized version of the exception is stored in the ResultBackend. If raise_on_error parameter is True, an ErrorStored is raised when it’s the case.

Result

import remoulade

from remoulade.brokers.rabbitmq import RabbitmqBroker
from remoulade.results.backends import RedisBackend
from remoulade.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
remoulade.set_broker(broker)

@remoulade.actor(store_results=True)
def add(x, y):
  return x + y

broker.declare_actor(add)

if __name__ == "__main__":
  message = add.send(1, 2)
  result = Result(message_id=message.message_id)
  print(result.get(block=True))

The property result of Message return a Result instance which can be used to get the result. But you can also create a Result from a message_id and access to the result the same way.

Group results

The property results of group return a CollectionResults instance which can be used to get the result. You access the results with the get method, but also the completed_count of the group (the count of finished actors errored or not).

You can also get all the message ids of a group with the message_ids property and create a CollectionResults with the from_message_ids method.

Pipelines of groups

If you activated the result backend, remoulade can be used to create a group pipeline as follow:

group_pipeline = group([
  do_something.message(1, 2),
  do_something.message(2, 3),
  do_something.message(3, 4),
]) | merge_results.message()

This can be handy to merge the results of parallel calculation. Under the hood, each group get a group_id and after each actor is finished a counter in the results backend associed to the group id is incremented. If the counter reach the number of message in the group, the results of each message are fetched from the result backend and the next message is enqueued.

Scheduling

Scheduling messages

There is a scheduler integrated into remoulade:

import remoulade
from datetime import datetime
from remoulade.brokers.rabbitmq import RabbitmqBroker
from remoulade.scheduler import ScheduledJob, Scheduler

broker = RabbitmqBroker()

remoulade.set_broker(broker)
remoulade.set_scheduler(
    Scheduler(
        broker,
        [
            ScheduledJob(actor_name="count_words", interval=86400),
        ]
    )
)

broker.declare_actor(count_words)

Optimizing

Prefetch Limits

The prefetch count is the number of message a worker can reserve for itself (the limit of unacknowledged message it can get from RabbitMQ).

The prefetch count is set by multiplying the prefetch_multiplier with the number of worker threads (default: 2)

If you have many actors with a long duration you want the multiplier value to be one, it’ll only reserve one task per worker process at a time.

But if you have short actors, you may want to increase this multiplier to reduce I/O.

remoulade package --prefetch-multiplier 1

Cancel

Cancel a message

You can cancel messages if you add the Cancel middleware to the broker.

import remoulade

from remoulade.brokers.rabbitmq import RabbitmqBroker
from remoulade.cancel.backends import RedisBackend
from remoulade.cancel import Cancel
from remoulade import group

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
remoulade.set_broker(broker)

@remoulade.actor()
def add(x, y):
  return x + y

broker.declare_actor(add)

if __name__ == "__main__":
  message = add.send(1, 2)
  message.cancel()

  g = group([add.message(1, 2), add.message(1, 2)]).run()
  g.cancel()

If a message has not yet started its processing, remoulade will not start the execution of the actor. Basically, the id of the message to cancel is stored in a redis set. And before each message processing, remoulade check if the message_id is in the set. Message, group and pipeline can be canceled.

Progress bar

tqdm is the recommended way if you want to make a progress bar for a group:

from time import time, sleep
import logging

from tqdm import tqdm
import remoulade

logger = logging.getLogger(__name__)

def blocking_remoulade_group(remoulade_group, timeout=1800):
    actor_count = len(remoulade_group)
    logger.info('Start group')
    start_time = time()
    try:
        with tqdm(total=actor_count) as progress_bar:
            results = remoulade_group.run().results
            completed_count, waited_time = 0, 0
            while waited_time < timeout and completed_count != actor_count:
                completed_count = results.completed_count
                waited_time = time() - start_time
                progress_bar.update(completed_count - progress_bar.n)
                sleep(1)

            progress_bar.update(completed_count - progress_bar.n)  # final update of the progress

        if waited_time > timeout:
            raise Exception('The operation timed out')

    except:
        logger.error('Group canceled ')
        remoulade_group.cancel()
        raise

    logger.info('Finished group')

    return results