Skip to content

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig (BaseModel) pydantic-model

Celery configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Celery configuration.

    Holds the two values the remote strategy hands to Celery when it submits
    a task: the task's name and its positional arguments.
    """

    # Name under which the task is sent to the Celery app.
    task_name: str = Field(
        ...,
        description="A task name.",
    )
    # Positional arguments passed along with the task.
    args: list = Field(
        ...,
        description="List of arguments for the task.",
    )

args: list pydantic-field required

List of arguments for the task.

task_name: str pydantic-field required

A task name.

CeleryRemoteStrategy dataclass

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
        """Run a job, return a job ID.

        The task name and positional arguments are read from
        `transformation_config.configuration`; `session` is forwarded to the
        task as its keyword arguments.

        Parameters:
            session: Optional session dict, passed to the task as `kwargs`.

        Returns:
            The ID of the submitted task together with its current state.

        """
        config = self.transformation_config.configuration
        # Both `CeleryConfig` fields are required, so a missing configuration
        # is expected to fail model validation here, before anything is sent.
        celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
        result: "Union[AsyncResult, Any]" = app.send_task(
            celery_config.task_name, celery_config.args, kwargs=session
        )
        # Read `state` (not its deprecated alias `status`) so that `run()` and
        # `status()` report the task state through the same attribute.
        task_state = AsyncResult(id=result.task_id, app=app).state
        return TransformationStatus(id=result.task_id, status=task_state)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize a job.

        Parameters:
            session: Optional session dict; not used by this strategy.

        Returns:
            An empty session update.

        """
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status.

        Parameters:
            task_id: ID of a previously submitted task.

        Returns:
            The given task ID together with the task's current state.

        """
        result = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=result.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
        """Get transformation.

        Parameters:
            session: Optional session dict; not used yet.

        Returns:
            A session update whose `data` is currently always an empty dict.

        """
        # TODO: update and return global state  # pylint: disable=fixme
        return SessionUpdateCelery(data={})

get(self, session=None)

Get transformation.

Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
    """Return the transformation result as a session update.

    The `session` argument is accepted but not read; the returned update
    currently always carries an empty `data` dict.
    """
    # TODO: update and return global state  # pylint: disable=fixme
    placeholder = SessionUpdateCelery(data={})
    return placeholder

initialize(self, session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Prepare the job; this strategy has nothing to set up.

    The `session` argument is accepted but not read, and an empty
    `SessionUpdate` is returned.
    """
    no_changes = SessionUpdate()
    return no_changes

run(self, session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
    """Run a job, return a job ID.

    The task name and positional arguments are read from
    `transformation_config.configuration`; `session` is forwarded to the
    task as its keyword arguments.

    Parameters:
        session: Optional session dict, passed to the task as `kwargs`.

    Returns:
        The ID of the submitted task together with its current state.

    """
    config = self.transformation_config.configuration
    # Both `CeleryConfig` fields are required, so a missing configuration
    # is expected to fail model validation here, before anything is sent.
    celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
    result: "Union[AsyncResult, Any]" = app.send_task(
        celery_config.task_name, celery_config.args, kwargs=session
    )
    # Read `state` (not its deprecated alias `status`) so that `run()` and
    # `status()` report the task state through the same attribute.
    task_state = AsyncResult(id=result.task_id, app=app).state
    return TransformationStatus(id=result.task_id, status=task_state)

status(self, task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Look up the current state of a previously submitted task.

    Builds an `AsyncResult` handle for `task_id` on the module's Celery app
    and reports its `state` alongside the same ID.
    """
    current_state = AsyncResult(id=task_id, app=app).state
    return TransformationStatus(id=task_id, status=current_state)

SessionUpdateCelery (SessionUpdate) pydantic-model

Class for returning values from the Celery remote transformation strategy.

Source code in oteapi/strategies/transformation/celery_remote.py
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from the Celery remote strategy."""

    # NOTE(review): the description below speaks of column-name/column-value
    # pairs, which looks carried over from a tabular parser; confirm what the
    # Celery strategy is meant to put in `data`.
    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )

data: Dict[str, list] pydantic-field required

A dict with column-name/column-value pairs. The values are lists.

Back to top