celery_remote¶
Transformation Plugin that uses the Celery framework to call remote workers.
        
CeleryConfig            (BaseModel)
        
  
      pydantic-model
  
¶
    Celery configuration.
Source code in oteapi/strategies/transformation/celery_remote.py
          class CeleryConfig(BaseModel):
    """Celery configuration."""
    taskName: str = Field(..., description="A task name.")
    args: List[Any] = Field(..., description="List of arguments for the task.")
        
CeleryRemoteStrategy        
  
      dataclass
  
¶
    Submit job to remote Celery runner.
Registers strategies:
("transformation_type", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
          @dataclass
@StrategyFactory.register(("transformation_type", "celery/remote"))
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.
    **Registers strategies**:
    - `("transformation_type", "celery/remote")`
    """
    transformation_config: "TransformationConfig"
    def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Run a job, return a job ID."""
        config = self.transformation_config.configuration
        celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
        result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
        return {"result": result.task_id}
    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize a job."""
        return {}
    def status(self, task_id: str) -> TransformationStatus:
        """Get job status."""
        result = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=result.state)
    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Get transformation."""
        # TODO: update and return global state  # pylint: disable=fixme
        return {}
get(self, session=None)
¶
    Get transformation.
Source code in oteapi/strategies/transformation/celery_remote.py
          def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Get transformation."""
    # TODO: update and return global state  # pylint: disable=fixme
    return {}
initialize(self, session=None)
¶
    Initialize a job.
Source code in oteapi/strategies/transformation/celery_remote.py
          def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize a job."""
    return {}
run(self, session=None)
¶
    Run a job, return a job ID.
Source code in oteapi/strategies/transformation/celery_remote.py
          def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Run a job, return a job ID."""
    config = self.transformation_config.configuration
    celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
    result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
    return {"result": result.task_id}
status(self, task_id)
¶
    Get job status.
Source code in oteapi/strategies/transformation/celery_remote.py
          def status(self, task_id: str) -> TransformationStatus:
    """Get job status."""
    result = AsyncResult(id=task_id, app=app)
    return TransformationStatus(id=task_id, status=result.state)