celery_remote¶
Transformation Plugin that uses the Celery framework to call remote workers.
CeleryConfig (BaseModel)
pydantic-model
¶
Celery configuration.
Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
"""Celery configuration."""
taskName: str = Field(..., description="A task name.")
args: List[Any] = Field(..., description="List of arguments for the task.")
CeleryRemoteStrategy
dataclass
¶
Submit job to remote Celery runner.
Registers strategies:
("transformation_type", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
@StrategyFactory.register(("transformation_type", "celery/remote"))
class CeleryRemoteStrategy:
"""Submit job to remote Celery runner.
**Registers strategies**:
- `("transformation_type", "celery/remote")`
"""
transformation_config: "TransformationConfig"
def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
"""Run a job, return a job ID."""
config = self.transformation_config.configuration
celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
return {"result": result.task_id}
def initialize(
self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
"""Initialize a job."""
return {}
def status(self, task_id: str) -> TransformationStatus:
"""Get job status."""
result = AsyncResult(id=task_id, app=app)
return TransformationStatus(id=task_id, status=result.state)
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
"""Get transformation."""
# TODO: update and return global state # pylint: disable=fixme
return {}
get(self, session=None)
¶
Get transformation.
Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
"""Get transformation."""
# TODO: update and return global state # pylint: disable=fixme
return {}
initialize(self, session=None)
¶
Initialize a job.
Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(
self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
"""Initialize a job."""
return {}
run(self, session=None)
¶
Run a job, return a job ID.
Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
"""Run a job, return a job ID."""
config = self.transformation_config.configuration
celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
return {"result": result.task_id}
status(self, task_id)
¶
Get job status.
Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
"""Get job status."""
result = AsyncResult(id=task_id, app=app)
return TransformationStatus(id=task_id, status=result.state)