Source code for remoulade.middleware.catch_error

from typing import Any

from .middleware import Middleware


[docs]class CatchError(Middleware): """Middleware that lets you enqueue another actor or message on message failure. Parameters: on_failure(Message|Actor|str): A Message, Actor or Actor name to enqueue on failure. """ @property def actor_options(self): return {"on_failure"} def after_process_message(self, broker, message, *, result=None, exception=None): from .. import Message if message.failed: on_failure = self.get_option("on_failure", broker=broker, message=message) if isinstance(on_failure, str): actor = broker.get_actor(on_failure) actor.send(message.actor_name, type(exception).__name__, message.args, message.kwargs) elif isinstance(on_failure, dict): on_failure_message = Message[Any](**on_failure) on_failure_message = on_failure_message.copy( args=[message.actor_name, type(exception).__name__, message.args, message.kwargs] ) broker.enqueue(on_failure_message) def update_options_before_create_message(self, options, broker, actor_name): from .. import Message from ..actor import Actor on_failure = options.get("on_failure") if isinstance(on_failure, Message): options["on_failure"] = on_failure.asdict() elif isinstance(on_failure, Actor): options["on_failure"] = on_failure.actor_name elif on_failure is not None and not isinstance(on_failure, str): raise TypeError(f"on_failure must be an Message, an Actor or a string, got {type(on_failure)} instead") return options