# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
from typing import Callable, Optional
from ..helpers import compute_backoff
from ..helpers.backoff import BackoffStrategy
from ..logging import get_logger
from .middleware import Middleware
#: The default minimum amount of backoff to apply to retried tasks.
DEFAULT_MIN_BACKOFF = 15000
#: The default maximum amount of backoff to apply to retried tasks.
DEFAULT_MAX_BACKOFF = 1000 * 60 * 60
[docs]class Retries(Middleware):
"""Middleware that automatically retries failed tasks with
exponential backoff.
Parameters:
max_retries(int): The maximum number of times tasks can be retried.
min_backoff(int): The minimum amount of backoff milliseconds to
apply to retried tasks. Defaults to 15 seconds.
max_backoff(int): The maximum amount of backoff milliseconds to
apply to retried tasks. Defaults to 7 days.
retry_when(Callable[[int, Exception], bool]): An optional
predicate that can be used to programmatically determine
whether a task should be retried or not. This takes
precedence over `max_retries` when set.
increase_priority_on_retry(bool): specifies wether to increase
priority of message when retried. Default is False.
"""
def __init__(
self,
*,
max_retries: Optional[int] = None,
min_backoff: Optional[int] = None,
max_backoff: Optional[int] = None,
retry_when: Optional[Callable[[int, Exception], bool]] = None,
backoff_strategy: BackoffStrategy = "exponential",
jitter: bool = True,
increase_priority_on_retry: bool = False
):
self.logger = get_logger(__name__, type(self))
self.max_retries = max_retries
self.min_backoff = min_backoff or DEFAULT_MIN_BACKOFF
self.max_backoff = max_backoff or DEFAULT_MAX_BACKOFF
self.retry_when = retry_when
self.backoff_strategy = backoff_strategy
self.jitter = jitter
self.increase_priority_on_retry = increase_priority_on_retry
@property
def actor_options(self):
return {
"max_retries",
"min_backoff",
"max_backoff",
"retry_when",
"backoff_strategy",
"jitter",
"increase_priority_on_retry",
}
def after_process_message(self, broker, message, *, result=None, exception=None):
if exception is None:
return
retries = message.options.setdefault("retries", 0)
max_retries = self.get_option("max_retries", broker=broker, message=message)
retry_when = self.get_option("retry_when", broker=broker, message=message)
if retry_when is None and not max_retries:
message.fail()
return
if (
retry_when is not None
and not retry_when(retries, exception)
or max_retries is not None
and retries >= max_retries
):
if max_retries is not None and retries >= max_retries:
self.logger.warning(f"Retries exceeded for message {message.message_id}.")
else:
self.logger.warning(f"Message {message.message_id} has failed and will not be retried.")
message.fail()
return
new_message = message.copy()
increase_priority_on_retry = self.get_option("increase_priority_on_retry", broker=broker, message=message)
if increase_priority_on_retry and getattr(broker, "max_priority", None) is not None:
new_message.options["priority"] = min(message.options.get("priority", 0) + 1, broker.max_priority)
new_message.options["increase_priority_on_retry"] = False # we only want to do it once
new_message.options["retries"] += 1
new_message.options["traceback"] = traceback.format_exc(limit=30)
min_backoff = self.get_option("min_backoff", broker=broker, message=message)
max_backoff = self.get_option("max_backoff", broker=broker, message=message)
backoff_strategy = self.get_option("backoff_strategy", broker=broker, message=message)
jitter = self.get_option("jitter", broker=broker, message=message)
_, backoff = compute_backoff(
retries,
min_backoff=min_backoff,
max_backoff=max_backoff,
jitter=jitter,
max_retries=max_retries or 10,
backoff_strategy=backoff_strategy,
)
self.logger.info("Retrying message %r in %d milliseconds.", message.message_id, backoff)
broker.enqueue(new_message, delay=backoff)