Skip to content

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig

Bases: AttrDict

Celery configuration.

All fields here (including those added from the session through the get() method, as well as those added "anonymously") will be used as keyword arguments to the send_task() method for the Celery App.

Note

The name field uses task_name as its alias, so that it is preferably populated through task_name arguments, since this is the "original" field name. I.e., this is done for backwards compatibility.

allow_population_by_field_name=True is set as pydantic model configuration in order to allow populating the field using name as well as task_name.

Source code in oteapi/strategies/transformation/celery_remote.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class CeleryConfig(AttrDict, allow_population_by_field_name=True):
    """Celery configuration.

    All fields here (including those added from the session through the `get()`
    method, as well as those added "anonymously") will be used as keyword arguments
    to the `send_task()` method for the Celery App.

    Note:
        The `name` field uses `task_name` as its `alias`, so that it is preferably
        populated through `task_name` arguments, since this is the "original" field
        name. I.e., this is done for backwards compatibility.

        `allow_population_by_field_name=True` is set as pydantic model configuration
        in order to allow populating the field using `name` as well as `task_name`.

    """

    # Name of the task to send. Declared as `name`, aliased to `task_name` for
    # backwards compatibility; no default is given (`...`).
    name: str = Field(..., description="A task name.", alias="task_name")
    # Arguments for the task; no default is given (`...`).
    args: list = Field(..., description="List of arguments for the task.")

CeleryRemoteStrategy

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    # Strategy configuration; its `configuration` attribute is unpacked as the
    # keyword arguments for `CELERY_APP.send_task()`.
    transformation_config: CeleryStrategyConfig

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
        """Run a job, return a job ID.

        Parameters:
            session: The current OTE session. If given and non-empty, values in it
                that match `CeleryConfig` fields override the configuration before
                the task is sent.

        Returns:
            A session update holding the ID of the submitted Celery task.

        """
        if session:
            self._use_session(session)

        result: "Union[AsyncResult, Any]" = CELERY_APP.send_task(
            **self.transformation_config.configuration
        )
        return SessionUpdateCelery(celery_task_id=result.task_id)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        # pylint: disable=unused-argument
        """Initialize a job.

        Parameters:
            session: The current OTE session. Unused by this strategy.

        Returns:
            An empty session update.

        """
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status.

        Parameters:
            task_id: The Celery task ID, as returned by `get()`.

        Returns:
            The task ID together with the state reported by Celery for it.

        """
        result = AsyncResult(id=task_id, app=CELERY_APP)
        return TransformationStatus(id=task_id, status=result.state)

    def _use_session(self, session: "Dict[str, Any]") -> None:
        """Update the configuration with values from the session.

        Check all fields (non-aliased and aliased) in `CeleryConfig` if they exist in
        the session. Override the given field values in the current strategy-specific
        configuration (the `CeleryConfig` instance) with the values found in the
        session.

        For every field the plain field name is checked first and the alias last.
        If the session holds both (e.g., `name` and `task_name`), the aliased key
        takes precedence.

        Parameters:
            session: The current OTE session.

        """
        configuration = self.transformation_config.configuration

        for field_name, field in CeleryConfig.__fields__.items():
            # Resolve each session key straight to its field name. A lookup table
            # keyed only by alias raises `KeyError` for a plain field name such as
            # `name`. `dict.fromkeys` drops the duplicate when a field has no
            # separate alias, while keeping the field-name-then-alias order.
            for session_key in dict.fromkeys((field_name, field.alias)):
                if session_key in session:
                    setattr(configuration, field_name, session[session_key])

get(session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
85
86
87
88
89
90
91
92
93
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
    """Submit the configured task through the Celery app and return its task ID.

    A non-empty `session` is first applied to the configuration via
    `_use_session()`. The configuration is then unpacked as keyword arguments to
    `CELERY_APP.send_task()`.
    """
    if session:
        self._use_session(session)

    task_kwargs = self.transformation_config.configuration
    submitted: "Union[AsyncResult, Any]" = CELERY_APP.send_task(**task_kwargs)
    return SessionUpdateCelery(celery_task_id=submitted.task_id)

initialize(session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
95
96
97
98
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    # pylint: disable=unused-argument
    """Initialize a job; the session is not used and an empty update is returned."""
    empty_update = SessionUpdate()
    return empty_update

status(task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
100
101
102
103
def status(self, task_id: str) -> TransformationStatus:
    """Look up the Celery state of the task `task_id` and return it with the ID."""
    task_result = AsyncResult(id=task_id, app=CELERY_APP)
    current_state = task_result.state
    return TransformationStatus(id=task_id, status=current_state)

CeleryStrategyConfig

Bases: TransformationConfig

Celery strategy-specific configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
58
59
60
61
62
63
64
65
66
67
68
69
70
class CeleryStrategyConfig(TransformationConfig):
    """Celery strategy-specific configuration.

    Specializes `TransformationConfig` for the `"celery/remote"` transformation
    strategy: `transformationType` defaults to that value and is declared with
    `const=True`, and `configuration` is typed as a `CeleryConfig`.
    """

    # The field description is reused from the parent model's
    # `transformationType` field, so it is not duplicated here.
    transformationType: str = Field(
        "celery/remote",
        const=True,
        description=TransformationConfig.__fields__[
            "transformationType"
        ].field_info.description,
    )
    # Celery-specific settings; no default is given (`...`).
    configuration: CeleryConfig = Field(
        ..., description="Celery transformation strategy-specific configuration."
    )

SessionUpdateCelery

Bases: SessionUpdate

Class for returning values from a Celery task.

Source code in oteapi/strategies/transformation/celery_remote.py
52
53
54
55
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from a Celery task.

    Extends `SessionUpdate` with the identifier of a Celery task.
    """

    # Identifier of the Celery task; no default is given (`...`).
    celery_task_id: str = Field(..., description="A Celery task identifier.")