celery_remote¶
Transformation Plugin that uses the Celery framework to call remote workers.
CeleryConfig (BaseModel)
pydantic-model
¶
Celery configuration.
Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
"""Celery configuration."""
task_name: str = Field(..., description="A task name.")
args: list = Field(..., description="List of arguments for the task.")
CeleryRemoteStrategy
dataclass
¶
Submit job to remote Celery runner.
Registers strategies:
("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
class CeleryRemoteStrategy:
"""Submit job to remote Celery runner.
**Registers strategies**:
- `("transformationType", "celery/remote")`
"""
transformation_config: "TransformationConfig"
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
"""Run a job, return a job ID."""
config = self.transformation_config.configuration
celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
result: "Union[AsyncResult, Any]" = app.send_task(
celery_config.task_name, celery_config.args, kwargs=session
)
status = AsyncResult(id=result.task_id, app=app)
return TransformationStatus(id=result.task_id, status=status.status)
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
"""Initialize a job."""
return SessionUpdate()
def status(self, task_id: str) -> TransformationStatus:
"""Get job status."""
result = AsyncResult(id=task_id, app=app)
return TransformationStatus(id=task_id, status=result.state)
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
"""Get transformation."""
# TODO: update and return global state # pylint: disable=fixme
return SessionUpdateCelery(data={})
get(self, session=None)
¶
Get transformation.
Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
"""Get transformation."""
# TODO: update and return global state # pylint: disable=fixme
return SessionUpdateCelery(data={})
initialize(self, session=None)
¶
Initialize a job.
Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
"""Initialize a job."""
return SessionUpdate()
run(self, session=None)
¶
Run a job, return a job ID.
Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
"""Run a job, return a job ID."""
config = self.transformation_config.configuration
celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
result: "Union[AsyncResult, Any]" = app.send_task(
celery_config.task_name, celery_config.args, kwargs=session
)
status = AsyncResult(id=result.task_id, app=app)
return TransformationStatus(id=result.task_id, status=status.status)
status(self, task_id)
¶
Get job status.
Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
"""Get job status."""
result = AsyncResult(id=task_id, app=app)
return TransformationStatus(id=task_id, status=result.state)
SessionUpdateCelery (SessionUpdate)
pydantic-model
¶
Class for returning values from XLSXParse.
Source code in oteapi/strategies/transformation/celery_remote.py
class SessionUpdateCelery(SessionUpdate):
"""Class for returning values from XLSXParse."""
data: Dict[str, list] = Field(
...,
description="A dict with column-name/column-value pairs. The values are lists.",
)
data: Dict[str, list]
pydantic-field
required
¶
A dict with column-name/column-value pairs. The values are lists.