Source code for remoulade.message

# This file is a part of Remoulade.
# Copyright (C) 2017,2018 CLEARTYPE SRL <>
# Remoulade is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
# Remoulade is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <>.

import time
from typing import Any, Dict, Generic, TypeVar, cast

import attr

from remoulade.state import State

from .broker import get_broker
from .common import generate_unique_id
from .composition import pipeline
from .encoder import Encoder, JSONEncoder
from .errors import InvalidProgress
from .result import Result

#: The global encoder instance.
global_encoder = JSONEncoder()  # type: Encoder

[docs]def get_encoder() -> Encoder: """Get the global encoder object. Returns: Encoder """ global global_encoder return global_encoder
[docs]def set_encoder(encoder: Encoder) -> None: """Set the global encoder object. Parameters: encoder(Encoder): The encoder instance to use when serializing messages. """ global global_encoder global_encoder = encoder
ResultT = TypeVar("ResultT", bound=Result[Any], covariant=True)
[docs]@attr.s(frozen=True, slots=True, kw_only=True, auto_attribs=True) class Message(Generic[ResultT]): """Encapsulates metadata about messages being sent to individual actors. Parameters: queue_name(str): The name of the queue the message belogns to. actor_name(str): The name of the actor that will receive the message. args(tuple): Positional arguments that are passed to the actor. kwargs(dict): Keyword arguments that are passed to the actor. options(dict): Arbitrary options passed to the broker and middleware. message_id(str): A globally-unique id assigned to the actor. message_timestamp(int): The UNIX timestamp in milliseconds representing when the message was first enqueued. """ queue_name: str actor_name: str args: tuple = attr.field(converter=tuple) kwargs: Dict options: Dict[str, Any] message_id: str = attr.field(factory=generate_unique_id) message_timestamp: int = attr.field(factory=lambda: int(time.time() * 1000)) def __or__(self, other) -> pipeline: """Combine this message into a pipeline with "other".""" return pipeline((self, other))
[docs] def asdict(self): """Convert this message to a dictionary.""" return attr.asdict(self)
[docs] @classmethod def decode(cls, data): """Convert a bytestring to a message.""" return cls(**global_encoder.decode(data))
[docs] def encode(self): """Convert this message to a bytestring.""" return global_encoder.encode(self.asdict())
[docs] def copy(self, **attributes): """Create a copy of this message.""" updated_options = attributes.pop("options", {}) options = self.options.copy() options.update(updated_options) return attr.evolve(self, **attributes, options=options)
[docs] def build(self, options: Dict[str, Any]): """Build message for pipeline""" return self.copy(options=options)
[docs] def cancel(self) -> None: """Mark a message as canceled""" broker = get_broker() backend = broker.get_cancel_backend() backend.cancel([self.message_id])
[docs] def set_progress(self, progress: float) -> None: """Set the progress of the message. progress(float) number between 0 and 1 inclusive :raises: InvalidProgress: when not( 0 <= progress <= 1) """ if not (0 <= progress <= 1): raise InvalidProgress(f"Progress {progress} is not between 0 and 1.") broker = get_broker() backend = broker.get_state_backend() backend.set_state(State(self.message_id, progress=progress))
@property def result(self) -> ResultT: return cast(ResultT, Result(message_id=self.message_id)) def __str__(self) -> str: return f"{self.actor_name} / {self.message_id}"