Skip to content

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig (BaseModel) pydantic-model

Celery configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Celery configuration."""

    taskName: str = Field(..., description="A task name.")
    args: List[Any] = Field(..., description="List of arguments for the task.")

args: List[Any] pydantic-field required

List of arguments for the task.

taskName: str pydantic-field required

A task name.

CeleryRemoteStrategy dataclass

Submit job to remote Celery runner.

Registers strategies:

  • ("transformation_type", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
@StrategyFactory.register(("transformation_type", "celery/remote"))
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformation_type", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Run a job, return a job ID."""
        config = self.transformation_config.configuration
        celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
        result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
        return {"result": result.task_id}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize a job."""
        return {}

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status."""
        result = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=result.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Get transformation."""
        # TODO: update and return global state  # pylint: disable=fixme
        return {}

get(self, session=None)

Get transformation.

Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Get transformation."""
    # TODO: update and return global state  # pylint: disable=fixme
    return {}

initialize(self, session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize a job."""
    return {}

run(self, session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Run a job, return a job ID."""
    config = self.transformation_config.configuration
    celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
    result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
    return {"result": result.task_id}

status(self, task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Get job status."""
    result = AsyncResult(id=task_id, app=app)
    return TransformationStatus(id=task_id, status=result.state)
Back to top