Skip to content

Job

executor.engine.job.base.Job

Bases: ExecutorObj

Source code in executor/engine/job/base.py
class Job(ExecutorObj):
    """Base class for jobs scheduled by an engine.

    A job wraps a callable and its arguments, waits until it is runnable
    (engine attached, condition satisfied, resource available) and then
    executes it through ``run_function`` / ``run_generator``, which
    subclasses implement. Statuses used by this class are "created",
    "pending", "running", "done", "failed" and "cancelled".
    """

    status = JobStatusAttr()
    # (label, getter) pairs rendered by __repr__; falsy values are skipped.
    repr_attrs: T.List[T.Tuple[str, T.Callable[["Job"], T.Any]]] = [
        ("status", lambda self: self.status),
        ("id", lambda self: self.id),
        ("func", lambda self: get_callable_name(self.func)),
        ("condition", lambda self: self.condition),
        ("retry_remain", lambda self: self.retry_remain),
    ]

    func: T.Callable
    condition: T.Optional[Condition]
    retry_remain: int

    def __init__(
            self,
            func: T.Callable,
            args: T.Optional[tuple] = None,
            kwargs: T.Optional[dict] = None,
            callback: T.Optional[T.Callable[[T.Any], None]] = None,
            error_callback: T.Optional[T.Callable[[Exception], None]] = None,
            retries: int = 0,
            retry_time_delta: float = 0.0,
            name: T.Optional[str] = None,
            condition: T.Optional[Condition] = None,
            wait_time_delta: float = 0.01,
            redirect_out_err: T.Union[bool, str] = False,
            change_dir: bool = False,
            **attrs
            ) -> None:
        """Base class for job.

        Args:
            func: The function to be executed.
            args: The positional arguments to be passed to the function.
            kwargs: The keyword arguments to be passed to the function.
            callback: The callback function to be called when the job is done.
            error_callback: The callback function to be called
                when the job fails.
            retries: The number of retries when the job fails.
            retry_time_delta: The time delta between retries.
            name: The name of the job. Defaults to ``func.__name__``, or
                to the type name of ``func`` when it has no ``__name__``.
            condition: The condition of the job.
            wait_time_delta: The time delta between each
                check of the condition.
            redirect_out_err: Whether to redirect the stdout
                and stderr to the log. If it is a string, it will be
                used as the path of the log file.
            change_dir: Whether to change the working directory
                to the log directory.
            **attrs: The attributes of the job.
        """
        super().__init__()
        self.future = JobFuture(self.id)
        self.func = func
        self.args = args or tuple()
        self.kwargs = kwargs or {}
        if callback is not None:
            self.future.add_done_callback(callback)
        if error_callback is not None:
            self.future.add_error_callback(error_callback)
        self.retries = retries
        self.retry_remain = retries
        self.retry_time_delta = retry_time_delta
        self.engine: T.Optional["Engine"] = None
        self._status: str = "created"
        # Callables such as functools.partial objects or instances with
        # __call__ have no __name__; fall back to their type name.
        self.name = name or getattr(func, "__name__", type(func).__name__)
        self.attrs = attrs
        self.task: T.Optional[asyncio.Task] = None
        self.condition = condition
        self.wait_time_delta = wait_time_delta
        self.redirect_out_err = redirect_out_err
        self.change_dir = change_dir
        self.created_time: datetime = datetime.now()
        self.submit_time: T.Optional[datetime] = None
        self.stoped_time: T.Optional[datetime] = None
        self._executor: T.Any = None
        self.dep_job_ids: T.List[str] = []

    def __repr__(self) -> str:
        """Render ``<ClassName key=value ...>`` from ``repr_attrs``."""
        attrs = []
        for attr_name, attr_func in self.repr_attrs:
            attr_value = attr_func(self)
            # Falsy values (None, 0, empty) are left out of the repr.
            if bool(attr_value) is False:
                continue
            attrs.append(f"{attr_name}={attr_value}")
        attr_str = " ".join(attrs)
        return f"<{self.__class__.__name__} {attr_str}>"

    def __str__(self) -> str:
        return repr(self)

    def has_resource(self) -> bool:
        """Check if the job has resource to run."""
        if self.engine is None:
            return False
        else:
            return self.engine.resource.n_job > 0

    def consume_resource(self) -> bool:
        """Consume the resource of the job.

        Returns:
            False when no engine is attached, True after one job slot
            has been taken from the engine.
        """
        if self.engine is None:
            return False
        else:
            self.engine.resource.n_job -= 1
            return True

    def release_resource(self) -> bool:
        """Release the resource of the job.

        Returns:
            False when no engine is attached, True after one job slot
            has been given back to the engine.
        """
        if self.engine is None:
            return False
        else:
            self.engine.resource.n_job += 1
            return True

    def resolve_dependencies(self) -> None:
        """Collect upstream jobs from args/kwargs and extend the condition.

        Every ``JobFuture`` found in ``args`` or ``kwargs`` is recorded in
        ``dep_job_ids``. When there is at least one, an ``AfterOthers``
        condition on those jobs is set, combined with any existing
        condition through ``AllSatisfied``.
        """
        dep_jobs_ids: T.List[str] = []
        args = itertools.chain(self.args, self.kwargs.values())
        for arg in args:
            if isinstance(arg, JobFuture):
                dep_jobs_ids.append(arg.job_id)
        if len(dep_jobs_ids) > 0:
            after_others = AfterOthers(dep_jobs_ids)
            if self.condition is None:
                self.condition = after_others
            else:
                self.condition = AllSatisfied([self.condition, after_others])
        self.dep_job_ids = dep_jobs_ids

    async def _resolve_arg(self, arg: T.Union[JobFuture, T.Any]) -> T.Any:
        """Replace a ``JobFuture`` argument by its upstream job's result.

        Non-future arguments are returned unchanged. If the upstream job
        failed or was cancelled, this job is cancelled and ``StopResolve``
        is raised.
        """
        if isinstance(arg, JobFuture):
            assert self.engine is not None
            job = self.engine.jobs.get_job_by_id(arg.job_id)
            if job.status == "done":
                return job.result()
            elif job.status == "failed":
                msg = f"Job {self} cancelled because " \
                      f"of upstream job {job} failed."
                logger.warning(msg)
                await self.cancel()
                raise StopResolve(msg)
            elif job.status == "cancelled":
                msg = f"Job {self} cancelled because " \
                      f"of upstream job {job} cancelled."
                logger.warning(msg)
                await self.cancel()
                raise StopResolve(msg)
            else:  # pragma: no cover
                raise ExecutorError("Unreachable code.")
        else:
            return arg

    async def resolve_args(self):
        """Resolve args and kwargs.

        Does nothing when the job has no recorded dependencies. Otherwise
        ``args`` and ``kwargs`` are replaced by their resolved values.
        """
        if len(self.dep_job_ids) > 0:
            args = []
            kwargs = {}
            for arg in self.args:
                resolved = await self._resolve_arg(arg)
                args.append(resolved)
            for key, value in self.kwargs.items():
                resolved = await self._resolve_arg(value)
                kwargs[key] = resolved
            self.args = tuple(args)
            self.kwargs = kwargs

    def runnable(self) -> bool:
        """Check if the job is runnable.
        Job is runnable if:
        1. engine is not None.
        2. condition is None or condition is satisfied.
        3. has resource."""
        if self.engine is None:
            return False
        if self.condition is not None:
            return self.condition.satisfy(self.engine) and self.has_resource()
        else:
            return self.has_resource()

    async def emit(self) -> asyncio.Task:
        """Emit the job to the engine.

        Returns:
            The asyncio task running ``wait_and_run``.

        Raises:
            InvalidStateError: If the job is not in "pending" status.
        """
        logger.info(f"Emit job {self}, watting for run.")
        if self.status != 'pending':
            raise InvalidStateError(self, ['pending'])
        self.resolve_dependencies()
        self.submit_time = datetime.now()
        loop = asyncio.get_running_loop()
        task = loop.create_task(self.wait_and_run())
        self.task = task
        return task

    def process_func(self):
        """Process(decorate) the target func, before run.
        For example, let the func
        change the dir, redirect the stdout and stderr
        before the actual run."""
        cache_dir = self.cache_dir.resolve()
        is_redirect = (self.redirect_out_err is not False)
        # Skip when already wrapped so a rerun does not wrap twice.
        if is_redirect and (not isinstance(self.func, CaptureOut)):
            if isinstance(self.redirect_out_err, str):
                path_stdout = Path(self.redirect_out_err)
                path_stderr = Path(self.redirect_out_err)
            else:
                path_stdout = cache_dir / 'stdout.txt'
                path_stderr = cache_dir / 'stderr.txt'
            self.func = CaptureOut(self.func, path_stdout, path_stderr)
        if self.change_dir:
            self.func = ChDir(self.func, cache_dir)

    async def wait_and_run(self):
        """Wait for the condition satisfied and run the job.

        Polls every ``wait_time_delta`` seconds until the job is runnable
        and a resource has been consumed, then runs it once. Returns the
        result of ``run`` on success, otherwise None.
        """
        while True:
            if self.runnable() and self.consume_resource():
                logger.info(f"Start run job {self}")
                self.process_func()
                try:
                    await self.resolve_args()
                except StopResolve:
                    # The job was cancelled while still "pending", and
                    # cancel() does not release resources in that state,
                    # so give back what was consumed above.
                    self.release_resource()
                    break
                self.status = "running"
                try:
                    res = await self.run()
                    if not isinstance(res, GeneratorWrapper):
                        await self.on_done(res)
                    else:
                        self.future.set_result(res)
                    return res
                except Exception as e:
                    await self.on_failed(e)
                    break
            else:
                await asyncio.sleep(self.wait_time_delta)

    async def run(self):
        """Run the job.

        Dispatches to ``run_generator`` for (async) generator functions
        and to ``run_function`` otherwise.
        """
        if inspect.isgeneratorfunction(self.func) or inspect.isasyncgenfunction(self.func):  # noqa: E501
            return await self.run_generator()
        else:
            return await self.run_function()

    async def run_function(self):  # pragma: no cover
        """Run the job as a function."""
        msg = f"{type(self).__name__} does not implement " \
              "run_function method."
        raise NotImplementedError(msg)

    async def run_generator(self):  # pragma: no cover
        """Run the job as a generator."""
        msg = f"{type(self).__name__} does not implement " \
              "run_generator method."
        raise NotImplementedError(msg)

    async def rerun(self, check_status: bool = True):
        """Rerun the job.

        Args:
            check_status: When True, only jobs in "cancelled", "done" or
                "failed" status may be rerun.

        Raises:
            InvalidStateError: If ``check_status`` is True and the job
                is in another status.
        """
        _valid_status: T.List[JobStatusType] = ["cancelled", "done", "failed"]
        if check_status and (self.status not in _valid_status):
            raise InvalidStateError(self, _valid_status)
        logger.info(f"Rerun job {self}")
        self.status = "pending"
        await self.emit()

    def _on_finish(self, new_status: JobStatusType = "done"):
        """Set the new status and release the consumed resource."""
        self.status = new_status
        self.release_resource()

    async def on_done(self, res):
        """Callback when the job is done.

        Stores the result on the future, runs the done callbacks
        (awaiting coroutine functions) and finishes with status "done".
        """
        logger.info(f"Job {self} done.")
        self.future.set_result(res)
        for callback in self.future.done_callbacks:
            if inspect.iscoroutinefunction(callback):
                await callback(res)
            else:
                callback(res)
        self._on_finish("done")

    async def on_failed(self, e: Exception):
        """Callback when the job fails.

        Stores the exception on the future and runs the error callbacks.
        While retries remain the job is rerun after ``retry_time_delta``
        seconds; otherwise it finishes with status "failed".
        """
        logger.error(f"Job {self} failed: {repr(e)}")
        assert self.engine is not None
        if self.engine.print_traceback:
            logger.exception(e)
        self.future.set_exception(e)
        for err_callback in self.future.error_callbacks:
            if inspect.iscoroutinefunction(err_callback):
                await err_callback(e)
            else:
                err_callback(e)
        if self.retry_remain > 0:
            self._on_finish("pending")
            self.retry_remain -= 1
            await asyncio.sleep(self.retry_time_delta)
            await self.rerun(check_status=False)
        else:
            self._on_finish("failed")

    async def cancel(self):
        """Cancel the job.

        A running job has its context cleared and finishes with status
        "cancelled"; a pending job is only marked "cancelled".
        """
        logger.info(f"Cancel job {self}.")
        # A job that was never emitted (or was deserialized) has no task.
        if self.task is not None:
            self.task.cancel()
        if self.status == "running":
            try:
                self.clear_context()
            except Exception as e:  # pragma: no cover
                print(str(e))
            finally:
                self._on_finish("cancelled")
        elif self.status == "pending":
            self.status = "cancelled"

    def clear_context(self):
        """Clear the context of the job."""
        pass

    def result(self) -> T.Any:
        """Get the result of the job.

        Raises:
            InvalidStateError: If the result is not a generator wrapper
                and the job is not in "done" status.
        """
        res = self.future.result()
        if not isinstance(res, GeneratorWrapper):
            if self.status != "done":
                raise InvalidStateError(self, ['done'])
        return res

    def exception(self):
        """Get the exception of the job."""
        return self.future.exception()

    def serialization(self) -> bytes:
        """Serialize the job to bytes.

        A shallow copy without ``task``, ``engine`` and ``_executor`` is
        what gets pickled.
        """
        job = copy(self)
        job.task = None
        job.engine = None
        job._executor = None
        bytes_ = cloudpickle.dumps(job)
        return bytes_

    @staticmethod
    def deserialization(bytes_: bytes) -> "Job":
        """Deserialize the job from bytes.

        NOTE: unpickling can execute arbitrary code; only pass bytes
        from a trusted source.
        """
        job: "Job" = cloudpickle.loads(bytes_)
        return job

    async def join(self, timeout: T.Optional[float] = None):
        """Wait for the job done.

        Raises:
            InvalidStateError: If the job has no task yet.
        """
        if self.task is None:
            raise InvalidStateError(self, valid_job_statuses)
        await asyncio.wait([self.task], timeout=timeout)

    async def wait_until(
        self, check_func: T.Callable[["Job"], bool],
        timeout: T.Optional[float] = None
    ):
        """Wait until the check_func return True.

        Args:
            check_func: Predicate called with this job between sleeps.
            timeout: Maximum number of seconds to wait; None waits
                without limit.

        Raises:
            asyncio.TimeoutError: If ``timeout`` elapses first.
        """
        # Use the loop clock rather than summing nominal sleep time, so
        # the timeout still fires when wait_time_delta is 0.
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout
        while not check_func(self):
            await asyncio.sleep(self.wait_time_delta)
            if (deadline is not None) and loop.time() > deadline:
                raise asyncio.TimeoutError

    async def wait_until_status(
            self, status: JobStatusType,
            timeout: T.Optional[float] = None):
        """Wait until the job is in the target status."""
        await self.wait_until(lambda job: job.status == status, timeout)

    @property
    def cache_dir(self) -> T.Optional[Path]:
        """Get the cache dir of the job.

        Returns None when no engine is attached. Otherwise the directory
        ``<engine cache dir>/<job id>`` is created if needed and returned.
        """
        if self.engine is None:
            return None
        parent = self.engine.cache_dir
        path = parent / self.id
        path.mkdir(parents=True, exist_ok=True)
        return path

cache_dir property

Get the cache dir of the job.

__init__(func, args=None, kwargs=None, callback=None, error_callback=None, retries=0, retry_time_delta=0.0, name=None, condition=None, wait_time_delta=0.01, redirect_out_err=False, change_dir=False, **attrs)

Base class for job.

Parameters:

Name Type Description Default
func Callable

The function to be executed.

required
args Optional[tuple]

The positional arguments to be passed to the function.

None
kwargs Optional[dict]

The keyword arguments to be passed to the function.

None
callback Optional[Callable[[Any], None]]

The callback function to be called when the job is done.

None
error_callback Optional[Callable[[Exception], None]]

The callback function to be called when the job is failed.

None
retries int

The number of retries when the job is failed.

0
retry_time_delta float

The time delta between retries.

0.0
name Optional[str]

The name of the job.

None
condition Optional[Condition]

The condition of the job.

None
wait_time_delta float

The time delta between each check of the condition.

0.01
redirect_out_err Union[bool, str]

Whether to redirect the stdout and stderr to the log. If it is a string, it will be used as the path of the log file.

False
change_dir bool

Whether to change the working directory to the log directory.

False
**attrs

The attributes of the job.

{}
Source code in executor/engine/job/base.py
def __init__(
        self,
        func: T.Callable,
        args: T.Optional[tuple] = None,
        kwargs: T.Optional[dict] = None,
        callback: T.Optional[T.Callable[[T.Any], None]] = None,
        error_callback: T.Optional[T.Callable[[Exception], None]] = None,
        retries: int = 0,
        retry_time_delta: float = 0.0,
        name: T.Optional[str] = None,
        condition: T.Optional[Condition] = None,
        wait_time_delta: float = 0.01,
        redirect_out_err: T.Union[bool, str] = False,
        change_dir: bool = False,
        **attrs
        ) -> None:
    """Base class for job.

    Args:
        func: The function to be executed.
        args: The positional arguments to be passed to the function.
        kwargs: The keyword arguments to be passed to the function.
        callback: The callback function to be called when the job is done.
        error_callback: The callback function to be called
            when the job fails.
        retries: The number of retries when the job fails.
        retry_time_delta: The time delta between retries.
        name: The name of the job. Defaults to ``func.__name__``, or
            to the type name of ``func`` when it has no ``__name__``.
        condition: The condition of the job.
        wait_time_delta: The time delta between each
            check of the condition.
        redirect_out_err: Whether to redirect the stdout
            and stderr to the log. If it is a string, it will be
            used as the path of the log file.
        change_dir: Whether to change the working directory
            to the log directory.
        **attrs: The attributes of the job.
    """
    super().__init__()
    self.future = JobFuture(self.id)
    self.func = func
    self.args = args or tuple()
    self.kwargs = kwargs or {}
    if callback is not None:
        self.future.add_done_callback(callback)
    if error_callback is not None:
        self.future.add_error_callback(error_callback)
    self.retries = retries
    self.retry_remain = retries
    self.retry_time_delta = retry_time_delta
    self.engine: T.Optional["Engine"] = None
    self._status: str = "created"
    # Callables such as functools.partial objects or instances with
    # __call__ have no __name__; fall back to their type name.
    self.name = name or getattr(func, "__name__", type(func).__name__)
    self.attrs = attrs
    self.task: T.Optional[asyncio.Task] = None
    self.condition = condition
    self.wait_time_delta = wait_time_delta
    self.redirect_out_err = redirect_out_err
    self.change_dir = change_dir
    self.created_time: datetime = datetime.now()
    self.submit_time: T.Optional[datetime] = None
    self.stoped_time: T.Optional[datetime] = None
    self._executor: T.Any = None
    self.dep_job_ids: T.List[str] = []

cancel() async

Cancel the job.

Source code in executor/engine/job/base.py
async def cancel(self):
    """Cancel the job.

    A running job has its context cleared and finishes with status
    "cancelled"; a pending job is only marked "cancelled".
    """
    logger.info(f"Cancel job {self}.")
    # A job that was never emitted (or was deserialized) has no task.
    if self.task is not None:
        self.task.cancel()
    if self.status == "running":
        try:
            self.clear_context()
        except Exception as e:  # pragma: no cover
            print(str(e))
        finally:
            self._on_finish("cancelled")
    elif self.status == "pending":
        self.status = "cancelled"

clear_context()

Clear the context of the job.

Source code in executor/engine/job/base.py
def clear_context(self):
    """Clear the context of the job.

    The base implementation has nothing to clean up; subclasses that
    hold run-time context override this hook.
    """

consume_resource()

Consume the resource of the job.

Source code in executor/engine/job/base.py
def consume_resource(self) -> bool:
    """Take one job slot from the engine.

    Returns:
        False when no engine is attached, True once the slot is taken.
    """
    engine = self.engine
    if engine is None:
        return False
    engine.resource.n_job -= 1
    return True

deserialization(bytes_) staticmethod

Deserialize the job from bytes.

Source code in executor/engine/job/base.py
@staticmethod
def deserialization(bytes_: bytes) -> "Job":
    """Rebuild a job from bytes.

    NOTE: unpickling can execute arbitrary code; only pass bytes
    from a trusted source.
    """
    return cloudpickle.loads(bytes_)

emit() async

Emit the job to the engine.

Source code in executor/engine/job/base.py
async def emit(self) -> asyncio.Task:
    """Schedule the job on the running event loop.

    Returns:
        The asyncio task running ``wait_and_run``; it is also stored
        on ``self.task``.

    Raises:
        InvalidStateError: If the job is not in "pending" status.
    """
    logger.info(f"Emit job {self}, watting for run.")
    if not self.status == 'pending':
        raise InvalidStateError(self, ['pending'])
    self.resolve_dependencies()
    self.submit_time = datetime.now()
    self.task = asyncio.get_running_loop().create_task(self.wait_and_run())
    return self.task

exception()

Get the exception of the job.

Source code in executor/engine/job/base.py
def exception(self):
    """Return whatever the job's future reports as its exception."""
    future = self.future
    return future.exception()

has_resource()

Check if the job has resource to run.

Source code in executor/engine/job/base.py
def has_resource(self) -> bool:
    """Tell whether the attached engine still has a free job slot.

    Returns:
        False when no engine is attached, otherwise whether the
        engine's ``n_job`` counter is positive.
    """
    engine = self.engine
    if engine is None:
        return False
    return engine.resource.n_job > 0

join(timeout=None) async

Wait for the job done.

Source code in executor/engine/job/base.py
async def join(self, timeout: T.Optional[float] = None):
    """Wait for the job's task to finish.

    Args:
        timeout: Maximum number of seconds to wait; None waits without
            limit.

    Raises:
        InvalidStateError: If the job has no task yet.
    """
    task = self.task
    if task is None:
        raise InvalidStateError(self, valid_job_statuses)
    await asyncio.wait([task], timeout=timeout)

on_done(res) async

Callback when the job is done.

Source code in executor/engine/job/base.py
async def on_done(self, res):
    """Handle successful completion of the job.

    Stores ``res`` on the future, invokes every done callback (awaiting
    coroutine functions) and finishes with status "done".
    """
    logger.info(f"Job {self} done.")
    self.future.set_result(res)
    for handler in self.future.done_callbacks:
        if not inspect.iscoroutinefunction(handler):
            handler(res)
            continue
        await handler(res)
    self._on_finish("done")

on_failed(e) async

Callback when the job is failed.

Source code in executor/engine/job/base.py
async def on_failed(self, e: Exception):
    """Handle a failure of the job.

    Stores ``e`` on the future and invokes every error callback
    (awaiting coroutine functions). While retries remain the job is
    set back to "pending" and rerun after ``retry_time_delta`` seconds;
    otherwise it finishes with status "failed".
    """
    logger.error(f"Job {self} failed: {repr(e)}")
    assert self.engine is not None
    if self.engine.print_traceback:
        logger.exception(e)
    self.future.set_exception(e)
    for handler in self.future.error_callbacks:
        if not inspect.iscoroutinefunction(handler):
            handler(e)
            continue
        await handler(e)
    if not self.retry_remain > 0:
        self._on_finish("failed")
        return
    # Release the resource first, then spend one retry.
    self._on_finish("pending")
    self.retry_remain -= 1
    await asyncio.sleep(self.retry_time_delta)
    await self.rerun(check_status=False)

process_func()

Process(decorate) the target func, before run. For example, let the func change the dir, redirect the stdout and stderr before the actual run.

Source code in executor/engine/job/base.py
def process_func(self):
    """Decorate the target func before it is run.

    Depending on the job settings, the func is wrapped so that its
    stdout/stderr are captured to files and/or so that it runs with the
    job's cache directory as working directory.
    """
    work_dir = self.cache_dir.resolve()
    redirect = self.redirect_out_err
    # Skip when already wrapped so a rerun does not wrap twice.
    if (redirect is not False) and (not isinstance(self.func, CaptureOut)):
        if isinstance(redirect, str):
            # A string names one file that receives both streams.
            out_path, err_path = Path(redirect), Path(redirect)
        else:
            out_path = work_dir / 'stdout.txt'
            err_path = work_dir / 'stderr.txt'
        self.func = CaptureOut(self.func, out_path, err_path)
    if self.change_dir:
        self.func = ChDir(self.func, work_dir)

release_resource()

Release the resource of the job.

Source code in executor/engine/job/base.py
def release_resource(self) -> bool:
    """Give one job slot back to the engine.

    Returns:
        False when no engine is attached, True once the slot is returned.
    """
    engine = self.engine
    if engine is None:
        return False
    engine.resource.n_job += 1
    return True

rerun(check_status=True) async

Rerun the job.

Source code in executor/engine/job/base.py
async def rerun(self, check_status: bool = True):
    """Put the job back to "pending" and emit it again.

    Args:
        check_status: When True, only jobs in "cancelled", "done" or
            "failed" status may be rerun.

    Raises:
        InvalidStateError: If ``check_status`` is True and the job is
            in another status.
    """
    allowed: T.List[JobStatusType] = ["cancelled", "done", "failed"]
    if check_status:
        if self.status not in allowed:
            raise InvalidStateError(self, allowed)
    logger.info(f"Rerun job {self}")
    self.status = "pending"
    await self.emit()

resolve_args() async

Resolve args and kwargs.

Source code in executor/engine/job/base.py
async def resolve_args(self):
    """Replace future-valued args and kwargs by their resolved values.

    Does nothing when the job has no recorded dependencies. Arguments
    are resolved one by one, positional first, and stored only after
    all of them resolved.
    """
    if not len(self.dep_job_ids) > 0:
        return
    positional = [await self._resolve_arg(item) for item in self.args]
    named = {
        key: await self._resolve_arg(item)
        for key, item in self.kwargs.items()
    }
    self.args = tuple(positional)
    self.kwargs = named

resolve_dependencies()

Resolve args and kwargs and auto specify the condition.

Source code in executor/engine/job/base.py
def resolve_dependencies(self) -> None:
    """Record upstream jobs found in args/kwargs and extend the condition.

    Every ``JobFuture`` among the positional and keyword arguments
    contributes its job id to ``dep_job_ids``. When at least one is
    found, an ``AfterOthers`` condition on those ids is set, combined
    with an existing condition through ``AllSatisfied``.
    """
    candidates = itertools.chain(self.args, self.kwargs.values())
    upstream_ids: T.List[str] = [
        item.job_id for item in candidates if isinstance(item, JobFuture)
    ]
    if len(upstream_ids) > 0:
        wait_for_upstream = AfterOthers(upstream_ids)
        if self.condition is not None:
            self.condition = AllSatisfied([self.condition, wait_for_upstream])
        else:
            self.condition = wait_for_upstream
    self.dep_job_ids = upstream_ids

result()

Get the result of the job.

Source code in executor/engine/job/base.py
def result(self) -> T.Any:
    """Return the result stored on the job's future.

    A ``GeneratorWrapper`` result is returned regardless of status; any
    other result requires the job to be "done".

    Raises:
        InvalidStateError: If the result is not a generator wrapper and
            the job is not in "done" status.
    """
    value = self.future.result()
    if isinstance(value, GeneratorWrapper) or self.status == "done":
        return value
    raise InvalidStateError(self, ['done'])

run() async

Run the job.

Source code in executor/engine/job/base.py
async def run(self):
    """Run the job through the runner matching its func.

    Generator functions and async generator functions go to
    ``run_generator``; everything else goes to ``run_function``.
    """
    target = self.func
    is_generator = (
        inspect.isgeneratorfunction(target)
        or inspect.isasyncgenfunction(target)
    )
    runner = self.run_generator if is_generator else self.run_function
    return await runner()

run_function() async

Run the job as a function.

Source code in executor/engine/job/base.py
async def run_function(self):  # pragma: no cover
    """Run the job as a function; subclasses must override this.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    cls_name = type(self).__name__
    raise NotImplementedError(
        f"{cls_name} does not implement run_function method.")

run_generator() async

Run the job as a generator.

Source code in executor/engine/job/base.py
async def run_generator(self):  # pragma: no cover
    """Run the job as a generator; subclasses must override this.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    cls_name = type(self).__name__
    raise NotImplementedError(
        f"{cls_name} does not implement run_generator method.")

runnable()

Check if the job is runnable. Job is runnable if: 1. engine is not None. 2. condition is None or condition is satisfied. 3. has resource.

Source code in executor/engine/job/base.py
def runnable(self) -> bool:
    """Check if the job is runnable.
    Job is runnable if:
    1. engine is not None.
    2. condition is None or condition is satisfied.
    3. has resource."""
    engine = self.engine
    if engine is None:
        return False
    condition = self.condition
    if condition is None:
        return self.has_resource()
    # The resource check is skipped when the condition is not satisfied.
    return condition.satisfy(engine) and self.has_resource()

serialization()

Serialize the job to bytes.

Source code in executor/engine/job/base.py
def serialization(self) -> bytes:
    """Pickle a shallow copy of the job into bytes.

    ``task``, ``engine`` and ``_executor`` are set to None on the copy
    before pickling; the job itself is left untouched.
    """
    snapshot = copy(self)
    for attr_name in ("task", "engine", "_executor"):
        setattr(snapshot, attr_name, None)
    return cloudpickle.dumps(snapshot)

wait_and_run() async

Wait for the condition satisfied and run the job.

Source code in executor/engine/job/base.py
async def wait_and_run(self):
    """Wait for the condition satisfied and run the job.

    Polls every ``wait_time_delta`` seconds until the job is runnable
    and a resource has been consumed, then runs it once.

    Returns:
        The result of ``run`` on success; None when argument resolution
        was stopped or the run failed.
    """
    while True:
        if not (self.runnable() and self.consume_resource()):
            await asyncio.sleep(self.wait_time_delta)
            continue
        logger.info(f"Start run job {self}")
        self.process_func()
        try:
            await self.resolve_args()
        except StopResolve:
            # The job was cancelled while still "pending", and cancel()
            # does not release resources in that state, so give back
            # what was consumed above.
            self.release_resource()
            break
        self.status = "running"
        try:
            res = await self.run()
            if not isinstance(res, GeneratorWrapper):
                await self.on_done(res)
            else:
                # Generator results are handed over without on_done.
                self.future.set_result(res)
            return res
        except Exception as e:
            await self.on_failed(e)
            break

wait_until(check_func, timeout=None) async

Wait until the check_func return True.

Source code in executor/engine/job/base.py
async def wait_until(
    self, check_func: T.Callable[["Job"], bool],
    timeout: T.Optional[float] = None
):
    """Wait until the check_func return True.

    Args:
        check_func: Predicate called with this job between sleeps of
            ``wait_time_delta`` seconds.
        timeout: Maximum number of seconds to wait; None waits without
            limit.

    Raises:
        asyncio.TimeoutError: If ``timeout`` elapses first.
    """
    # Use the loop clock rather than summing nominal sleep time, so the
    # timeout still fires when wait_time_delta is 0.
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    while not check_func(self):
        await asyncio.sleep(self.wait_time_delta)
        if (deadline is not None) and loop.time() > deadline:
            raise asyncio.TimeoutError

wait_until_status(status, timeout=None) async

Wait until the job is in the target status.

Source code in executor/engine/job/base.py
async def wait_until_status(
        self, status: JobStatusType,
        timeout: T.Optional[float] = None):
    """Wait until the job reaches ``status``.

    Delegates to ``wait_until`` with a status-equality predicate and
    passes ``timeout`` through.
    """
    def _reached(job: "Job") -> bool:
        return job.status == status

    await self.wait_until(_reached, timeout)

executor.engine.job.process.ProcessJob

Bases: Job

Job that runs in a process.

Source code in executor/engine/job/process.py
class ProcessJob(Job):
    """Job that runs in a process."""

    def has_resource(self) -> bool:
        """Tell whether both a job slot and a process slot are free."""
        engine = self.engine
        if engine is None:
            return False
        return super().has_resource() and (engine.resource.n_process > 0)

    def consume_resource(self) -> bool:
        """Take one process slot, then one job slot, from the engine."""
        engine = self.engine
        if engine is None:
            return False
        engine.resource.n_process -= 1
        return super().consume_resource()

    def release_resource(self) -> bool:
        """Give one process slot, then one job slot, back to the engine."""
        engine = self.engine
        if engine is None:
            return False
        engine.resource.n_process += 1
        return super().release_resource()

    async def run_function(self):
        """Run the func in a dedicated single-worker process pool."""
        target = functools.partial(self.func, *self.args, **self.kwargs)
        if iscoroutinefunction(target):
            # Coroutine functions are driven by run_async_func in the worker.
            target = functools.partial(run_async_func, target)
        self._executor = ProcessPoolExecutor(1)
        running_loop = asyncio.get_running_loop()
        return await running_loop.run_in_executor(self._executor, target)

    async def run_generator(self):
        """Prepare a worker for a generator func and return its wrapper."""
        target = functools.partial(self.func, *self.args, **self.kwargs)
        # The bound func is handed to the worker through the initializer.
        self._executor = ProcessPoolExecutor(
            1, initializer=_gen_initializer, initargs=(target,))
        return create_generator_wrapper(self)

    async def cancel(self):
        """Shut down the worker of a running job, then cancel as usual."""
        running = self.status == "running"
        if running:
            self._executor.shutdown(wait=True, kill_workers=True)
        await super().cancel()

    def clear_context(self):
        """Shut down the executor and drop the reference to it."""
        pool = self._executor
        pool.shutdown(wait=True, kill_workers=True)
        self._executor = None

cancel() async

Cancel job.

Source code in executor/engine/job/process.py
async def cancel(self):
    """Shut down the worker of a running job, then cancel as usual."""
    running = self.status == "running"
    if running:
        self._executor.shutdown(wait=True, kill_workers=True)
    await super().cancel()

clear_context()

Clear context.

Source code in executor/engine/job/process.py
def clear_context(self):
    """Shut down the executor and drop the reference to it."""
    pool = self._executor
    pool.shutdown(wait=True, kill_workers=True)
    self._executor = None

consume_resource()

Consume resource for the job.

Source code in executor/engine/job/process.py
def consume_resource(self) -> bool:
    """Take one process slot, then one job slot, from the engine.

    Returns:
        False when no engine is attached, otherwise the result of the
        parent class's ``consume_resource``.
    """
    engine = self.engine
    if engine is None:
        return False
    engine.resource.n_process -= 1
    return super().consume_resource()

has_resource()

Check if the job has enough resource to run.

Source code in executor/engine/job/process.py
def has_resource(self) -> bool:
    """Tell whether the job can currently obtain a process slot.

    Returns:
        ``False`` when the job has no engine; otherwise the base-class
        check combined with ``engine.resource.n_process > 0``.
    """
    if self.engine is None:
        return False
    # The base-class check runs first and short-circuits the counter test.
    return super().has_resource() and (self.engine.resource.n_process > 0)

release_resource()

Release resource for the job.

Source code in executor/engine/job/process.py
def release_resource(self) -> bool:
    """Give the job's process slot back to the engine.

    Returns:
        ``False`` (without touching any counter) when the job has no
        engine. Otherwise ``engine.resource.n_process`` is incremented and
        the result is ``True`` if the base-class call is truthy, else the
        base class's falsy value.
    """
    if self.engine is None:
        return False
    resource = self.engine.resource
    resource.n_process += 1
    released = super().release_resource()
    return released and True

run_function() async

Run job in process pool.

Source code in executor/engine/job/process.py
async def run_function(self):
    """Execute the job's callable through a one-worker process pool.

    The callable is bound to the stored positional and keyword
    arguments. If the bound callable is detected as a coroutine
    function it is wrapped with ``run_async_func`` before submission.

    Returns:
        The value produced by the submitted callable.
    """
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    target = (
        functools.partial(run_async_func, bound)
        if iscoroutinefunction(bound)
        else bound
    )
    # Keep the pool on the job so cancel()/clear_context() can reach it.
    self._executor = ProcessPoolExecutor(1)
    pending = asyncio.get_running_loop().run_in_executor(
        self._executor, target)
    return await pending

run_generator() async

Run job as a generator.

Source code in executor/engine/job/process.py
async def run_generator(self):
    """Prepare a one-worker process pool for a generator job.

    The bound callable is handed to the pool's worker initializer
    (``_gen_initializer``); the pool is stored on the job.

    Returns:
        Whatever ``create_generator_wrapper(self)`` returns.
    """
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    pool = ProcessPoolExecutor(
        1, initializer=_gen_initializer, initargs=(bound,))
    self._executor = pool
    return create_generator_wrapper(self)

executor.engine.job.thread.ThreadJob

Bases: Job

Job that runs in a thread.

Source code in executor/engine/job/thread.py
class ThreadJob(Job):
    """Job that runs in a thread."""

    def has_resource(self) -> bool:
        """Check if the job has enough resource to run.

        Returns:
            ``False`` when the job has no engine; otherwise the base-class
            check combined with ``engine.resource.n_thread > 0``.
        """
        if self.engine is None:
            return False
        return (
            super().has_resource() and
            (self.engine.resource.n_thread > 0)
        )

    def consume_resource(self) -> bool:
        """Consume resource for the job.

        Returns:
            ``False`` when the job has no engine. Otherwise the thread
            counter is decremented and the base-class result is returned
            (``True`` when it is truthy).
        """
        if self.engine is None:
            return False
        self.engine.resource.n_thread -= 1
        return (
            super().consume_resource() and
            True
        )

    def release_resource(self) -> bool:
        """Release resource for the job.

        Returns:
            ``False`` when the job has no engine. Otherwise the thread
            counter is incremented and the base-class result is returned
            (``True`` when it is truthy).
        """
        if self.engine is None:
            return False
        self.engine.resource.n_thread += 1
        return (
            super().release_resource() and
            True
        )

    async def run_function(self):
        """Run job in thread pool.

        The callable is bound to the stored arguments; a coroutine
        function is wrapped with ``run_async_func`` before submission.

        Returns:
            The value produced by the submitted callable.
        """
        func = functools.partial(self.func, *self.args, **self.kwargs)
        if iscoroutinefunction(func):
            func = functools.partial(run_async_func, func)
        # Keep the pool on the job so cancel()/clear_context() can reach it.
        self._executor = ThreadPoolExecutor(1)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func)

    async def run_generator(self):
        """Run job as a generator.

        The bound callable is handed to the pool's worker initializer
        (``_gen_initializer``).

        Returns:
            Whatever ``create_generator_wrapper(self)`` returns.
        """
        func = functools.partial(self.func, *self.args, **self.kwargs)
        self._executor = ThreadPoolExecutor(
            1, initializer=_gen_initializer, initargs=(func,))
        return create_generator_wrapper(self)

    async def cancel(self):
        """Cancel job.

        A running job has its thread pool shut down before the base-class
        ``cancel`` is awaited.
        """
        if self.status == "running":
            self._executor.shutdown()
        await super().cancel()

    def clear_context(self):
        """Clear context.

        Shuts down the thread pool and drops the reference. The call is
        idempotent: when ``_executor`` is already ``None`` nothing happens
        instead of raising ``AttributeError``.
        """
        executor = self._executor
        if executor is None:
            # Already cleared, or the job never created a pool.
            return
        executor.shutdown()
        self._executor = None

cancel() async

Cancel job.

Source code in executor/engine/job/thread.py
async def cancel(self):
    """Cancel the job.

    A running job first has its thread pool shut down; the base-class
    ``cancel`` is awaited in every case.
    """
    is_running = self.status == "running"
    if is_running:
        pool = self._executor
        pool.shutdown()
    await super().cancel()

clear_context()

Clear context.

Source code in executor/engine/job/thread.py
def clear_context(self):
    """Shut down the job's thread pool and drop the reference to it.

    The call is idempotent: when no pool is attached (``_executor`` is
    ``None``, e.g. on a repeated call) nothing happens instead of
    raising ``AttributeError``.
    """
    executor = self._executor
    if executor is None:
        # Nothing to release; a previous call already cleared the pool
        # or the job never created one.
        return
    executor.shutdown()
    self._executor = None

consume_resource()

Consume resource for the job.

Source code in executor/engine/job/thread.py
def consume_resource(self) -> bool:
    """Take one thread slot from the engine for this job.

    Returns:
        ``False`` (without touching any counter) when the job has no
        engine. Otherwise ``engine.resource.n_thread`` is decremented and
        the result is ``True`` if the base-class call is truthy, else the
        base class's falsy value.
    """
    if self.engine is None:
        return False
    resource = self.engine.resource
    resource.n_thread -= 1
    consumed = super().consume_resource()
    return consumed and True

has_resource()

Check if the job has enough resource to run.

Source code in executor/engine/job/thread.py
def has_resource(self) -> bool:
    """Tell whether the job can currently obtain a thread slot.

    Returns:
        ``False`` when the job has no engine; otherwise the base-class
        check combined with ``engine.resource.n_thread > 0``.
    """
    if self.engine is None:
        return False
    # The base-class check runs first and short-circuits the counter test.
    return super().has_resource() and (self.engine.resource.n_thread > 0)

release_resource()

Release resource for the job.

Source code in executor/engine/job/thread.py
def release_resource(self) -> bool:
    """Give the job's thread slot back to the engine.

    Returns:
        ``False`` (without touching any counter) when the job has no
        engine. Otherwise ``engine.resource.n_thread`` is incremented and
        the result is ``True`` if the base-class call is truthy, else the
        base class's falsy value.
    """
    if self.engine is None:
        return False
    resource = self.engine.resource
    resource.n_thread += 1
    released = super().release_resource()
    return released and True

run_function() async

Run job in thread pool.

Source code in executor/engine/job/thread.py
async def run_function(self):
    """Execute the job's callable through a one-worker thread pool.

    The callable is bound to the stored positional and keyword
    arguments. If the bound callable is detected as a coroutine
    function it is wrapped with ``run_async_func`` before submission.

    Returns:
        The value produced by the submitted callable.
    """
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    target = (
        functools.partial(run_async_func, bound)
        if iscoroutinefunction(bound)
        else bound
    )
    # Keep the pool on the job so cancel()/clear_context() can reach it.
    self._executor = ThreadPoolExecutor(1)
    pending = asyncio.get_running_loop().run_in_executor(
        self._executor, target)
    return await pending

run_generator() async

Run job as a generator.

Source code in executor/engine/job/thread.py
async def run_generator(self):
    """Prepare a one-worker thread pool for a generator job.

    The bound callable is handed to the pool's worker initializer
    (``_gen_initializer``); the pool is stored on the job.

    Returns:
        Whatever ``create_generator_wrapper(self)`` returns.
    """
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    pool = ThreadPoolExecutor(
        1, initializer=_gen_initializer, initargs=(bound,))
    self._executor = pool
    return create_generator_wrapper(self)

executor.engine.job.local.LocalJob

Bases: Job

Source code in executor/engine/job/local.py
class LocalJob(Job):
    """Job whose callable is invoked directly, without a worker pool."""

    async def run_function(self):
        """Call the job's function in place and return its result.

        A coroutine function is awaited; any other callable is invoked
        synchronously.
        """
        if not iscoroutinefunction(self.func):
            return self.func(*self.args, **self.kwargs)
        return await self.func(*self.args, **self.kwargs)

    async def run_generator(self):
        """Return the generator wrapper built for this job."""
        wrapper = create_generator_wrapper(self)
        return wrapper

run_function() async

Run job in local thread.

Source code in executor/engine/job/local.py
async def run_function(self):
    """Call the job's function in place and return its result.

    A coroutine function is awaited; any other callable is invoked
    synchronously.
    """
    if not iscoroutinefunction(self.func):
        return self.func(*self.args, **self.kwargs)
    return await self.func(*self.args, **self.kwargs)

run_generator() async

Run job as a generator.

Source code in executor/engine/job/local.py
async def run_generator(self):
    """Return the generator wrapper built for this job."""
    wrapper = create_generator_wrapper(self)
    return wrapper

executor.engine.job.dask.DaskJob

Bases: Job

Job that runs with Dask.

Source code in executor/engine/job/dask.py
class DaskJob(Job):
    """Job that runs with Dask."""

    def has_resource(self) -> bool:
        """Check if the job has enough resource to run.

        Returns:
            ``False`` when the job has no engine; otherwise the base-class
            check combined with ``engine.resource.n_dask > 0``.
        """
        if self.engine is None:
            return False
        return (
            super().has_resource() and
            (self.engine.resource.n_dask > 0)
        )

    def consume_resource(self) -> bool:
        """Consume resource for the job.

        Returns:
            ``False`` when the job has no engine. Otherwise the Dask
            counter is decremented and the base-class result is returned
            (``True`` when it is truthy).
        """
        if self.engine is None:
            return False
        self.engine.resource.n_dask -= 1
        return (
            super().consume_resource() and
            True
        )

    def release_resource(self) -> bool:
        """Release resource for the job.

        Returns:
            ``False`` when the job has no engine. Otherwise the Dask
            counter is incremented and the base-class result is returned
            (``True`` when it is truthy).
        """
        if self.engine is None:
            return False
        self.engine.resource.n_dask += 1
        return (
            super().release_resource() and
            True
        )

    async def run_function(self):
        """Run job with Dask.

        The bound callable (wrapped with ``run_async_func`` when it is a
        coroutine function) is submitted to the engine's Dask client; the
        returned future is stored in ``_executor`` and awaited.

        Returns:
            The result of awaiting the submitted future.
        """
        client = self.engine.dask_client
        func = functools.partial(self.func, *self.args, **self.kwargs)
        if iscoroutinefunction(func):
            func = functools.partial(run_async_func, func)
        fut = client.submit(func)
        self._executor = fut
        return await fut

    async def run_generator(self):
        """Run job as a generator.

        Submits the bound callable to the engine's Dask client and passes
        the resulting future to ``create_generator_wrapper``.

        Returns:
            Whatever ``create_generator_wrapper(self, fut)`` returns.
        """
        client = self.engine.dask_client
        func = functools.partial(self.func, *self.args, **self.kwargs)
        fut = client.submit(func)
        # NOTE(review): unlike run_function, ``_executor`` holds the object
        # from ``client.get_executor(...)`` here, not the future -- confirm
        # cancel() can call ``.cancel()`` on it.
        self._executor = client.get_executor(pure=False)
        return create_generator_wrapper(self, fut)

    async def cancel(self):
        """Cancel job.

        When the job is running, ``self._executor.cancel()`` is awaited
        before the base-class ``cancel``.
        """
        if self.status == "running":
            await self._executor.cancel()
        await super().cancel()

    def clear_context(self):
        """Clear context by dropping the reference held in ``_executor``."""
        self._executor = None

cancel() async

Cancel job.

Source code in executor/engine/job/dask.py
async def cancel(self):
    """Cancel job.

    When the job is running, ``self._executor.cancel()`` is awaited
    before the base-class ``cancel``.
    """
    # NOTE(review): run_generator stores ``client.get_executor(...)`` in
    # ``_executor`` rather than the submitted future -- confirm that object
    # exposes an awaitable ``cancel()``.
    if self.status == "running":
        await self._executor.cancel()
    await super().cancel()

clear_context()

Clear context.

Source code in executor/engine/job/dask.py
def clear_context(self):
    """Forget the object stored in ``_executor``.

    Only the reference is dropped; nothing is shut down here.
    """
    self._executor = None

consume_resource()

Consume resource for the job.

Source code in executor/engine/job/dask.py
def consume_resource(self) -> bool:
    """Take one Dask slot from the engine for this job.

    Returns:
        ``False`` (without touching any counter) when the job has no
        engine. Otherwise ``engine.resource.n_dask`` is decremented and
        the result is ``True`` if the base-class call is truthy, else the
        base class's falsy value.
    """
    if self.engine is None:
        return False
    resource = self.engine.resource
    resource.n_dask -= 1
    consumed = super().consume_resource()
    return consumed and True

has_resource()

Check if the job has enough resource to run.

Source code in executor/engine/job/dask.py
def has_resource(self) -> bool:
    """Tell whether the job can currently obtain a Dask slot.

    Returns:
        ``False`` when the job has no engine; otherwise the base-class
        check combined with ``engine.resource.n_dask > 0``.
    """
    if self.engine is None:
        return False
    # The base-class check runs first and short-circuits the counter test.
    return super().has_resource() and (self.engine.resource.n_dask > 0)

release_resource()

Release resource for the job.

Source code in executor/engine/job/dask.py
def release_resource(self) -> bool:
    """Give the job's Dask slot back to the engine.

    Returns:
        ``False`` (without touching any counter) when the job has no
        engine. Otherwise ``engine.resource.n_dask`` is incremented and
        the result is ``True`` if the base-class call is truthy, else the
        base class's falsy value.
    """
    if self.engine is None:
        return False
    resource = self.engine.resource
    resource.n_dask += 1
    released = super().release_resource()
    return released and True

run_function() async

Run job with Dask.

Source code in executor/engine/job/dask.py
async def run_function(self):
    """Submit the job's callable to the engine's Dask client and await it.

    The callable is bound to the stored arguments; a coroutine function
    is wrapped with ``run_async_func`` first. The future returned by
    ``client.submit`` is kept in ``_executor``.

    Returns:
        The result of awaiting the submitted future.
    """
    dask_client = self.engine.dask_client
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    target = (
        functools.partial(run_async_func, bound)
        if iscoroutinefunction(bound)
        else bound
    )
    pending = dask_client.submit(target)
    # Stored so that cancel() can reach the in-flight future.
    self._executor = pending
    return await pending

run_generator() async

Run job as a generator.

Source code in executor/engine/job/dask.py
async def run_generator(self):
    """Start a generator job on the engine's Dask client.

    The bound callable is submitted to the client; ``_executor`` is set to
    the object returned by ``client.get_executor(pure=False)``.

    Returns:
        Whatever ``create_generator_wrapper(self, fut)`` returns for the
        submitted future.
    """
    dask_client = self.engine.dask_client
    bound = functools.partial(self.func, *self.args, **self.kwargs)
    pending = dask_client.submit(bound)
    self._executor = dask_client.get_executor(pure=False)
    return create_generator_wrapper(self, pending)