# Source code for remoulade.encoder
# This file is a part of Remoulade.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import json
import pickle
import warnings
from typing import Any, Dict, Optional, get_type_hints
from typing_extensions import Annotated
try:
from pydantic import BaseConfig, BaseModel, TypeAdapter, ValidationError, WithJsonSchema, create_model
from simplejson.decoder import JSONDecoder
from simplejson.encoder import JSONEncoder as _JSONEncoder
except ImportError: # pragma: no cover
warnings.warn(
"Pydantic and simplejson are not available. Run `pip install remoulade[pydantic]`",
ImportWarning,
)
from .errors import ActorNotFound
#: Represents the contents of a Message object as a dict.
#: Keys are strings; values are unconstrained (``Any``).
MessageData = Dict[str, Any]
class Encoder(abc.ABC):
    """Base class for message encoders.

    Subclasses must provide both :meth:`encode` and :meth:`decode`;
    the class cannot be instantiated until both are overridden.
    """

    @abc.abstractmethod
    def encode(self, data: MessageData) -> bytes:  # pragma: no cover
        """Convert message metadata into a bytestring.

        Parameters:
          data: The message contents as a dict.

        Returns:
          The encoded message as bytes.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def decode(self, data: bytes) -> MessageData:  # pragma: no cover
        """Convert a bytestring into message metadata.

        Parameters:
          data: The encoded message as bytes.

        Returns:
          The message contents as a dict.
        """
        raise NotImplementedError
class JSONEncoder(Encoder):
    """Encodes messages as JSON. This is the default encoder."""

    def encode(self, data: MessageData) -> bytes:
        """Serialize ``data`` to UTF-8 encoded JSON bytes.

        The ``(",", ":")`` separators produce the most compact output
        (no whitespace after commas or colons).
        """
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def decode(self, data: bytes) -> MessageData:
        """Parse UTF-8 encoded JSON bytes back into message data."""
        return json.loads(data.decode("utf-8"))
class PickleEncoder(Encoder):
    """Pickles messages.

    Warning:
      This encoder is not secure against maliciously-constructed data.
      Use it at your own risk.
    """

    # The pickle functions are assigned directly as class attributes, which
    # also replaces the abstract ``encode``/``decode`` inherited from Encoder.
    # NOTE(review): this relies on ``pickle.dumps``/``pickle.loads`` not being
    # bound to the instance on attribute access (true for the C-accelerated
    # built-ins) -- confirm if a pure-Python pickle may be in use.
    encode = pickle.dumps  # type: ignore
    # SECURITY: unpickling can execute arbitrary code; decode trusted data only.
    decode = pickle.loads  # type: ignore
class PydanticEncoder(Encoder):
    """Remoulade encoder working with Pydantic schemas.

    Requires the ``remoulade[pydantic]`` extra dependency.

    With this encoder you must use only Pydantic schema as inputs/outputs of
    the actors and type them explicitly::

        class MyActorInputSchema(BaseModel):
            ...

        class MyActorOutputSchema(BaseModel):
            ...

        @remoulade.actor()
        def my_actor(input_1: MyActorInputSchema, input_2: MyActorInputSchema | None = None) -> MyActorOutputSchema:
            ...
            return MyActorOutputSchema()

    Parameters:
      fallback_encoder: Optional encoder tried whenever encoding or decoding
        with this encoder raises. When it is ``None`` the original exception
        propagates to the caller.
    """

    def __init__(self, fallback_encoder: Optional[Encoder] = None):
        self.fallback_encoder = fallback_encoder
        # ``default`` lets the JSON encoder serialize Pydantic models.
        self.json_encoder = _JSONEncoder(default=self.default)
        self.json_decoder = JSONDecoder()

    @staticmethod
    def default(o):
        """Serialization hook for objects the JSON encoder cannot handle natively.

        Returns the JSON-compatible representation of a Pydantic model.

        Raises:
          TypeError: When ``o`` is not a Pydantic ``BaseModel``.
        """
        if isinstance(o, BaseModel):
            # keep dict otherwise it will be serialized as a string (see Pydantic .json())
            return json.loads(o.model_dump_json())
        raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable")

    def encode(self, data: MessageData) -> bytes:
        """Encode ``data`` as UTF-8 JSON, using the fallback encoder on failure.

        Raises:
          Exception: Whatever the JSON encoder raised, when no fallback
            encoder is configured.
        """
        try:
            return self.json_encoder.encode(data).encode("utf-8")
        except Exception:
            if self.fallback_encoder is None:
                # Bare ``raise`` re-raises the active exception unchanged.
                raise
            return self.fallback_encoder.encode(data)

    def decode(self, data: bytes) -> MessageData:
        """Decode UTF-8 JSON bytes and validate actor inputs/outputs.

        The actor named by the message's ``actor_name`` is looked up on the
        current broker; its type hints are used to validate the ``args``,
        ``kwargs`` and ``result`` entries of the message. All other entries
        are copied through unchanged.

        Raises:
          Exception: Whatever decoding, actor lookup or validation raised,
            when no fallback encoder is configured.
        """
        # Imported locally rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from remoulade import get_broker

        try:
            raw_message = self.json_decoder.decode(data.decode("utf-8"))  # type: ignore
            actor_name = raw_message["actor_name"]
            actor_fn = get_broker().get_actor(actor_name).fn

            # Retrieve the Pydantic schemas from typing
            schemas_by_param_name = {}
            for param_name, type_hint in get_type_hints(actor_fn).items():
                schemas_by_param_name[param_name] = TypeAdapter(
                    Annotated[
                        type_hint,
                        WithJsonSchema(
                            {"type": type_hint, "description": f"{param_name}_schema"}, mode="serialization"
                        ),
                    ]
                )

            # Override message_data with Pydantic schema when it matches
            parsed_message: Dict[str, Any] = {}
            for key, values in raw_message.items():
                if key == "kwargs":
                    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
                    if not isinstance(values, dict):
                        raise TypeError(f"Expected 'kwargs' to be a dict, got {type(values).__name__}")
                    parsed_message[key] = {
                        param_name: schemas_by_param_name[param_name].validate_python(raw_value)
                        for param_name, raw_value in values.items()
                    }
                elif key == "args":
                    if not isinstance(values, list):
                        raise TypeError(f"Expected 'args' to be a list, got {type(values).__name__}")
                    # NOTE(review): positional values are matched to schemas by
                    # type-hint order, which includes the ``return`` hint when
                    # the actor has one -- confirm this is intended.
                    schemas = list(schemas_by_param_name.values())
                    parsed_message[key] = [
                        schemas[order].validate_python(raw_value) for order, raw_value in enumerate(values)
                    ]
                elif key == "result":
                    parsed_message[key] = schemas_by_param_name["return"].validate_python(values)
                else:
                    parsed_message[key] = values
            return parsed_message
        except Exception:
            if self.fallback_encoder is None:
                raise
            return self.fallback_encoder.decode(data)