Skip to content

Launcher

executor.engine.launcher.core.launcher(func=None, engine=None, async_mode=False, job_type='process', name=None, description=None, tags=None, job_attrs=None)

Create a launcher for a function.

Parameters:

Name Type Description Default
func Optional[Union[Callable, Cmd2Func]]

The function to be launched. If the function is an instance of Cmd2Func, the launcher runs in subprocess mode and launches a SubprocessJob on each submit.

None
engine Optional[Engine]

The engine to use. If not specified, the default engine will be used.

None
async_mode bool

If True, the launcher will be AsyncLauncher.

False
job_type JOB_TYPES

The job type to use. Default is 'process'.

'process'
name Optional[str]

The name of the launcher.

None
description Optional[str]

The description of the launcher.

None
tags Optional[List[str]]

The tags of the launcher.

None
job_attrs Optional[dict]

The attributes for creating the job.

None
Source code in executor/engine/launcher/core.py
def launcher(
        func: T.Optional[T.Union[T.Callable, Cmd2Func]] = None,
        engine: T.Optional['Engine'] = None,
        async_mode: bool = False,
        job_type: JOB_TYPES = 'process',
        name: T.Optional[str] = None,
        description: T.Optional[str] = None,
        tags: T.Optional[T.List[str]] = None,
        job_attrs: T.Optional[dict] = None):
    """Wrap a function in a launcher; usable directly or as a decorator.

    When called without `func`, a partial of this function with all the
    other options bound is returned, so `@launcher(job_type=...)` works
    as a decorator factory.

    Args:
        func: The function to be launched. If it is an instance of
            [Cmd2Func](https://github.com/Nanguage/cmd2func),
            the launcher runs in subprocess mode and launches a
            `SubprocessJob` on each submit.
        engine: The engine to use. If not specified, the default engine
            will be used.
        async_mode: If True, an `AsyncLauncher` is created; otherwise a
            `SyncLauncher`.
        job_type: The job type to use. Default is 'process'.
        name: The name of the launcher.
        description: The description of the launcher.
        tags: The tags of the launcher.
        job_attrs: The attributes for creating the job.

    Returns:
        A `SyncLauncher` or `AsyncLauncher` wrapping `func`, or, when
        `func` is None, a `functools.partial` of `launcher` awaiting it.
    """
    if func is not None:
        launcher_cls: T.Union[
            T.Type[AsyncLauncher], T.Type[SyncLauncher]
        ] = AsyncLauncher if async_mode else SyncLauncher
        return launcher_cls(
            func, engine, job_type,
            name, description, tags, job_attrs,
        )

    # Decorator-factory form: remember the options and wait for the
    # function to be supplied by the next call.
    bound_options = dict(
        engine=engine, async_mode=async_mode,
        job_type=job_type, name=name,
        description=description, tags=tags,
        job_attrs=job_attrs,
    )
    return functools.partial(launcher, **bound_options)

executor.engine.launcher.core.LauncherBase

Bases: object

Source code in executor/engine/launcher/core.py
class LauncherBase(object):
    """Shared implementation for the synchronous and asynchronous launchers.

    A launcher wraps a target callable (or a `Cmd2Func`) and builds a job
    of the configured type for each invocation. Subclasses provide
    `submit` and `__call__`.
    """

    def __init__(
            self, target_func: T.Union[T.Callable, Cmd2Func],
            engine: T.Optional['Engine'] = None,
            job_type: JOB_TYPES = 'process',
            name: T.Optional[str] = None,
            description: T.Optional[str] = None,
            tags: T.Optional[T.List[str]] = None,
            job_attrs: T.Optional[dict] = None,):
        """Initialise the launcher.

        Args:
            target_func: The callable (or `Cmd2Func`) that jobs will run.
            engine: The engine to submit to. If None, the default engine
                is looked up lazily on first access of `engine`.
            job_type: Key into `job_type_classes`. For a `Cmd2Func`
                target, anything other than 'webapp' is replaced by
                'subprocess'.
            name: Launcher name; falls back to
                `get_callable_name(target_func)`.
            description: Launcher description; falls back to the
                target's `__doc__`.
            tags: Tags of the launcher; an empty list if not given.
            job_attrs: Extra keyword attributes used when creating jobs.
                The dict is copied; the caller's object is not modified.
        """
        self._engine = engine
        self.target_func = target_func
        self.__signature__ = inspect.signature(target_func)
        self.job_type = job_type
        if isinstance(target_func, Cmd2Func):
            # Command targets can only run as subprocess or webapp jobs.
            if self.job_type != 'webapp':
                self.job_type = 'subprocess'
        self.desc = parse_func(target_func)
        self.name = name or get_callable_name(target_func)
        self.description = description or self.target_func.__doc__
        # Makes the launcher look like the wrapped function
        # (copies metadata such as __name__ / __doc__ when present).
        functools.update_wrapper(self, target_func)
        self.tags = tags or []
        # Take a copy before injecting 'name': the caller's dict may be
        # shared by several launchers (or handed over by to_async /
        # to_sync), and writing into it in place would leak one
        # launcher's name into the others.
        self.job_attrs = dict(job_attrs) if job_attrs else {}
        self.job_attrs['name'] = self.name

    @property
    def engine(self) -> Engine:
        """Get the engine of the launcher.

        Falls back to (and caches) `get_default_engine()` when no engine
        has been set.
        """
        if self._engine is None:
            self._engine = get_default_engine()
        return self._engine

    @engine.setter
    def engine(self, engine: Engine):
        """Set the engine of the launcher."""
        self._engine = engine

    def create_job(self, args: tuple, kwargs: dict, **attrs) -> 'Job':
        """Create a job from the launcher.

        Args:
            args: Positional arguments for the target.
            kwargs: Keyword arguments for the target.
            **attrs: Per-call job attributes; they override the
                launcher-level `job_attrs`.

        Returns:
            A new, not yet submitted, job instance.
        """
        # Work on a copy so per-call attrs never leak into the launcher.
        job_attrs = copy(self.job_attrs)
        job_attrs.update(attrs)
        job_class = job_type_classes[self.job_type]
        if isinstance(self.target_func, Cmd2Func):
            assert job_class in (SubprocessJob, WebappJob)
            cmd_or_gen = self.target_func.get_cmd_str(*args, **kwargs)
            if isinstance(cmd_or_gen, str):
                cmd = cmd_or_gen
            else:
                # Not a plain string: take the first item it yields.
                cmd = next(cmd_or_gen)
            job = job_class(cmd, **job_attrs)  # type: ignore
        else:
            if job_class is WebappJob:
                # WebappJob receives the call arguments unpacked, with
                # the job attributes merged into the keyword arguments.
                kwargs = copy(kwargs)
                kwargs.update(job_attrs)
                job = job_class(self.target_func, *args, **kwargs)
            else:
                job = job_class(
                    self.target_func, args, kwargs,
                    **job_attrs
                )
        return job

    @staticmethod
    def _fetch_result(job: 'Job') -> T.Any:
        """Return the result of a finished job or raise its failure.

        Raises:
            Exception: Whatever `job.exception()` returns, when the job
                status is "failed".
            RuntimeError: When the job status is "cancelled".
        """
        if job.status == "failed":
            raise job.exception()
        elif job.status == "cancelled":
            raise RuntimeError("Job cancelled")
        else:
            return job.result()

    def __call__(
            self, *args: T.Any, **kwargs: T.Any) -> T.Any:  # pragma: no cover
        """Run the target through the engine; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement __call__")

engine property writable

Get the engine of the launcher.

create_job(args, kwargs, **attrs)

Create a job from the launcher.

Source code in executor/engine/launcher/core.py
def create_job(self, args: tuple, kwargs: dict, **attrs) -> 'Job':
    """Build a job for one invocation of the launcher's target.

    Args:
        args: Positional arguments for the target.
        kwargs: Keyword arguments for the target.
        **attrs: Per-call job attributes, overriding the launcher's
            own `job_attrs`.

    Returns:
        A new job instance of the class registered for `self.job_type`.
    """
    # Launcher-level attributes first, per-call overrides last.
    job_attrs = {**self.job_attrs, **attrs}
    job_class = job_type_classes[self.job_type]

    if isinstance(self.target_func, Cmd2Func):
        assert job_class in (SubprocessJob, WebappJob)
        cmd_or_gen = self.target_func.get_cmd_str(*args, **kwargs)
        # Either the command string itself or something to pull it from.
        cmd = (
            cmd_or_gen if isinstance(cmd_or_gen, str)
            else next(cmd_or_gen)
        )
        return job_class(cmd, **job_attrs)  # type: ignore

    if job_class is WebappJob:
        # Call arguments are passed unpacked, with the job attributes
        # merged into (and taking precedence over) the keyword arguments.
        merged_kwargs = {**kwargs, **job_attrs}
        return job_class(self.target_func, *args, **merged_kwargs)

    return job_class(
        self.target_func, args, kwargs,
        **job_attrs
    )

executor.engine.launcher.core.SyncLauncher

Bases: LauncherBase

Source code in executor/engine/launcher/core.py
class SyncLauncher(LauncherBase):
    """Launcher whose call blocks until the submitted job has finished."""

    @property
    def async_mode(self):
        """Tell whether the launcher is asynchronous (always False)."""
        return False

    def submit(self, *args, **kwargs) -> Job:
        """Create a job for the given arguments and submit it.

        Returns:
            The submitted job; this method does not wait for it.
        """
        new_job = self.create_job(args, kwargs)
        self.engine.submit(new_job)
        return new_job

    def __call__(self, *args, **kwargs) -> T.Any:
        """Submit a job, wait for it to finish and return its result."""
        finished = self.submit(*args, **kwargs)
        self.engine.wait_job(finished)
        return self._fetch_result(finished)

    def to_async(self) -> "AsyncLauncher":
        """Build an `AsyncLauncher` with the same configuration.

        The raw `_engine` is forwarded, so an unset engine stays unset.
        """
        return AsyncLauncher(
            self.target_func,
            engine=self._engine,
            job_type=self.job_type,
            name=self.name,
            description=self.description,
            tags=self.tags,
            job_attrs=self.job_attrs,
        )

async_mode property

Check if the launcher is in async mode.

__call__(*args, **kwargs)

Submit a job to the engine and wait for the result.

Source code in executor/engine/launcher/core.py
def __call__(self, *args, **kwargs) -> T.Any:
    """Run the target as a job and block until its outcome is known.

    Returns:
        Whatever `self._fetch_result` yields for the finished job.
    """
    finished = self.submit(*args, **kwargs)
    # Block on the engine until the job is done before reading it.
    self.engine.wait_job(finished)
    return self._fetch_result(finished)

submit(*args, **kwargs)

Submit a job to the engine.

Source code in executor/engine/launcher/core.py
def submit(self, *args, **kwargs) -> Job:
    """Create a job for the given arguments and hand it to the engine.

    Returns:
        The submitted job; this method does not wait for it.
    """
    new_job = self.create_job(args, kwargs)
    self.engine.submit(new_job)
    return new_job

to_async()

Convert the launcher to async mode.

Source code in executor/engine/launcher/core.py
def to_async(self) -> "AsyncLauncher":
    """Build an `AsyncLauncher` with the same configuration.

    The raw `_engine` is forwarded, so an unset engine stays unset.
    """
    return AsyncLauncher(
        self.target_func,
        engine=self._engine,
        job_type=self.job_type,
        name=self.name,
        description=self.description,
        tags=self.tags,
        job_attrs=self.job_attrs,
    )

executor.engine.launcher.core.AsyncLauncher

Bases: LauncherBase

Source code in executor/engine/launcher/core.py
class AsyncLauncher(LauncherBase):
    """Launcher whose `submit` and call are coroutines."""

    @property
    def async_mode(self):
        """Tell whether the launcher is asynchronous (always True)."""
        return True

    async def submit(self, *args, **kwargs):
        """Create a job for the given arguments and submit it.

        Returns:
            The submitted job; its completion is not awaited here.
        """
        pending = self.create_job(args, kwargs)
        await self.engine.submit_async(pending)
        return pending

    async def __call__(self, *args, **kwargs) -> T.Any:
        """Submit a job, await `job.join()` and return its result."""
        finished = await self.submit(*args, **kwargs)
        await finished.join()
        return self._fetch_result(finished)

    def to_sync(self) -> "SyncLauncher":
        """Build a `SyncLauncher` with the same configuration.

        The raw `_engine` is forwarded, so an unset engine stays unset.
        """
        return SyncLauncher(
            self.target_func,
            engine=self._engine,
            job_type=self.job_type,
            name=self.name,
            description=self.description,
            tags=self.tags,
            job_attrs=self.job_attrs,
        )

async_mode property

Check if the launcher is in async mode.

__call__(*args, **kwargs) async

Submit a job to the engine and wait for the result.

Source code in executor/engine/launcher/core.py
async def __call__(self, *args, **kwargs) -> T.Any:
    """Run the target as a job and await its outcome.

    Returns:
        Whatever `self._fetch_result` yields for the joined job.
    """
    finished = await self.submit(*args, **kwargs)
    # Joining suspends this coroutine until the job is done.
    await finished.join()
    return self._fetch_result(finished)

submit(*args, **kwargs) async

Submit a job to the engine.

Source code in executor/engine/launcher/core.py
async def submit(self, *args, **kwargs):
    """Create a job for the given arguments and submit it to the engine.

    Returns:
        The submitted job; its completion is not awaited here.
    """
    pending = self.create_job(args, kwargs)
    await self.engine.submit_async(pending)
    return pending

to_sync()

Convert the launcher to sync mode.

Source code in executor/engine/launcher/core.py
def to_sync(self) -> "SyncLauncher":
    """Build a `SyncLauncher` with the same configuration.

    The raw `_engine` is forwarded, so an unset engine stays unset.
    """
    return SyncLauncher(
        self.target_func,
        engine=self._engine,
        job_type=self.job_type,
        name=self.name,
        description=self.description,
        tags=self.tags,
        job_attrs=self.job_attrs,
    )

executor.engine.launcher.core.get_default_engine()

Get the default engine.

Source code in executor/engine/launcher/core.py
def get_default_engine() -> Engine:
    """Return the module-wide default engine, creating it on first use."""
    global _engine
    current = _engine
    if current is None:
        # First request: build the engine and remember it for later calls.
        current = _engine = Engine()
    return current

executor.engine.launcher.core.set_default_engine(engine)

Set the default engine.

Source code in executor/engine/launcher/core.py
def set_default_engine(engine: Engine) -> None:
    """Set the default engine.

    Rebinds the module-level `_engine`, which `get_default_engine`
    returns on subsequent calls.

    Args:
        engine: The engine to store as the module-wide default.
    """
    global _engine
    _engine = engine