Skip to content

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig

Bases: AttrDict

Celery configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
25
26
27
28
29
class CeleryConfig(AttrDict):
    """Celery configuration.

    Describes the Celery task to submit: the task's name and the list of
    arguments handed to it. Both fields are declared with `...` as their
    default, i.e. no default value is provided for either of them.
    """

    # Name of the task; passed as `name` when the task is sent.
    task_name: str = Field(..., description="A task name.")
    # Arguments for the task; passed as `args` when the task is sent.
    args: list = Field(..., description="List of arguments for the task.")

CeleryRemoteStrategy

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: CeleryStrategyConfig

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
        """Run a job, return a job ID.

        If a non-empty session is given, any `CeleryConfig` fields found in it
        first override the stored configuration; the remaining session entries
        are forwarded to the task as keyword arguments. Without a session (or
        with an empty one) the task is sent with no keyword arguments.

        Parameters:
            session: Optional session data.

        Returns:
            The status of the submitted task, as produced by `status()` for
            the task's ID.

        """
        # Start from an empty mapping so the name is always bound, even when
        # no session (or an empty one) is supplied.
        celery_kwargs: "Dict[str, Any]" = {}
        if session:
            self._use_session(session)
            # Configuration fields are consumed by `_use_session()`; only the
            # remaining entries are passed on to the task itself.
            celery_kwargs = {
                key: value
                for key, value in session.items()
                if key not in CeleryConfig.__fields__
            }

        result: "Union[AsyncResult, Any]" = CELERY.send_task(
            name=self.transformation_config.configuration.task_name,
            args=self.transformation_config.configuration.args,
            kwargs=celery_kwargs,
        )
        return self.status(result.task_id)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize a job.

        Parameters:
            session: Optional session data. Not used by this method.

        Returns:
            An empty `SessionUpdate`.

        """
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status.

        Parameters:
            task_id: ID of the task to look up.

        Returns:
            A `TransformationStatus` carrying the task ID and the state
            reported by the task's `AsyncResult`.

        """
        result = AsyncResult(id=task_id, app=CELERY)
        return TransformationStatus(id=task_id, status=result.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
        """Get transformation.

        Parameters:
            session: Optional session data. Not used by this method.

        Returns:
            A `SessionUpdateCelery` whose `data` is currently always empty.

        """
        # TODO: update and return global state  # pylint: disable=fixme
        return SessionUpdateCelery(data={})

    def _use_session(self, session: "Dict[str, Any]") -> None:
        """Update the configuration with values from the session.

        Every `CeleryConfig` field present in `session` replaces the
        corresponding attribute on `self.transformation_config.configuration`.

        Parameters:
            session: Session data to read configuration overrides from.

        """
        for field in CeleryConfig.__fields__:
            if field in session:
                setattr(
                    self.transformation_config.configuration,
                    field,
                    session[field],
                )

get(session=None)

Get transformation.

Source code in oteapi/strategies/transformation/celery_remote.py
92
93
94
95
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
    """Return the transformation result.

    The `session` argument is accepted but not used; the returned
    `SessionUpdateCelery` currently always carries an empty `data` mapping.
    """
    # TODO: update and return global state  # pylint: disable=fixme
    empty_data = {}
    return SessionUpdateCelery(data=empty_data)

initialize(session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
83
84
85
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize a job.

    The `session` argument is accepted but not used; an empty
    `SessionUpdate` is returned.
    """
    session_update = SessionUpdate()
    return session_update

run(session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
    """Run a job, return a job ID.

    If a non-empty session is given, any `CeleryConfig` fields found in it
    first override the stored configuration; the remaining session entries
    are forwarded to the task as keyword arguments. Without a session (or
    with an empty one) the task is sent with no keyword arguments.

    Parameters:
        session: Optional session data.

    Returns:
        The status of the submitted task, as produced by `status()` for the
        task's ID.

    """
    # Start from an empty mapping so the name is always bound, even when no
    # session (or an empty one) is supplied.
    celery_kwargs: "Dict[str, Any]" = {}
    if session:
        self._use_session(session)
        # Configuration fields are consumed by `_use_session()`; only the
        # remaining entries are passed on to the task itself.
        celery_kwargs = {
            key: value
            for key, value in session.items()
            if key not in CeleryConfig.__fields__
        }

    result: "Union[AsyncResult, Any]" = CELERY.send_task(
        name=self.transformation_config.configuration.task_name,
        args=self.transformation_config.configuration.args,
        kwargs=celery_kwargs,
    )
    return self.status(result.task_id)

status(task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
87
88
89
90
def status(self, task_id: str) -> TransformationStatus:
    """Report the current state of the task identified by `task_id`.

    The state is read from an `AsyncResult` bound to the `CELERY` app and
    returned together with the task ID in a `TransformationStatus`.
    """
    async_result = AsyncResult(id=task_id, app=CELERY)
    current_state = async_result.state
    return TransformationStatus(id=task_id, status=current_state)

CeleryStrategyConfig

Bases: TransformationConfig

Celery strategy-specific configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
41
42
43
44
45
46
47
48
49
50
51
52
53
class CeleryStrategyConfig(TransformationConfig):
    """Celery strategy-specific configuration.

    Extends `TransformationConfig` with a `transformationType` defaulting to
    `"celery/remote"` and a `configuration` field typed as `CeleryConfig`.
    """

    # Declared with `const=True` and the default "celery/remote". The
    # description text is reused from the `transformationType` field of the
    # parent `TransformationConfig` rather than duplicated here.
    transformationType: str = Field(
        "celery/remote",
        const=True,
        description=TransformationConfig.__fields__[
            "transformationType"
        ].field_info.description,
    )
    # Declared with `...` as default, i.e. no default value is provided.
    configuration: CeleryConfig = Field(
        ..., description="Celery transformation strategy-specific configuration."
    )

SessionUpdateCelery

Bases: SessionUpdate

Class for returning values from the Celery remote transformation strategy.

Source code in oteapi/strategies/transformation/celery_remote.py
32
33
34
35
36
37
38
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from the Celery remote strategy.

    This is the return type of `CeleryRemoteStrategy.get()`.
    """

    # Mapping of string keys to lists; declared with `...` as default, i.e. no
    # default value is provided.
    # NOTE(review): the description text below talks about column names and
    # values, which looks carried over from a tabular parser - confirm that it
    # describes what the Celery strategy actually returns.
    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )