Skip to content

Extend job types

executor.engine.job.extend.SubprocessJob(cmd, record_cmd=True, base_class=ThreadJob, callback=None, error_callback=None, retries=0, retry_time_delta=0.0, name=None, condition=None, wait_time_delta=0.01, redirect_out_err=False, target_dir='$current_dir', popen_kwargs=None, **attrs)

Create a job that runs a subprocess.

Parameters:

Name Type Description Default
cmd str

The command to run.

required
record_cmd bool

Whether to record the command to a file.

True
base_class Type[Job]

The base class of the job.

ThreadJob
callback Optional[Callable[[Any], None]]

The callback function.

None
error_callback Optional[Callable[[Exception], None]]

The error callback function.

None
retries int

The number of retries.

0
retry_time_delta float

The time delta between retries.

0.0
name Optional[str]

The name of the job.

None
condition Optional[Condition]

The condition of the job.

None
wait_time_delta float

The time delta between each check.

0.01
redirect_out_err bool

Whether to redirect stdout and stderr to files.

False
target_dir str

The target directory in which to run the command. Use '$cache_dir' for the job's cache directory or '$current_dir' for the current working directory; any other value is treated as a directory path. Default is '$current_dir'.

'$current_dir'
popen_kwargs Optional[Dict[str, Any]]

The keyword arguments for subprocess.Popen.

None
**attrs

Other attributes of the job.

{}
Source code in executor/engine/job/extend/subprocess.py
def SubprocessJob(
    cmd: str, record_cmd: bool = True,
    base_class: T.Type[Job] = ThreadJob,
    callback: T.Optional[T.Callable[[T.Any], None]] = None,
    error_callback: T.Optional[T.Callable[[Exception], None]] = None,
    retries: int = 0,
    retry_time_delta: float = 0.0,
    name: T.Optional[str] = None,
    condition: T.Optional[Condition] = None,
    wait_time_delta: float = 0.01,
    redirect_out_err: bool = False,
    target_dir: str = "$current_dir",
    popen_kwargs: T.Optional[T.Dict[str, T.Any]] = None,
    **attrs
):
    """Create a job that runs a subprocess.

    A job class deriving from ``base_class`` is defined on the fly and a
    single instance of it is returned.

    Args:
        cmd: The command to run.
        record_cmd: Whether to record the command to the file
            ``command.sh`` in the job's cache directory before running.
        base_class: The base class of the job.
        callback: The callback function.
        error_callback: The error callback function.
        retries: The number of retries.
        retry_time_delta: The time delta between retries.
        name: The name of the job. When None, the first
            whitespace-separated token of ``cmd`` is used.
        condition: The condition of the job.
        wait_time_delta: The time delta between each check.
        redirect_out_err: Whether to redirect stdout and stderr to files.
            If a string is given, it is used as the path of a single file
            that receives both streams (its parent directory is created
            when missing); any other non-False value writes to
            ``stdout.txt`` and ``stderr.txt`` in the cache directory.
        target_dir: The directory in which to run the command.
            Use '$cache_dir' to represent the cache directory of the job.
            Use '$current_dir' to represent the current working directory.
            Any other value is treated as a directory path.
            Default is '$current_dir'.
        popen_kwargs: Extra keyword arguments forwarded to the runner's
            ``run`` call (documented as arguments for subprocess.Popen).
            The dict is copied, so the caller's object is never modified.
        **attrs: Other attributes of the job.

    Returns:
        An instance of the generated job class.
    """
    class _SubprocessJob(base_class):  # type: ignore
        cmd: str
        target_dir: str

        repr_attrs = [
            ('status', lambda self: self.status),
            ('id', lambda self: self.id),
            ('cmd', lambda self: self.cmd),
            ('target_dir', lambda self: self.target_dir),
            ('base_class', lambda _: base_class.__name__),
            ("condition", lambda self: self.condition),
            ("retry_remain", lambda self: self.retry_remain),
        ]

        def __init__(self) -> None:
            self.cmd = cmd
            self.record_cmd = record_cmd
            nonlocal name
            if name is None:
                # Default the job name to the executable part of the command.
                name = cmd.split()[0]
            self.target_dir = target_dir
            attrs.update({
                'cmd': cmd,
                'target_dir': target_dir,
            })
            super().__init__(
                # Placeholder callable; the real one is installed by
                # process_func().
                lambda x: x,
                callback=callback,
                error_callback=error_callback,
                retries=retries,
                retry_time_delta=retry_time_delta,
                name=name,
                condition=condition,
                wait_time_delta=wait_time_delta,
                redirect_out_err=redirect_out_err,
                **attrs
            )
            self.runner: T.Optional[ProcessRunner] = None

        def resolve_target_dir(self, target_dir: str) -> str:
            """Translate ``target_dir`` into an absolute POSIX-style path."""
            if target_dir == "$cache_dir":
                return self.cache_dir.resolve().as_posix()
            elif target_dir == "$current_dir":
                return Path.cwd().resolve().as_posix()
            else:
                return Path(target_dir).resolve().as_posix()

        def process_func(self):
            """Build the callable that runs the command and set ``self.func``."""
            cmd = copy.copy(self.cmd)
            record_cmd = copy.copy(self.record_cmd)
            target_dir = self.resolve_target_dir(self.target_dir)
            self.attrs.update({
                'target_dir': target_dir,
            })
            self.target_dir = target_dir

            cache_dir = self.cache_dir.resolve()
            path_sh = cache_dir / 'command.sh'

            def record_command():
                with open(path_sh, 'w') as f:
                    f.write(cmd + "\n")

            # Copy before adding 'cwd' so the dict supplied by the caller
            # is left untouched.
            pkwargs = dict(popen_kwargs or {})
            pkwargs['cwd'] = target_dir

            if self.redirect_out_err is not False:
                if isinstance(self.redirect_out_err, str):
                    path_stdout = Path(self.redirect_out_err)
                    if not path_stdout.parent.exists():
                        path_stdout.parent.mkdir(parents=True, exist_ok=True)
                    # Both streams go to the same user-specified file.
                    path_stdout = self.redirect_out_err
                    path_stderr = self.redirect_out_err
                else:
                    path_stdout = cache_dir / 'stdout.txt'
                    path_stderr = cache_dir / 'stderr.txt'

                def _run_cmd(runner: ProcessRunner):  # pragma: no cover
                    runner.run(**pkwargs)
                    fo = open(path_stdout, 'a')
                    try:
                        # Share one handle when both streams target the
                        # same file instead of opening it twice.
                        if path_stdout == path_stderr:
                            fe = fo
                        else:
                            fe = open(path_stderr, 'a')
                        try:
                            return runner.write_stream_until_stop(
                                fo, fe, flush_streams_each_time=True)
                        finally:
                            # Close the handles even if streaming fails.
                            if fe is not fo:
                                fe.close()
                    finally:
                        fo.close()
            else:
                def _run_cmd(runner: ProcessRunner):
                    runner.run(
                        capture_stdout=False,
                        capture_stderr=False,
                        **pkwargs
                    )
                    retcode = runner.proc.wait()
                    return retcode

            if base_class is ThreadJob:
                # The runner is created here and kept on the job so that
                # cancel() can terminate the child process.
                runner = ProcessRunner(cmd)
                self.runner = runner

                def run_cmd():
                    return _run_cmd(runner)
            else:
                # For other base classes the runner is created lazily,
                # inside the callable itself.
                def run_cmd():
                    runner = ProcessRunner(cmd)
                    return _run_cmd(runner)

            def func():
                if record_cmd:
                    record_command()
                retcode = run_cmd()
                # Only a positive return code is reported as a failure.
                if retcode > 0:
                    raise subp.SubprocessError(
                        f"Command '{cmd}' run failed, return code: {retcode}")
                return retcode
            self.func = func

        async def cancel(self):
            """Terminate the child process (if a runner is kept) and cancel."""
            if self.runner is not None:
                self.runner.proc.terminate()
            await super().cancel()

    return _SubprocessJob()

executor.engine.job.extend.WebappJob(web_launcher, ip='127.0.0.1', port=None, base_class=ProcessJob, check_times=6, check_delta=1.0, callback=None, error_callback=None, retries=0, retry_time_delta=0.0, name=None, condition=None, wait_time_delta=0.01, redirect_out_err=False, **attrs)

Create a job that runs a web app.

Parameters:

Name Type Description Default
web_launcher Union[LauncherFunc, str]

The function to launch the web app. The function should accept two arguments: ip and port.

required
ip str

The ip address of the web app.

'127.0.0.1'
port Optional[int]

The port of the web app. If None, will find a free port.

None
base_class Type[Job]

The base class of the job.

ProcessJob
check_times int

The number of times to check the web app.

6
check_delta float

The time delta between each check.

1.0
callback Optional[Callable[[Any], None]]

The callback function.

None
error_callback Optional[Callable[[Exception], None]]

The error callback function.

None
retries int

The number of retries.

0
retry_time_delta float

The time delta between retries.

0.0
name Optional[str]

The name of the job.

None
condition Optional[Condition]

The condition of the job.

None
wait_time_delta float

The time delta between each check.

0.01
redirect_out_err bool

Whether to redirect stdout and stderr to files.

False
**attrs

Other attributes of the job.

{}
Source code in executor/engine/job/extend/webapp.py
def WebappJob(
    web_launcher: T.Union[LauncherFunc, str],
    ip: str = "127.0.0.1", port: T.Optional[int] = None,
    base_class: T.Type[Job] = ProcessJob,
    check_times: int = 6,
    check_delta: float = 1.0,
    callback: T.Optional[T.Callable[[T.Any], None]] = None,
    error_callback: T.Optional[T.Callable[[Exception], None]] = None,
    retries: int = 0,
    retry_time_delta: float = 0.0,
    name: T.Optional[str] = None,
    condition: T.Optional[Condition] = None,
    wait_time_delta: float = 0.01,
    redirect_out_err: bool = False,
    **attrs
):
    """Create a job that runs a web app.

    A job class deriving from ``base_class`` is defined on the fly and a
    single instance of it is returned.

    Args:
        web_launcher: Either a callable invoked with two positional
            arguments (ip, port), or a command template string that
            contains both the ``{ip}`` and ``{port}`` placeholders.
        ip: The ip address of the web app. Only "127.0.0.1",
            "localhost" and "0.0.0.0" are accepted.
        port: The port of the web app. If None, a port is obtained
            from ``PortManager`` when the job consumes its resources.
        base_class: The base class of the job.
        check_times: How many times to check that the launched process
            listens on the address.
        check_delta: The time in seconds slept before each such check.
        callback: The callback function.
        error_callback: The error callback function.
        retries: The number of retries.
        retry_time_delta: The time delta between retries.
        name: The name of the job.
        condition: The condition of the job.
        wait_time_delta: The time delta between each check.
        redirect_out_err: Whether to redirect stdout and stderr to files.
        **attrs: Other attributes of the job.

    Returns:
        An instance of the generated job class.

    Raises:
        NotImplementedError: If ``ip`` is not one of the accepted
            local addresses.
        ValueError: If a string launcher lacks a placeholder.
        TypeError: If the launcher is neither a string nor callable.
    """
    class _WebappJob(base_class):  # type: ignore
        ip: str
        port: T.Optional[int]

        repr_attrs = [
            ('status', lambda self: self.status),
            ('id', lambda self: self.id),
            ('addr', lambda self: f'{self.ip}:{self.port}'),
            ('base_class', lambda _: base_class.__name__),
            ("condition", lambda self: self.condition),
            ("retry_remain", lambda self: self.retry_remain),
        ]

        def __init__(self) -> None:
            self.ip = ip
            local_addresses = ("127.0.0.1", "localhost", "0.0.0.0")
            if ip not in local_addresses:
                raise NotImplementedError(
                    "WebappJob now only support launch in local mechine.")
            self.port = port
            self.check_web_launcher(web_launcher)
            self.web_launcher = web_launcher
            self.check_times = check_times
            self.check_delta = check_delta
            if isinstance(web_launcher, str):
                # Expose the command template among the job attributes.
                attrs["cmd"] = web_launcher
            super().__init__(
                # Placeholder callable; process_func() installs the
                # real one.
                lambda x: x,
                callback=callback,
                error_callback=error_callback,
                retries=retries,
                retry_time_delta=retry_time_delta,
                name=name,
                condition=condition,
                wait_time_delta=wait_time_delta,
                redirect_out_err=redirect_out_err,
                **attrs
            )

        @staticmethod
        def check_web_launcher(web_launcher: T.Union[LauncherFunc, str]):
            """Validate the launcher: a template string or a callable."""
            if not isinstance(web_launcher, str):
                if not callable(web_launcher):
                    raise TypeError(
                        "web_launcher should be a callable object or str.")
                return
            placeholders = ('{ip}', '{port}')
            if any(mark not in web_launcher for mark in placeholders):
                raise ValueError(
                    "web_launcher should has ip and port placeholder.")

        def consume_resource(self) -> bool:
            """Consume the base resources, then reserve the port."""
            if not super().consume_resource():  # pragma: no cover
                return False
            if self.port is None:
                self.port = PortManager.get_port()
            else:
                PortManager.consume_port(self.port)
            self.attrs.update({"address": f"{self.ip}:{self.port}"})
            return True

        def release_resource(self) -> bool:
            """Release the base resources, then give the port back."""
            if self.port is None:  # pragma: no cover
                return False
            if not super().release_resource():  # pragma: no cover
                return False
            PortManager.release_port(self.port)
            return True

        def process_func(self):
            """Build the launching callable and store it in ``self.func``."""
            launcher = copy.copy(self.web_launcher)
            if self.port is None:  # pragma: no cover
                raise ExecutorError("Unreachable code.")
            ip, port = copy.copy(self.ip), copy.copy(self.port)
            max_checks = copy.copy(self.check_times)
            interval = copy.copy(self.check_delta)

            def check_port(pid: int) -> bool:  # pragma: no cover
                # Poll until the process owns the address or the
                # attempts run out; each attempt starts with a sleep.
                for _ in range(max_checks):
                    time.sleep(interval)
                    if PortManager.process_has_port(pid, ip, port):
                        return True
                    print(f"Process is not listen on {ip}:{port}. Try again.")
                return False

            if isinstance(launcher, str):
                cmd = launcher.format(ip=ip, port=port)

                def func():  # pragma: no cover
                    runner = ProcessRunner(cmd)
                    runner.run()
                    if not check_port(runner.proc.pid):
                        runner.proc.terminate()
                        raise IOError(f"Process is not listen on {ip}:{port}.")
                    retcode = runner.write_stream_until_stop(
                        sys.stdout, sys.stderr)
                    sys.exit(retcode)
            else:
                def func():  # pragma: no cover
                    proc = LokyProcess(target=launcher, args=(ip, port))
                    proc.start()
                    if not check_port(proc.pid):
                        proc.terminate()
                        raise IOError(f"Process is not listen on {ip}:{port}.")
                    proc.join()

            self.func = func
            super().process_func()
    return _WebappJob()

executor.engine.job.extend.SentinelJob(func, sentinel_condition, job_type='process', time_delta=0.01, sentinel_attrs=None, **attrs)

Submit a job when the sentinel condition is met.

Parameters:

Name Type Description Default
func Callable

The function to be executed.

required
sentinel_condition Condition

The sentinel condition.

required
job_type Union[str, Type[Job]]

The type of the job.

'process'
time_delta float

The time delta between each check of the sentinel condition.

0.01
sentinel_attrs Optional[dict]

The attributes of the sentinel job.

None
**attrs

The attributes of the job.

{}
Source code in executor/engine/job/extend/sentinel.py
def SentinelJob(
        func: T.Callable,
        sentinel_condition: Condition,
        job_type: T.Union[str, T.Type[Job]] = "process",
        time_delta: float = 0.01,
        sentinel_attrs: T.Optional[dict] = None,
        **attrs
        ):
    """Submit a job when the sentinel condition is met.

    The returned job wraps an endless coroutine: on every iteration it
    checks the condition and, if satisfied, builds a new job from
    ``func`` and submits it to the engine, then sleeps ``time_delta``.

    Args:
        func: The function to be executed.
        sentinel_condition: The sentinel condition.
        job_type: The type of the job. Either a job class, or a string:
            "process" selects ProcessJob, "thread" selects ThreadJob and
            any other string selects LocalJob.
        time_delta: The time delta between each check
            of the sentinel condition.
        sentinel_attrs: The attributes of the sentinel job. When it has
            no "name" entry, a name derived from ``func`` is used.
            The dict is copied, so the caller's object is never modified.
        **attrs: The attributes of the job.

    Returns:
        The LocalJob that runs the sentinel loop.
    """
    # Work on a copy so the default name is not written into the
    # caller's dict.
    sentinel_attrs = dict(sentinel_attrs or {})
    if "name" not in sentinel_attrs:
        # Some callables (e.g. functools.partial objects) have no
        # __name__; fall back to their type name.
        func_name = getattr(func, "__name__", type(func).__name__)
        sentinel_attrs["name"] = f"sentinel-{func_name}"

    base_class: T.Type[Job]
    if isinstance(job_type, str):
        if job_type == "process":
            base_class = ProcessJob
        elif job_type == "thread":
            base_class = ThreadJob
        else:
            base_class = LocalJob
    else:
        base_class = job_type

    # NOTE(review): the ``__engine__`` argument is presumably injected by
    # the engine running the job; confirm against the engine code.
    async def sentinel(__engine__: "Engine"):
        while True:
            if sentinel_condition.satisfy(__engine__):
                job = base_class(func, **attrs)
                await __engine__.submit_async(job)
            await asyncio.sleep(time_delta)

    sentinel_job = LocalJob(
        sentinel,
        **sentinel_attrs
    )
    return sentinel_job

executor.engine.job.extend.CronJob(func, time_condition, job_type='process', time_delta=0.01, sentinel_attrs=None, **attrs)

Submit a job periodically.

Parameters:

Name Type Description Default
func Callable

The function to be executed.

required
time_condition TimeCondition

The time condition that triggers each submission of the job.

required
job_type Union[str, Type[Job]]

The type of the job.

'process'
time_delta float

The time delta between each check of the sentinel condition.

0.01
sentinel_attrs Optional[dict]

The attributes of the sentinel job.

None
**attrs

The attributes of the job.

{}
Source code in executor/engine/job/extend/cron.py
def CronJob(
        func: T.Callable,
        time_condition: TimeCondition,
        job_type: T.Union[str, T.Type[Job]] = "process",
        time_delta: float = 0.01,
        sentinel_attrs: T.Optional[dict] = None,
        **attrs
        ):
    """Submit a job periodically.

    This is a thin wrapper around ``SentinelJob`` that uses a time
    condition as the sentinel condition and a "cron-" prefixed default
    name for the sentinel job.

    Args:
        func: The function to be executed.
        time_condition: The time condition that triggers each
            submission of the job.
        job_type: The type of the job.
        time_delta: The time delta between each
            check of the sentinel condition.
        sentinel_attrs: The attributes of the sentinel job. When it has
            no "name" entry, a name derived from ``func`` is used.
            The dict is copied, so the caller's object is never modified.
        **attrs: The attributes of the job.

    Returns:
        The sentinel job created by ``SentinelJob``.
    """
    # Work on a copy so the default name is not written into the
    # caller's dict.
    sentinel_attrs = dict(sentinel_attrs or {})
    if "name" not in sentinel_attrs:
        # Some callables (e.g. functools.partial objects) have no
        # __name__; fall back to their type name.
        func_name = getattr(func, "__name__", type(func).__name__)
        sentinel_attrs["name"] = f"cron-sentinel-{func_name}"
    sentinel_job = SentinelJob(
        func,
        time_condition,
        job_type,
        time_delta,
        sentinel_attrs,
        **attrs
    )
    return sentinel_job