# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
from typing import Iterable, List, Optional

import redis

from ...helpers.backoff import BackoffStrategy, compute_backoff
from ..backend import BackendResult, ForgottenResult, Missing, ResultBackend, ResultMissing, ResultTimeout
class RedisBackend(ResultBackend):
    """A result backend for Redis_. This is the recommended result
    backend as waiting for a result is resource efficient.

    Parameters:
      namespace(str): A string with which to prefix result keys.
      encoder(Encoder): The encoder to use when storing and retrieving
        result data. Defaults to :class:`.JSONEncoder`.
      client(Redis): An optional client. If this is passed,
        then all other parameters are ignored.
      url(str): An optional connection URL. If both a URL and
        connection parameters are provided, the URL is used.
      default_timeout(int): The default timeout, in ms, used when
        blocking on a result and no explicit timeout is given.
      max_retries(int): How many times a Redis command is retried on
        connection or timeout errors before giving up.
      min_backoff(int): The minimum delay, in ms, between retries.
      max_backoff(int): The maximum delay, in ms, between retries.
      backoff_strategy(BackoffStrategy): The strategy used to compute
        the delay between retries.
      **parameters(dict): Connection parameters are passed directly
        to :class:`redis.Redis`.

    .. _redis: https://redis.io
    """

    def __init__(
        self,
        *,
        namespace="remoulade-results",
        encoder=None,
        client=None,
        url=None,
        default_timeout=None,
        max_retries=3,
        min_backoff=500,
        max_backoff=5000,
        backoff_strategy: BackoffStrategy = "spread_exponential",
        **parameters,
    ):
        super().__init__(namespace=namespace, encoder=encoder, default_timeout=default_timeout)

        # An explicit URL (argument or REMOULADE_REDIS_URL environment
        # variable) takes precedence over discrete connection parameters.
        url = url or os.getenv("REMOULADE_REDIS_URL")
        if url:
            parameters["connection_pool"] = redis.ConnectionPool.from_url(url)

        self.client = client or redis.Redis(**parameters)
        self.max_retries = max_retries
        self.min_backoff = min_backoff
        self.max_backoff = max_backoff
        self.backoff_strategy = backoff_strategy

    def get_results(
        self,
        message_ids: List[str],
        *,
        block: bool = False,
        timeout: Optional[int] = None,
        forget: bool = False,
        raise_on_error: bool = True,
    ) -> Iterable[BackendResult]:
        """Get multiple results from the backend in a single round trip.

        Parameters:
            message_ids(List[str]): The ids of the messages whose results
                should be fetched.
            block(bool): Whether or not to block until every result is set.
            timeout(int): The maximum amount of time, in ms, to wait for
                results when block is True.
            forget(bool): Whether or not the results need to be kept.
            raise_on_error(bool): Raise an error if one of the stored
                results is an error.

        Raises:
            ResultMissing: When block is False and one of the results
                isn't set.
            ResultTimeout: When waiting for a result times out.

        Returns:
            Iterable[BackendResult]: The results, in the same order as
            ``message_ids``.
        """
        if block:
            # Blocking reads need one BRPOPLPUSH per message, so delegate
            # to the generic one-by-one implementation in the base class.
            yield from super().get_results(
                message_ids, block=block, timeout=timeout, forget=forget, raise_on_error=raise_on_error
            )
        else:
            with self.client.pipeline() as pipe:
                for message_id in message_ids:
                    message_key = self.build_message_key(message_id)
                    if forget:
                        # Atomically replace the stored result with the
                        # "forgotten" marker while popping the real value.
                        # RPUSHX only pushes if the key exists, so a missing
                        # result stays missing.
                        pipe.rpushx(message_key, self.encoder.encode(ForgottenResult.asdict()))
                        pipe.lpop(message_key)
                    else:
                        # Non-destructive read: rotate the single stored
                        # element back onto the same list.
                        pipe.rpoplpush(message_key, message_key)
                data = pipe.execute()

            for i, row in enumerate(data):
                if forget:
                    if i % 2 == 0:
                        continue  # skip the RPUSHX reply; only the LPOP reply carries data
                    message_index = i // 2  # two replies per message when forgetting
                else:
                    message_index = i
                if row is None:
                    # Report the id whose result is actually missing.  (The
                    # previous implementation re-used the loop variable from
                    # the pipeline-building loop and always reported the
                    # last message id.)
                    raise ResultMissing(message_ids[message_index])
                yield self.process_result(BackendResult(**self.encoder.decode(row)), raise_on_error)

    def get_result(self, message_id: str, *, block=False, timeout=None, forget=False, raise_on_error=True):
        """Get a result from the backend.

        Warning:
          Sub-second timeouts are not respected by this backend.

        Parameters:
          message_id(str)
          block(bool): Whether or not to block until a result is set.
          timeout(int): The maximum amount of time, in ms, to wait for
            a result when block is True. Defaults to 10 seconds.
          forget(bool): Whether or not the result need to be kept.
          raise_on_error(bool): raise an error if the result stored in
            an error

        Raises:
          ResultMissing: When block is False and the result isn't set.
          ResultTimeout: When waiting for a result times out.

        Returns:
          object: The result.
        """
        if timeout is None:
            timeout = self.default_timeout

        message_key = self.build_message_key(message_id)
        timeout = int(timeout / 1000)  # ms -> s; BRPOPLPUSH takes whole seconds
        deadline = time.monotonic() + timeout
        retry_count, data = 0, None
        while data is None:
            try:
                if block and timeout > 0:
                    # Blocking non-destructive read: the element is rotated
                    # back onto the same list.
                    data = self.client.brpoplpush(message_key, message_key, timeout=timeout)
                    if forget and data is not None:
                        # Replace the stored result with the "forgotten"
                        # marker: push the marker, then trim to one element.
                        with self.client.pipeline() as pipe:
                            pipe.lpushx(message_key, self.encoder.encode(ForgottenResult.asdict()))
                            pipe.ltrim(message_key, 0, 0)
                            pipe.execute()
                else:
                    if forget:
                        with self.client.pipeline() as pipe:
                            pipe.rpushx(message_key, self.encoder.encode(ForgottenResult.asdict()))
                            pipe.lpop(message_key)
                            data = pipe.execute()[1]
                    else:
                        data = self.client.rpoplpush(message_key, message_key)
                    if data is None:
                        if block:
                            raise ResultTimeout(message_id)
                        else:
                            raise ResultMissing(message_id)
            except (redis.ConnectionError, redis.TimeoutError):
                # if data is not None, it means the second step of block+forget
                # has failed; we can live without a forget
                if data is not None:
                    break
                if retry_count >= self.max_retries:
                    raise
                _, backoff = compute_backoff(
                    retry_count,
                    min_backoff=self.min_backoff,
                    max_backoff=self.max_backoff,
                    max_retries=self.max_retries,
                )
                retry_count += 1
                time.sleep(backoff / 1000)
                timeout = deadline - time.monotonic()
                if block and timeout <= 0:  # do not retry if the timeout has expired
                    raise

        result = BackendResult(**self.encoder.decode(data))
        return self.process_result(result, raise_on_error)

    def _store(self, message_keys, results, ttl):
        """Store each encoded result as a one-element list with a ttl in ms."""
        with self.client.pipeline() as pipe:
            for (message_key, result) in zip(message_keys, results):
                pipe.delete(message_key)  # drop any stale value first
                pipe.lpush(message_key, self.encoder.encode(result))
                pipe.pexpire(message_key, ttl)
            pipe.execute()

    def _get(self, key, forget=False):
        """Return the decoded result stored under ``key``, or ``Missing``.

        NOTE(review): ``forget`` is accepted for interface compatibility but
        is not honored here — the value is read non-destructively either way.
        """
        data = self.client.rpoplpush(key, key)
        if data:
            return self.encoder.decode(data)
        return Missing

    def _delete(self, key):
        """Delete the result stored under ``key``."""
        self.client.delete(key)

    def increment_group_completion(self, group_id: str, message_id: str, ttl: int) -> int:
        """Record ``message_id`` as completed within group ``group_id``.

        A Redis set is used so that recording the same message twice does
        not inflate the count.

        Returns:
            int: The number of distinct completed messages in the group.
        """
        group_completion_key = self.build_group_completion_key(group_id)
        with self.client.pipeline() as pipe:
            pipe.sadd(group_completion_key, message_id)
            pipe.pexpire(group_completion_key, ttl)
            pipe.scard(group_completion_key)
            group_completion = pipe.execute()[2]
        return group_completion

    def get_status(self, message_ids: List[str]) -> int:  # type: ignore
        """Return how many of ``message_ids`` currently have a stored result."""
        if not message_ids:
            return 0
        return self.client.exists(*[self.build_message_key(message_id) for message_id in message_ids])