Source code for dagster.core.definitions.decorators.sensor

import inspect
from functools import update_wrapper
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Sequence, Union

from dagster import check
from dagster.core.definitions.sensor_definition import (
    DefaultSensorStatus,
    RunRequest,
    SensorDefinition,
    SkipReason,
)
from dagster.core.errors import DagsterInvariantViolationError

from ...errors import DagsterInvariantViolationError
from ..events import AssetKey
from ..graph_definition import GraphDefinition
from ..job_definition import JobDefinition
from ..sensor_definition import (
    AssetSensorDefinition,
    RunRequest,
    SensorDefinition,
    SensorEvaluationContext,
    SkipReason,
)

if TYPE_CHECKING:
    from ...events.log import EventLogEntry


def sensor(
    pipeline_name: Optional[str] = None,
    name: Optional[str] = None,
    solid_selection: Optional[List[str]] = None,
    mode: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job: Optional[Union[GraphDefinition, JobDefinition]] = None,
    jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [
        Callable[
            [SensorEvaluationContext],
            Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
        ]
    ],
    SensorDefinition,
]:
    """
    Decorator factory that turns a function into a sensor, using the decorated function as the
    sensor's evaluation function.

    The decorated function receives a :py:class:`~dagster.SensorEvaluationContext` and may:

    1. Return a `RunRequest` object.
    2. Yield multiple `RunRequest` objects.
    3. Return or yield a `SkipReason` object, giving a descriptive message of why no runs were
       requested.
    4. Return or yield nothing (skipping without providing a reason).

    Args:
        pipeline_name (Optional[str]): (legacy) Name of the target pipeline. Cannot be used in
            conjunction with `job` or `jobs` parameters.
        name (Optional[str]): The name of the sensor. Defaults to the name of the decorated
            function.
        solid_selection (Optional[List[str]]): (legacy) A list of solid subselection (including
            single solid names) to execute for runs for this sensor e.g.
            ``['*some_solid+', 'other_solid']``. Cannot be used in conjunction with `job` or
            `jobs` parameters.
        mode (Optional[str]): (legacy) The mode to apply when executing runs for this sensor.
            Cannot be used in conjunction with `job` or `jobs` parameters. (default: 'default')
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job (Optional[Union[GraphDefinition, JobDefinition]]): The job to be executed when the
            sensor fires.
        jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]): (experimental) A list
            of jobs to be executed when the sensor fires.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.

    Returns:
        A decorator that accepts the evaluation function and produces a
        :py:class:`SensorDefinition`.
    """
    check.opt_str_param(name, "name")

    def inner(
        fn: Callable[
            ["SensorEvaluationContext"],
            Union[Generator[Union[SkipReason, RunRequest], None, None], SkipReason, RunRequest],
        ]
    ) -> SensorDefinition:
        check.callable_param(fn, "fn")

        definition = SensorDefinition(
            evaluation_fn=fn,
            name=name,
            description=description,
            pipeline_name=pipeline_name,
            solid_selection=solid_selection,
            mode=mode,
            job=job,
            jobs=jobs,
            minimum_interval_seconds=minimum_interval_seconds,
            default_status=default_status,
        )

        # update_wrapper copies fn's metadata (__name__, __doc__, __wrapped__, ...) onto the
        # definition and hands the very same object back, so it can be returned directly.
        return update_wrapper(definition, wrapped=fn)

    return inner
def asset_sensor(
    asset_key: AssetKey,
    pipeline_name: Optional[str] = None,
    name: Optional[str] = None,
    solid_selection: Optional[List[str]] = None,
    mode: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job: Optional[Union[GraphDefinition, JobDefinition]] = None,
    jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [
        Callable[
            [
                "SensorEvaluationContext",
                "EventLogEntry",
            ],
            Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
        ]
    ],
    AssetSensorDefinition,
]:
    """
    Creates an asset sensor where the decorated function is used as the asset sensor's evaluation
    function.  The decorated function may:

    1. Return a `RunRequest` object.
    2. Yield multiple `RunRequest` objects.
    3. Return or yield a `SkipReason` object, providing a descriptive message of why no runs were
       requested.
    4. Return or yield nothing (skipping without providing a reason)

    Takes a :py:class:`~dagster.SensorEvaluationContext` and an EventLogEntry corresponding to an
    AssetMaterialization event.

    Args:
        asset_key (AssetKey): The asset_key this sensor monitors.
        pipeline_name (Optional[str]): (legacy) Name of the target pipeline. Cannot be used in
            conjunction with `job` or `jobs` parameters.
        name (Optional[str]): The name of the sensor. Defaults to the name of the decorated
            function.
        solid_selection (Optional[List[str]]): (legacy) A list of solid subselection (including
            single solid names) to execute for runs for this sensor e.g.
            ``['*some_solid+', 'other_solid']``. Cannot be used in conjunction with `job` or
            `jobs` parameters.
        mode (Optional[str]): (legacy) The mode to apply when executing runs for this sensor.
            Cannot be used in conjunction with `job` or `jobs` parameters. (default: 'default')
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job (Optional[Union[GraphDefinition, JobDefinition]]): The job to be executed when the
            sensor fires.
        jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]): (experimental) A list
            of jobs to be executed when the sensor fires.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.

    Returns:
        A decorator that accepts the evaluation function and produces an
        :py:class:`AssetSensorDefinition`.

    Raises:
        DagsterInvariantViolationError: Raised while the wrapped evaluation function is being
            iterated, if the decorated function returns something other than a generator, a
            `RunRequest`, a `SkipReason`, or `None`.
    """
    check.opt_str_param(name, "name")

    def inner(
        fn: Callable[
            [
                "SensorEvaluationContext",
                "EventLogEntry",
            ],
            Union[Generator[Union[SkipReason, RunRequest], None, None], SkipReason, RunRequest],
        ]
    ) -> AssetSensorDefinition:
        check.callable_param(fn, "fn")

        sensor_name = name or fn.__name__

        def _wrapped_fn(context, event):
            # Normalizes every supported return shape of `fn` into a generator. Because this is
            # itself a generator function, `fn` is only called (and the error below only raised)
            # once the result is iterated.
            result = fn(context, event)

            if inspect.isgenerator(result):
                yield from result
            elif isinstance(result, (RunRequest, SkipReason)):
                yield result
            elif result is not None:
                raise DagsterInvariantViolationError(
                    (
                        "Error in sensor {sensor_name}: Sensor unexpectedly returned output "
                        "{result} of type {type_}.  Should only return SkipReason or "
                        "RunRequest objects."
                    ).format(sensor_name=sensor_name, result=result, type_=type(result))
                )

        sensor_def = AssetSensorDefinition(
            name=sensor_name,
            asset_key=asset_key,
            pipeline_name=pipeline_name,
            asset_materialization_fn=_wrapped_fn,
            solid_selection=solid_selection,
            mode=mode,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            job=job,
            jobs=jobs,
            default_status=default_status,
        )

        # Mirror the `sensor` decorator: carry the decorated function's metadata
        # (__name__, __doc__, __wrapped__, ...) over to the returned definition.
        update_wrapper(sensor_def, wrapped=fn)

        return sensor_def

    return inner