Skip to content

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CELERY_APP = Celery(broker=f'redis://{REDIS_HOST}:{REDIS_PORT}', backend=f'redis://{REDIS_HOST}:{REDIS_PORT}') module-attribute

REDIS_HOST = os.getenv('OTEAPI_REDIS_HOST', 'redis') module-attribute

REDIS_PORT = int(os.getenv('OTEAPI_REDIS_PORT', '6379')) module-attribute

CeleryConfig

Bases: AttrDict

Celery configuration.

All fields here (including those added from the session through the get() method, as well as those added "anonymously") will be used as keyword arguments to the send_task() method for the Celery App.

Note

The name field uses the alias task_name so that it is preferentially populated from task_name arguments, since task_name is the "original" field name. In other words, the alias exists for backwards compatibility.

Special pydantic configuration settings:

  • populate_by_name: Allow populating CeleryConfig.name using name as well as task_name.
Source code in oteapi/strategies/transformation/celery_remote.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class CeleryConfig(AttrDict):
    """Keyword arguments forwarded to the Celery app's `send_task()` method.

    Every field on this model is used as a keyword argument to `send_task()`.
    That covers the fields declared here, fields added from the session through
    the `get()` method, and fields added "anonymously".

    Note:
        The `name` field carries the alias `task_name`, which is the "original"
        field name. The alias is kept for backwards compatibility, so that
        `task_name` arguments still populate the field.

    Special pydantic configuration settings:

    - **`populate_by_name`**
      Allow populating CeleryConfig.name using `name` as well as `task_name`.

    """

    # Accept both the field name (`name`) and its alias (`task_name`) as input.
    model_config = ConfigDict(populate_by_name=True)

    name: str = Field(
        ...,
        alias="task_name",
        description="A task name.",
    )
    args: list = Field(
        ...,
        description="List of arguments for the task.",
    )

args: list = Field(..., description='List of arguments for the task.') class-attribute instance-attribute

model_config = ConfigDict(populate_by_name=True) class-attribute instance-attribute

name: str = Field(..., description='A task name.', alias='task_name') class-attribute instance-attribute

CeleryContent

Bases: AttrDict

Class for returning values from a Celery task.

Source code in oteapi/strategies/transformation/celery_remote.py
60
61
62
63
class CeleryContent(AttrDict):
    """Model holding the value returned after a Celery task is submitted."""

    celery_task_id: str = Field(
        ...,
        description="A Celery task identifier.",
    )

celery_task_id: str = Field(..., description='A Celery task identifier.') class-attribute instance-attribute

CeleryRemoteStrategy

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass
class CeleryRemoteStrategy:
    """Transformation strategy that hands a job to a remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: CeleryStrategyConfig

    def get(self) -> CeleryContent:
        """Send the configured task to the Celery app and return its task id."""
        # The dumped configuration is passed verbatim as `send_task()` kwargs.
        send_kwargs = self.transformation_config.configuration.model_dump()
        submitted: Union[AsyncResult, Any] = CELERY_APP.send_task(**send_kwargs)
        return CeleryContent(celery_task_id=submitted.task_id)

    def initialize(self) -> AttrDict:
        """Return an empty `AttrDict`; no initialization work is performed."""
        empty_content = AttrDict()
        return empty_content

    def status(self, task_id: str) -> TransformationStatus:
        """Report the state of the task identified by `task_id`."""
        task_result = AsyncResult(id=task_id, app=CELERY_APP)
        current_state = task_result.state
        return TransformationStatus(id=task_id, status=current_state)

transformation_config: CeleryStrategyConfig instance-attribute

get()

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
90
91
92
93
94
95
96
def get(self) -> CeleryContent:
    """Send the configured task to the Celery app and return its task id."""
    # The dumped configuration is passed verbatim as `send_task()` kwargs.
    send_kwargs = self.transformation_config.configuration.model_dump()
    submitted: Union[AsyncResult, Any] = CELERY_APP.send_task(**send_kwargs)
    return CeleryContent(celery_task_id=submitted.task_id)

initialize()

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
 98
 99
100
def initialize(self) -> AttrDict:
    """Return an empty `AttrDict`; no initialization work is performed."""
    empty_content = AttrDict()
    return empty_content

status(task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
102
103
104
105
def status(self, task_id: str) -> TransformationStatus:
    """Report the state of the task identified by `task_id`."""
    task_result = AsyncResult(id=task_id, app=CELERY_APP)
    current_state = task_result.state
    return TransformationStatus(id=task_id, status=current_state)

CeleryStrategyConfig

Bases: TransformationConfig

Celery strategy-specific configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
66
67
68
69
70
71
72
73
74
75
class CeleryStrategyConfig(TransformationConfig):
    """Transformation configuration specific to the `celery/remote` strategy."""

    # The description is reused from the parent model's field of the same name.
    transformationType: Literal["celery/remote"] = Field(
        default="celery/remote",
        description=TransformationConfig.model_fields["transformationType"].description,
    )
    configuration: CeleryConfig = Field(
        ...,
        description="Celery transformation strategy-specific configuration.",
    )

configuration: CeleryConfig = Field(..., description='Celery transformation strategy-specific configuration.') class-attribute instance-attribute

transformationType: Literal['celery/remote'] = Field('celery/remote', description=TransformationConfig.model_fields['transformationType'].description) class-attribute instance-attribute