# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class MiddlewareError(Exception):
    """Base class for middleware errors."""
class SkipMessage(MiddlewareError):
    """An exception that may be raised by Middleware inside the
    ``before_process_message`` hook in order to skip a message.
    """
class Middleware:
    """Base class for broker middleware.  The default implementations
    for all hooks are no-ops and subclasses may implement whatever
    subset of hooks they like.
    """

    def get_option(self, option_name, *, broker, message, default=None):
        """Resolve the value of an option for the given message.

        The lookup goes from the most specific level to the least
        specific one and stops at the first value that is not ``None``:

        1. the message's own options,
        2. the options of the actor named by ``message.actor_name``,
        3. an attribute of this middleware named ``option_name``,
        4. ``default``.

        ``"pipe_target"`` and ``"group_info"`` are only looked up on the
        message; if the message does not carry them, ``None`` is
        returned without consulting the actor or the middleware.

        Parameters:
          option_name: The name of the option to look up.
          broker: The broker used to resolve the message's actor.
          message: The message whose options are inspected first.
          default: The value returned when no level defines the option.

        Returns:
          The resolved option value, ``default`` or ``None``.
        """
        # Message level: a value of None is treated as "not set".
        option = message.options.get(option_name)
        if option is not None:
            return option

        # It doesn't make sense to get pipe_target and group_info at any
        # other level than message.  The check must be on the option's
        # *name*: the value is always None at this point.
        if option_name in ("pipe_target", "group_info"):
            return None

        # Actor level.
        actor = broker.get_actor(message.actor_name)
        option = actor.options.get(option_name)
        if option is not None:
            return option

        # Middleware level, or the caller-supplied default.
        return getattr(self, option_name, default)

    @property
    def actor_options(self):
        """The set of options that may be configured on each actor."""
        return set()

    def before_ack(self, broker, message):
        """Called before a message is acknowledged."""

    def after_ack(self, broker, message):
        """Called after a message has been acknowledged."""

    def before_nack(self, broker, message):
        """Called before a message is rejected."""

    def after_nack(self, broker, message):
        """Called after a message has been rejected."""

    def before_declare_actor(self, broker, actor):
        """Called before an actor is declared."""

    def after_declare_actor(self, broker, actor):
        """Called after an actor has been declared."""

    def before_declare_queue(self, broker, queue_name):
        """Called before a queue is declared."""

    def after_declare_queue(self, broker, queue_name):
        """Called after a queue has been declared."""

    def after_declare_delay_queue(self, broker, queue_name):
        """Called after a delay queue has been declared."""

    def before_enqueue(self, broker, message, delay):
        """Called before a message is enqueued."""

    def after_enqueue(self, broker, message, delay, exception=None):
        """Called after a message has been enqueued."""

    def before_delay_message(self, broker, message):
        """Called before a message has been delayed in worker memory."""

    def before_process_message(self, broker, message):
        """Called before a message is processed.

        Raises:
          SkipMessage: If the current message should be skipped.  When
            this is raised, ``after_skip_message`` is emitted instead
            of ``after_process_message``.
        """

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Called after a message has been processed."""

    def after_worker_thread_process_message(self, broker, thread):
        """Called after a worker thread has finished processing a message."""

    def after_skip_message(self, broker, message):
        """Called instead of ``after_process_message`` after a message
        has been skipped.
        """

    def after_message_canceled(self, broker, message):
        """Called instead of ``after_process_message`` after a message
        has been canceled.
        """

    def after_process_boot(self, broker):
        """Called immediately after subprocess start up."""

    def before_process_stop(self, broker):
        """Called before the subprocess stops."""

    def before_worker_boot(self, broker, worker):
        """Called before the worker process starts up."""

    def after_worker_boot(self, broker, worker):
        """Called after the worker process has started up."""

    def before_worker_shutdown(self, broker, worker):
        """Called before the worker process shuts down."""

    def after_worker_shutdown(self, broker, worker):
        """Called after the worker process shuts down."""

    def before_consumer_thread_shutdown(self, broker, thread):
        """Called before a consumer thread shuts down.  This may be
        used to clean up thread-local resources (such as Django
        database connections).

        There is no ``after_consumer_thread_boot``.
        """

    def before_worker_thread_shutdown(self, broker, thread):
        """Called before a worker thread shuts down.  This may be used
        to clean up thread-local resources (such as Django database
        connections).

        There is no ``after_worker_thread_boot``.
        """

    def after_enqueue_pipe_target(self, broker, group_info):
        """Called after the pipe target of a message has been enqueued."""

    def before_build_group_pipeline(self, broker, group_id, message_ids):
        """Called before a group in a group pipeline is enqueued."""

    def update_options_before_create_message(self, options, broker, actor_name):
        """Called when a message is being built.

        The message's options are set to this function's return value.
        The default implementation returns ``options`` unchanged.
        """
        return options

    def before_actor_execution(self, broker, message):
        """Called before the actor is called."""

    def after_actor_execution(self, broker, message, *, runtime=0):
        """Called after the actor is called."""