# Source code for dagster.core.definitions.sensor_definition

import inspect
from contextlib import ExitStack
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generator,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Union,
    cast,
)

from dagster import check
from dagster.core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidInvocationError,
    DagsterInvariantViolationError,
)
from dagster.core.instance import DagsterInstance
from dagster.core.instance.ref import InstanceRef
from dagster.serdes import whitelist_for_serdes
from dagster.seven import funcsigs
from dagster.utils import ensure_gen

from ..decorator_utils import get_function_params
from .events import AssetKey
from .graph_definition import GraphDefinition
from .job_definition import JobDefinition
from .mode import DEFAULT_MODE_NAME
from .pipeline_definition import PipelineDefinition
from .run_request import PipelineRunReaction, RunRequest, SkipReason
from .target import DirectTarget, RepoRelativeTarget
from .utils import check_valid_name

if TYPE_CHECKING:
    from dagster.core.events.log import EventLogEntry


@whitelist_for_serdes
class DefaultSensorStatus(Enum):
    """The status a sensor starts in before a user manually starts/stops it (e.g. from Dagit).

    NOTE: this enum is whitelisted for serialization, so member names/values must stay stable.
    """

    RUNNING = "RUNNING"
    STOPPED = "STOPPED"


# Default number of seconds between sensor evaluations, used when a sensor does not
# supply `minimum_interval_seconds` (see SensorDefinition.__init__).
DEFAULT_SENSOR_DAEMON_INTERVAL = 30


def is_context_provided(params: List[funcsigs.Parameter]) -> bool:
    """Return True when the sensor function declares exactly one parameter (the context)."""
    param_count = len(params)
    return param_count == 1


class SensorEvaluationContext:
    """Sensor execution context.

    An instance of this class is made available as the first argument to the evaluation function
    on SensorDefinition.

    Attributes:
        instance_ref (Optional[InstanceRef]): The serialized instance configured to run the
            schedule
        cursor (Optional[str]): The cursor, passed back from the last sensor evaluation via
            the cursor attribute of SkipReason and RunRequest
        last_completion_time (float): DEPRECATED The last time that the sensor was evaluated (UTC).
        last_run_key (str): DEPRECATED The run key of the RunRequest most recently created by this
            sensor. Use the preferred `cursor` attribute instead.
        repository_name (Optional[str]): The name of the repository that the sensor belongs to.
        instance (Optional[DagsterInstance]): The deserialized instance can also be passed in
            directly (primarily useful in testing contexts).
    """

    def __init__(
        self,
        instance_ref: Optional[InstanceRef],
        last_completion_time: Optional[float],
        last_run_key: Optional[str],
        cursor: Optional[str],
        repository_name: Optional[str],
        instance: Optional[DagsterInstance] = None,
    ):
        # Collects context managers (e.g. a lazily-constructed DagsterInstance) so they are
        # all torn down together on __exit__.
        self._exit_stack = ExitStack()
        self._instance_ref = check.opt_inst_param(instance_ref, "instance_ref", InstanceRef)
        self._last_completion_time = check.opt_float_param(
            last_completion_time, "last_completion_time"
        )
        self._last_run_key = check.opt_str_param(last_run_key, "last_run_key")
        self._cursor = check.opt_str_param(cursor, "cursor")
        self._repository_name = check.opt_str_param(repository_name, "repository_name")
        self._instance = check.opt_inst_param(instance, "instance", DagsterInstance)

    def __enter__(self):
        return self

    def __exit__(self, _exception_type, _exception_value, _traceback):
        # Release any resources acquired lazily (such as an instance built from instance_ref).
        self._exit_stack.close()

    @property
    def instance(self) -> DagsterInstance:
        # A directly-supplied instance (primarily a testing affordance) takes precedence.
        if self._instance:
            return cast(DagsterInstance, self._instance)

        # self._instance_ref should only ever be None when this SensorEvaluationContext was
        # constructed under test.
        if not self._instance_ref:
            raise DagsterInvariantViolationError(
                "Attempted to initialize dagster instance, but no instance reference was provided."
            )

        # Lazily materialize the instance from its serialized ref; ownership moves to the
        # exit stack so it is cleaned up when the context closes.
        self._instance = self._exit_stack.enter_context(
            DagsterInstance.from_ref(self._instance_ref)
        )
        return cast(DagsterInstance, self._instance)

    @property
    def last_completion_time(self) -> Optional[float]:
        return self._last_completion_time

    @property
    def last_run_key(self) -> Optional[str]:
        return self._last_run_key

    @property
    def cursor(self) -> Optional[str]:
        """The cursor value for this sensor, which was set in an earlier sensor evaluation."""
        return self._cursor

    def update_cursor(self, cursor: Optional[str]) -> None:
        """Updates the cursor value for this sensor, which will be provided on the context for the
        next sensor evaluation.

        This can be used to keep track of progress and avoid duplicate work across sensor
        evaluations.

        Args:
            cursor (Optional[str]):
        """
        self._cursor = check.opt_str_param(cursor, "cursor")

    @property
    def repository_name(self) -> Optional[str]:
        return self._repository_name
# Preserve SensorExecutionContext for backcompat so type annotations don't break. SensorExecutionContext = SensorEvaluationContext
class SensorDefinition:
    """Define a sensor that initiates a set of runs based on some external state

    Args:
        evaluation_fn (Callable[[SensorEvaluationContext]]): The core evaluation function for the
            sensor, which is run at an interval to determine whether a run should be launched or
            not. Takes a :py:class:`~dagster.SensorEvaluationContext`.

            This function must return a generator, which must yield either a single SkipReason
            or one or more RunRequest objects.
        name (Optional[str]): The name of the sensor to create. Defaults to name of evaluation_fn
        pipeline_name (Optional[str]): (legacy) The name of the pipeline to execute when the sensor
            fires. Cannot be used in conjunction with `job` or `jobs` parameters.
        solid_selection (Optional[List[str]]): (legacy) A list of solid subselection (including
            single solid names) to execute when the sensor runs.
            e.g. ``['*some_solid+', 'other_solid']``.
            Cannot be used in conjunction with `job` or `jobs` parameters.
        mode (Optional[str]): (legacy) The mode to apply when executing runs triggered by this
            sensor. Cannot be used in conjunction with `job` or `jobs` parameters.
            (default: 'default')
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job (Optional[GraphDefinition, JobDefinition]): The job to execute when this sensor fires.
        jobs (Optional[Sequence[GraphDefinition, JobDefinition]]): (experimental) A list of jobs
            to execute when this sensor fires.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        evaluation_fn: Optional[
            Callable[
                ["SensorEvaluationContext"],
                Union[
                    Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason
                ],
            ]
        ] = None,
        pipeline_name: Optional[str] = None,
        solid_selection: Optional[List[Any]] = None,
        mode: Optional[str] = None,
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        job: Optional[Union[GraphDefinition, JobDefinition]] = None,
        jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    ):
        if evaluation_fn is None:
            raise DagsterInvalidDefinitionError("Must provide evaluation_fn to SensorDefinition.")

        # `job`, `jobs`, and the legacy (pipeline_name/solid_selection/mode) targeting params are
        # mutually exclusive; validate every combination before constructing targets.
        if job and jobs:
            raise DagsterInvalidDefinitionError(
                "Attempted to provide both job and jobs to SensorDefinition. Must provide only one "
                "of the two."
            )

        job_param_name = "job" if job else "jobs"
        # Normalize the single-job form into the list form.
        jobs = jobs if jobs else [job] if job else None

        if pipeline_name and jobs:
            raise DagsterInvalidDefinitionError(
                f"Attempted to provide both pipeline_name and {job_param_name} to "
                "SensorDefinition. Must provide only one of the two."
            )

        if solid_selection and jobs:
            raise DagsterInvalidDefinitionError(
                f"Attempted to provide solid_selection and {job_param_name} to SensorDefinition. "
                "The solid_selection argument is incompatible with jobs."
            )

        if mode and jobs:
            raise DagsterInvalidDefinitionError(
                f"Attempted to provide mode and {job_param_name} to SensorDefinition. "
                "The mode argument is incompatible with jobs."
            )

        # Build the target list: a repo-relative target for the legacy pipeline_name form, or
        # direct targets wrapping the provided job definition(s).
        targets: Optional[List[Union[RepoRelativeTarget, DirectTarget]]] = None
        if pipeline_name:
            targets = [
                RepoRelativeTarget(
                    pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
                    mode=check.opt_str_param(mode, "mode") or DEFAULT_MODE_NAME,
                    solid_selection=check.opt_nullable_list_param(
                        solid_selection, "solid_selection", of_type=str
                    ),
                )
            ]
        elif job:
            targets = [DirectTarget(job)]
        elif jobs:
            targets = [DirectTarget(job) for job in jobs]

        if name:
            self._name = check_valid_name(name)
        else:
            # Fall back to the evaluation function's own name.
            self._name = evaluation_fn.__name__

        # Keep the raw user function for direct invocation (__call__); wrap it for tick
        # evaluation so the evaluation path always sees a generator.
        self._raw_fn = check.callable_param(evaluation_fn, "evaluation_fn")
        self._evaluation_fn: Callable[
            [SensorEvaluationContext], Generator[Union[RunRequest, SkipReason], None, None]
        ] = wrap_sensor_evaluation(self._name, evaluation_fn)
        self._min_interval = check.opt_int_param(
            minimum_interval_seconds, "minimum_interval_seconds", DEFAULT_SENSOR_DAEMON_INTERVAL
        )
        self._description = check.opt_str_param(description, "description")
        self._targets = check.opt_list_param(
            targets, "targets", (DirectTarget, RepoRelativeTarget)
        )
        self._default_status = check.inst_param(
            default_status, "default_status", DefaultSensorStatus
        )

    def __call__(self, *args, **kwargs):
        """Invoke the underlying evaluation function directly (primarily for testing).

        Accepts at most one context argument (positional or by the function's own parameter
        name); a default test context is built when None is passed.
        """
        context_provided = is_context_provided(get_function_params(self._raw_fn))
        if context_provided:
            if len(args) + len(kwargs) == 0:
                raise DagsterInvalidInvocationError(
                    "Sensor evaluation function expected context argument, but no context argument "
                    "was provided when invoking."
                )
            if len(args) + len(kwargs) > 1:
                raise DagsterInvalidInvocationError(
                    "Sensor invocation received multiple arguments. Only a first "
                    "positional context parameter should be provided when invoking."
                )

            # The user function may name its context parameter anything; look it up.
            context_param_name = get_function_params(self._raw_fn)[0].name

            if args:
                context = check.opt_inst_param(
                    args[0], context_param_name, SensorEvaluationContext
                )
            else:
                if context_param_name not in kwargs:
                    raise DagsterInvalidInvocationError(
                        f"Sensor invocation expected argument '{context_param_name}'."
                    )
                context = check.opt_inst_param(
                    kwargs[context_param_name], context_param_name, SensorEvaluationContext
                )

            # Supply a default (test) context when the caller passed None.
            context = context if context else build_sensor_context()

            return self._raw_fn(context)

        else:
            if len(args) + len(kwargs) > 0:
                raise DagsterInvalidInvocationError(
                    "Sensor decorated function has no arguments, but arguments were provided to "
                    "invocation."
                )

            return self._raw_fn()

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> Optional[str]:
        return self._description

    @property
    def minimum_interval_seconds(self) -> Optional[int]:
        return self._min_interval

    @property
    def targets(self) -> List[Union[DirectTarget, RepoRelativeTarget]]:
        return self._targets

    @property
    def job(self) -> PipelineDefinition:
        # Only meaningful when exactly one direct (job) target was provided.
        if self._targets:
            if len(self._targets) == 1 and isinstance(self._targets[0], DirectTarget):
                return self._targets[0].pipeline
            elif len(self._targets) > 1:
                raise DagsterInvalidDefinitionError(
                    "Job property not available when SensorDefinition has multiple jobs."
                )
        raise DagsterInvalidDefinitionError("No job was provided to SensorDefinition.")

    def evaluate_tick(self, context: "SensorEvaluationContext") -> "SensorExecutionData":
        """Evaluate sensor using the provided context.

        Args:
            context (SensorEvaluationContext): The context with which to evaluate this sensor.
        Returns:
            SensorExecutionData: Contains list of run requests, or skip message if present.

        """
        check.inst_param(context, "context", SensorEvaluationContext)
        # The wrapped fn always yields; materialize the full result for classification.
        result = list(ensure_gen(self._evaluation_fn(context)))

        skip_message: Optional[str] = None

        run_requests: List[RunRequest]
        pipeline_run_reactions: List[PipelineRunReaction]
        if not result or result == [None]:
            # Empty evaluation is treated as a skip.
            run_requests = []
            pipeline_run_reactions = []
            skip_message = "Sensor function returned an empty result"
        elif len(result) == 1:
            # Single item: may be a SkipReason, RunRequest, or PipelineRunReaction.
            item = result[0]
            check.inst(item, (SkipReason, RunRequest, PipelineRunReaction))
            run_requests = [item] if isinstance(item, RunRequest) else []
            pipeline_run_reactions = (
                [cast(PipelineRunReaction, item)] if isinstance(item, PipelineRunReaction) else []
            )
            skip_message = item.skip_message if isinstance(item, SkipReason) else None
        else:
            # Multiple items: a SkipReason may never be mixed with anything else.
            check.is_list(result, (SkipReason, RunRequest, PipelineRunReaction))
            has_skip = any(map(lambda x: isinstance(x, SkipReason), result))
            has_run_request = any(map(lambda x: isinstance(x, RunRequest), result))
            has_run_reaction = any(map(lambda x: isinstance(x, PipelineRunReaction), result))

            if has_skip:
                if has_run_request:
                    check.failed(
                        "Expected a single SkipReason or one or more RunRequests: received both "
                        "RunRequest and SkipReason"
                    )
                elif has_run_reaction:
                    check.failed(
                        "Expected a single SkipReason or one or more PipelineRunReaction: "
                        "received both PipelineRunReaction and SkipReason"
                    )
                else:
                    check.failed("Expected a single SkipReason: received multiple SkipReasons")

            if has_run_request:
                run_requests = cast(List[RunRequest], result)
                pipeline_run_reactions = []
            else:
                # only run reactions
                run_requests = []
                pipeline_run_reactions = cast(List[PipelineRunReaction], result)

        self.check_valid_run_requests(run_requests)

        return SensorExecutionData(
            run_requests,
            skip_message,
            context.cursor,
            pipeline_run_reactions,
        )

    def has_loadable_targets(self) -> bool:
        # True when at least one target carries its own job definition (DirectTarget).
        for target in self._targets:
            if isinstance(target, DirectTarget):
                return True
        return False

    def load_targets(self) -> List[PipelineDefinition]:
        # Load the pipeline/job definitions from all direct targets.
        targets = []
        for target in self._targets:
            if isinstance(target, DirectTarget):
                targets.append(target.load())
        return targets

    def check_valid_run_requests(self, run_requests: List[RunRequest]):
        """Validate that each RunRequest names a job this sensor actually targets.

        Raises:
            Exception: if a RunRequest exists without any target, omits job_name when multiple
                targets exist, or names a job outside the targeted set.
        """
        has_multiple_targets = len(self._targets) > 1
        target_names = [target.pipeline_name for target in self._targets]

        if run_requests and not self._targets:
            raise Exception(
                f"Error in sensor {self._name}: Sensor evaluation function returned a RunRequest "
                "for a sensor lacking a specified target (pipeline_name, job, or jobs). Targets "
                "can be specified by providing job, jobs, or pipeline_name to the @sensor "
                "decorator."
            )

        for run_request in run_requests:
            if run_request.job_name is None and has_multiple_targets:
                raise Exception(
                    f"Error in sensor {self._name}: Sensor returned a RunRequest that did not "
                    f"specify job_name for the requested run. Expected one of: {target_names}"
                )
            elif run_request.job_name and run_request.job_name not in target_names:
                raise Exception(
                    f"Error in sensor {self._name}: Sensor returned a RunRequest with job_name "
                    f"{run_request.job_name}. Expected one of: {target_names}"
                )

    @property
    def _target(self) -> Optional[Union[DirectTarget, RepoRelativeTarget]]:
        # Convenience accessor for the legacy single-target properties below.
        return self._targets[0] if self._targets else None

    @property
    def pipeline_name(self) -> Optional[str]:
        return self._target.pipeline_name if self._target else None

    @property
    def solid_selection(self) -> Optional[List[Any]]:
        return self._target.solid_selection if self._target else None

    @property
    def mode(self) -> Optional[str]:
        return self._target.mode if self._target else None

    @property
    def default_status(self) -> DefaultSensorStatus:
        return self._default_status
@whitelist_for_serdes
class SensorExecutionData(
    NamedTuple(
        "_SensorExecutionData",
        [
            ("run_requests", Optional[List[RunRequest]]),
            ("skip_message", Optional[str]),
            ("cursor", Optional[str]),
            ("pipeline_run_reactions", Optional[List[PipelineRunReaction]]),
        ],
    )
):
    """The validated result of a single sensor tick, as produced by
    SensorDefinition.evaluate_tick. Whitelisted for serialization, so field names must stay
    stable.
    """

    def __new__(
        cls,
        run_requests: Optional[List[RunRequest]] = None,
        skip_message: Optional[str] = None,
        cursor: Optional[str] = None,
        pipeline_run_reactions: Optional[List[PipelineRunReaction]] = None,
    ):
        check.opt_list_param(run_requests, "run_requests", RunRequest)
        check.opt_str_param(skip_message, "skip_message")
        check.opt_str_param(cursor, "cursor")
        check.opt_list_param(
            pipeline_run_reactions, "pipeline_run_reactions", PipelineRunReaction
        )
        # A tick either requests runs or skips -- never both.
        check.invariant(
            not (run_requests and skip_message), "Found both skip data and run request data"
        )
        return super(SensorExecutionData, cls).__new__(
            cls,
            run_requests=run_requests,
            skip_message=skip_message,
            cursor=cursor,
            pipeline_run_reactions=pipeline_run_reactions,
        )


def wrap_sensor_evaluation(
    sensor_name: str,
    fn: Callable[
        ["SensorEvaluationContext"],
        Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
    ],
) -> Callable[["SensorEvaluationContext"], Generator[Union[SkipReason, RunRequest], None, None]]:
    """Normalize a user-supplied sensor function into a generator of SkipReason/RunRequest.

    Handles no-arg sensor functions, plain (non-generator) return values, and raises on any
    other non-None return.
    """

    def _wrapped_fn(context):
        # No-arg sensor functions are invoked without the context.
        result = fn(context) if is_context_provided(get_function_params(fn)) else fn()

        if inspect.isgenerator(result):
            for item in result:
                yield item
        elif isinstance(result, (SkipReason, RunRequest)):
            # A bare SkipReason/RunRequest return is re-yielded as a one-item stream.
            yield result
        elif result is not None:
            raise Exception(
                (
                    "Error in sensor {sensor_name}: Sensor unexpectedly returned output "
                    "{result} of type {type_}. Should only return SkipReason or "
                    "RunRequest objects."
                ).format(sensor_name=sensor_name, result=result, type_=type(result))
            )

    return _wrapped_fn
def build_sensor_context(
    instance: Optional[DagsterInstance] = None,
    cursor: Optional[str] = None,
    repository_name: Optional[str] = None,
) -> SensorEvaluationContext:
    """Builds sensor execution context using the provided parameters.

    This function can be used to provide a context to the invocation of a sensor definition. If
    provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in
    an error.

    Args:
        instance (Optional[DagsterInstance]): The dagster instance configured to run the sensor.
        cursor (Optional[str]): A cursor value to provide to the evaluation of the sensor.
        repository_name (Optional[str]): The name of the repository that the sensor belongs to.

    Examples:

        .. code-block:: python

            context = build_sensor_context()
            my_sensor(context)

    """
    check.opt_inst_param(instance, "instance", DagsterInstance)
    check.opt_str_param(cursor, "cursor")
    check.opt_str_param(repository_name, "repository_name")
    # instance_ref is intentionally None: this context is for direct (test) invocation, so the
    # instance, when needed, is supplied directly rather than deserialized from a ref.
    return SensorEvaluationContext(
        instance_ref=None,
        last_completion_time=None,
        last_run_key=None,
        cursor=cursor,
        repository_name=repository_name,
        instance=instance,
    )
class AssetSensorDefinition(SensorDefinition):
    """Define an asset sensor that initiates a set of runs based on the materialization of a given
    asset.

    Args:
        name (str): The name of the sensor to create.
        asset_key (AssetKey): The asset_key this sensor monitors.
        pipeline_name (Optional[str]): (legacy) The name of the pipeline to execute when the
            sensor fires. Cannot be used in conjunction with `job` or `jobs` parameters.
        asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason]]): The core
            evaluation function for the sensor, which is run at an interval to determine whether a
            run should be launched or not. Takes a :py:class:`~dagster.SensorEvaluationContext`
            and an EventLogEntry corresponding to an AssetMaterialization event.

            This function must return a generator, which must yield either a single SkipReason
            or one or more RunRequest objects.
        solid_selection (Optional[List[str]]): (legacy) A list of solid subselection (including
            single solid names) to execute when the sensor runs.
            e.g. ``['*some_solid+', 'other_solid']``.
            Cannot be used in conjunction with `job` or `jobs` parameters.
        mode (Optional[str]): (legacy) The mode to apply when executing runs triggered by this
            sensor. (default: 'default').
            Cannot be used in conjunction with `job` or `jobs` parameters.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job (Optional[Union[GraphDefinition, JobDefinition]]): The job object to target with this
            sensor.
        jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]): (experimental) A list
            of jobs to be executed when the sensor fires.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def __init__(
        self,
        name: str,
        asset_key: AssetKey,
        pipeline_name: Optional[str],
        asset_materialization_fn: Callable[
            ["SensorExecutionContext", "EventLogEntry"],
            Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
        ],
        solid_selection: Optional[List[str]] = None,
        mode: Optional[str] = None,
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        job: Optional[Union[GraphDefinition, JobDefinition]] = None,
        jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    ):
        self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)

        # Imported locally, presumably to avoid a circular import at module load -- TODO confirm.
        from dagster.core.events import DagsterEventType
        from dagster.core.storage.event_log.base import EventRecordsFilter

        def _wrap_asset_fn(materialization_fn):
            # Wraps the user's materialization fn in an evaluation fn that polls the event log
            # for the latest materialization of the monitored asset, using the sensor cursor to
            # store the storage id of the last event handled.
            def _fn(context):
                after_cursor = None
                if context.cursor:
                    try:
                        after_cursor = int(context.cursor)
                    except ValueError:
                        # Unparseable cursor: fall back to scanning from the beginning.
                        after_cursor = None

                # Fetch only the single most recent materialization after the cursor.
                event_records = context.instance.get_event_records(
                    EventRecordsFilter(
                        event_type=DagsterEventType.ASSET_MATERIALIZATION,
                        asset_key=self._asset_key,
                        after_cursor=after_cursor,
                    ),
                    ascending=False,
                    limit=1,
                )

                if not event_records:
                    # No new materialization: yield nothing (treated as a skip upstream).
                    return

                event_record = event_records[0]
                yield from materialization_fn(context, event_record.event_log_entry)
                # Advance the cursor past the event just handled.
                context.update_cursor(str(event_record.storage_id))

            return _fn

        super(AssetSensorDefinition, self).__init__(
            name=check_valid_name(name),
            pipeline_name=pipeline_name,
            evaluation_fn=_wrap_asset_fn(
                check.callable_param(asset_materialization_fn, "asset_materialization_fn"),
            ),
            solid_selection=solid_selection,
            mode=mode,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            job=job,
            jobs=jobs,
            default_status=default_status,
        )

    @property
    def asset_key(self):
        # The monitored asset's key, as validated in __init__.
        return self._asset_key