Source code for dagster.core.definitions.run_status_sensor_definition

import warnings
from datetime import datetime
from typing import Any, Callable, List, NamedTuple, Optional, Union, cast

import pendulum

from dagster import check
from dagster.core.definitions import GraphDefinition, PipelineDefinition
from dagster.core.definitions.sensor_definition import (
    DefaultSensorStatus,
    PipelineRunReaction,
    SensorDefinition,
    SensorEvaluationContext,
    SkipReason,
    is_context_provided,
)
from dagster.core.errors import (
    DagsterInvalidInvocationError,
    RunStatusSensorExecutionError,
    user_code_error_boundary,
)
from dagster.core.events import PIPELINE_RUN_STATUS_TO_EVENT_TYPE, DagsterEvent
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import DagsterRun, PipelineRun, PipelineRunStatus, RunsFilter
from dagster.serdes import (
    deserialize_json_to_dagster_namedtuple,
    serialize_dagster_namedtuple,
    whitelist_for_serdes,
)
from dagster.serdes.errors import DeserializationError
from dagster.serdes.serdes import register_serdes_tuple_fallbacks
from dagster.seven import JSONDecodeError
from dagster.utils import utc_datetime_from_timestamp
from dagster.utils.error import serializable_error_info_from_exc_info

from ..decorator_utils import get_function_params


@whitelist_for_serdes
class RunStatusSensorCursor(
    NamedTuple(
        "_RunStatusSensorCursor",
        [("record_id", int), ("update_timestamp", str)],
    )
):
    def __new__(cls, record_id, update_timestamp):
        return super(RunStatusSensorCursor, cls).__new__(
            cls,
            record_id=check.int_param(record_id, "record_id"),
            update_timestamp=check.str_param(update_timestamp, "update_timestamp"),
        )

    @staticmethod
    def is_valid(json_str: str) -> bool:
        try:
            obj = deserialize_json_to_dagster_namedtuple(json_str)
            return isinstance(obj, RunStatusSensorCursor)
        except (JSONDecodeError, DeserializationError):
            return False

    def to_json(self) -> str:
        return serialize_dagster_namedtuple(cast(NamedTuple, self))

    @staticmethod
    def from_json(json_str: str) -> "RunStatusSensorCursor":
        return cast(RunStatusSensorCursor, deserialize_json_to_dagster_namedtuple(json_str))


# handle backcompat
register_serdes_tuple_fallbacks({"PipelineSensorCursor": RunStatusSensorCursor})
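
# A round-trip sketch (illustrative, not part of the library source) showing how
# the cursor serializes to and from JSON; the values below are made up:
#
#     cursor = RunStatusSensorCursor(
#         record_id=42, update_timestamp=pendulum.now("UTC").isoformat()
#     )
#     json_str = cursor.to_json()
#     assert RunStatusSensorCursor.is_valid(json_str)
#     assert RunStatusSensorCursor.from_json(json_str) == cursor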


class RunStatusSensorContext(
    NamedTuple(
        "_RunStatusSensorContext",
        [
            ("sensor_name", str),
            ("dagster_run", DagsterRun),
            ("dagster_event", DagsterEvent),
            ("instance", DagsterInstance),
        ],
    )
):
    """The ``context`` object available to a decorated function of ``run_status_sensor``.

    Attributes:
        sensor_name (str): the name of the sensor.
        dagster_run (DagsterRun): the run of the job or pipeline.
        dagster_event (DagsterEvent): the event associated with the job or pipeline run status.
        instance (DagsterInstance): the current instance.
    """

    def __new__(cls, sensor_name, dagster_run, dagster_event, instance):
        return super(RunStatusSensorContext, cls).__new__(
            cls,
            sensor_name=check.str_param(sensor_name, "sensor_name"),
            dagster_run=check.inst_param(dagster_run, "dagster_run", DagsterRun),
            dagster_event=check.inst_param(dagster_event, "dagster_event", DagsterEvent),
            instance=check.inst_param(instance, "instance", DagsterInstance),
        )

    def for_pipeline_failure(self):
        return PipelineFailureSensorContext(
            sensor_name=self.sensor_name,
            dagster_run=self.dagster_run,
            dagster_event=self.dagster_event,
            instance=self.instance,
        )

    def for_run_failure(self):
        """Converts RunStatusSensorContext to RunFailureSensorContext."""
        return RunFailureSensorContext(
            sensor_name=self.sensor_name,
            dagster_run=self.dagster_run,
            dagster_event=self.dagster_event,
            instance=self.instance,
        )

    @property
    def pipeline_run(self) -> PipelineRun:
        warnings.warn(
            "`RunStatusSensorContext.pipeline_run` is deprecated as of 0.13.0; use "
            "`RunStatusSensorContext.dagster_run` instead."
        )
        return self.dagster_run
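
# Conversion sketch (illustrative, not part of the library source): inside a
# sensor body, `for_run_failure()` narrows the generic context to a
# `RunFailureSensorContext`, which exposes the failure event directly:
#
#     def my_sensor_body(context: RunStatusSensorContext):
#         failure_context = context.for_run_failure()
#         failure_event = failure_context.failure_event  # the underlying DagsterEvent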


class PipelineFailureSensorContext(RunStatusSensorContext):
    """The ``context`` object available to a decorated function of ``pipeline_failure_sensor``.

    Attributes:
        sensor_name (str): the name of the sensor.
        pipeline_run (PipelineRun): the failed pipeline run.
        failure_event (DagsterEvent): the pipeline failure event.
    """

    @property
    def failure_event(self):
        return self.dagster_event


class RunFailureSensorContext(RunStatusSensorContext):
    """The ``context`` object available to a decorated function of ``run_failure_sensor``.

    Attributes:
        sensor_name (str): the name of the sensor.
        pipeline_run (PipelineRun): the failed run.
        failure_event (DagsterEvent): the run failure event.
    """

    @property
    def failure_event(self):
        return self.dagster_event


def build_run_status_sensor_context(
    sensor_name: str,
    dagster_event: DagsterEvent,
    dagster_instance: DagsterInstance,
    dagster_run: DagsterRun,
) -> RunStatusSensorContext:
    """
    Builds run status sensor context from provided parameters.

    This function can be used to provide the context argument when directly invoking a function
    decorated with `@run_status_sensor` or `@run_failure_sensor`, such as when writing unit tests.

    Args:
        sensor_name (str): The name of the sensor the context is being constructed for.
        dagster_event (DagsterEvent): A DagsterEvent with the same event type as the one that
            triggers the run_status_sensor.
        dagster_instance (DagsterInstance): The dagster instance configured for the context.
        dagster_run (DagsterRun): The DagsterRun object from running a job.

    Examples:
        .. code-block:: python

            instance = DagsterInstance.ephemeral()
            result = my_job.execute_in_process(instance=instance)

            dagster_run = result.dagster_run
            dagster_event = result.get_job_success_event()  # or get_job_failure_event()

            context = build_run_status_sensor_context(
                sensor_name="run_status_sensor_to_invoke",
                dagster_instance=instance,
                dagster_run=dagster_run,
                dagster_event=dagster_event,
            )
            run_status_sensor_to_invoke(context)
    """
    return RunStatusSensorContext(
        sensor_name=sensor_name,
        instance=dagster_instance,
        dagster_run=dagster_run,
        dagster_event=dagster_event,
    )


def pipeline_failure_sensor(
    name: Optional[Union[Callable[..., Any], str]] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    pipeline_selection: Optional[List[str]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [Callable[[PipelineFailureSensorContext], Union[SkipReason, PipelineRunReaction]]],
    SensorDefinition,
]:
    """
    Creates a sensor that reacts to pipeline failure events, where the decorated function will be
    run when a pipeline run fails.

    Takes a :py:class:`~dagster.PipelineFailureSensorContext`.

    Args:
        name (Optional[str]): The name of the pipeline failure sensor. Defaults to the name of the
            decorated function.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        pipeline_selection (Optional[List[str]]): Names of the pipelines that will be monitored by
            this failure sensor. Defaults to None, which means the alert will be sent when any
            pipeline in the repository fails.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def inner(
        fn: Callable[[PipelineFailureSensorContext], Union[SkipReason, PipelineRunReaction]]
    ) -> SensorDefinition:
        check.callable_param(fn, "fn")
        if name is None or callable(name):
            sensor_name = fn.__name__
        else:
            sensor_name = name

        @run_status_sensor(
            pipeline_run_status=PipelineRunStatus.FAILURE,
            pipeline_selection=pipeline_selection,
            name=sensor_name,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
        )
        def _pipeline_failure_sensor(context: RunStatusSensorContext):
            fn(context.for_pipeline_failure())

        return _pipeline_failure_sensor

    # This case is for when the decorator is used bare, without arguments, i.e. @pipeline_failure_sensor
    if callable(name):
        return inner(name)

    return inner
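
# Usage sketch (illustrative, not part of the library source), assuming a
# hypothetical `email_alert` helper; the decorator may be used bare or with
# arguments:
#
#     @pipeline_failure_sensor
#     def notify_on_any_failure(context: PipelineFailureSensorContext):
#         email_alert(f"Pipeline {context.pipeline_run.pipeline_name} failed!")
#
#     @pipeline_failure_sensor(pipeline_selection=["my_pipeline"])
#     def notify_on_my_pipeline_failure(context: PipelineFailureSensorContext):
#         email_alert(f"my_pipeline failed: {context.failure_event.message}")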


def run_failure_sensor(
    name: Optional[Union[Callable[..., Any], str]] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job_selection: Optional[List[Union[PipelineDefinition, GraphDefinition]]] = None,
    pipeline_selection: Optional[List[str]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [Callable[[RunFailureSensorContext], Union[SkipReason, PipelineRunReaction]]],
    SensorDefinition,
]:
    """
    Creates a sensor that reacts to job failure events, where the decorated function will be
    run when a run fails.

    Takes a :py:class:`~dagster.RunFailureSensorContext`.

    Args:
        name (Optional[str]): The name of the job failure sensor. Defaults to the name of the
            decorated function.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job_selection (Optional[List[Union[JobDefinition, GraphDefinition]]]): The jobs that will
            be monitored by this failure sensor. Defaults to None, which means the alert will be
            sent when any job in the repository fails.
        pipeline_selection (Optional[List[str]]): (legacy) Names of the pipelines that will be
            monitored by this sensor. Defaults to None, which means the alert will be sent when
            any pipeline in the repository fails.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def inner(
        fn: Callable[[RunFailureSensorContext], Union[SkipReason, PipelineRunReaction]]
    ) -> SensorDefinition:
        check.callable_param(fn, "fn")
        if name is None or callable(name):
            sensor_name = fn.__name__
        else:
            sensor_name = name

        @run_status_sensor(
            pipeline_run_status=PipelineRunStatus.FAILURE,
            name=sensor_name,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            job_selection=job_selection,
            pipeline_selection=pipeline_selection,
            default_status=default_status,
        )
        def _run_failure_sensor(context: RunStatusSensorContext):
            fn(context.for_run_failure())

        return _run_failure_sensor

    # This case is for when the decorator is used bare, without arguments, i.e. @run_failure_sensor
    if callable(name):
        return inner(name)

    return inner
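
# Usage sketch (illustrative, not part of the library source), assuming a job
# `my_job` defined elsewhere and a hypothetical `notify` helper:
#
#     @run_failure_sensor(job_selection=[my_job])
#     def notify_on_my_job_failure(context: RunFailureSensorContext):
#         notify(f"Run {context.dagster_run.run_id} of my_job failed")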


class RunStatusSensorDefinition(SensorDefinition):
    """
    Define a sensor that reacts to a given status of pipeline execution, where the decorated
    function will be evaluated when a run is at the given status.

    Args:
        name (str): The name of the sensor. Defaults to the name of the decorated function.
        pipeline_run_status (PipelineRunStatus): The status of a run which will be
            monitored by the sensor.
        run_status_sensor_fn (Callable[[RunStatusSensorContext], Union[SkipReason, PipelineRunReaction]]):
            The core evaluation function for the sensor. Takes a
            :py:class:`~dagster.RunStatusSensorContext`.
        pipeline_selection (Optional[List[str]]): (legacy) Names of the pipelines that will be
            monitored by this sensor. Defaults to None, which means the sensor will monitor all
            pipelines in the repository.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job_selection (Optional[List[Union[JobDefinition, GraphDefinition]]]): The jobs that
            will be monitored by this sensor. Defaults to None, which means the sensor will
            monitor all jobs in the repository.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def __init__(
        self,
        name: str,
        pipeline_run_status: PipelineRunStatus,
        run_status_sensor_fn: Callable[
            [RunStatusSensorContext], Union[SkipReason, PipelineRunReaction]
        ],
        pipeline_selection: Optional[List[str]] = None,
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        job_selection: Optional[List[Union[PipelineDefinition, GraphDefinition]]] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    ):
        from dagster.core.storage.event_log.base import EventRecordsFilter, RunShardedEventsCursor

        check.str_param(name, "name")
        check.inst_param(pipeline_run_status, "pipeline_run_status", PipelineRunStatus)
        check.callable_param(run_status_sensor_fn, "run_status_sensor_fn")
        check.opt_list_param(pipeline_selection, "pipeline_selection", str)
        check.opt_int_param(minimum_interval_seconds, "minimum_interval_seconds")
        check.opt_str_param(description, "description")
        check.opt_list_param(job_selection, "job_selection", (PipelineDefinition, GraphDefinition))
        check.inst_param(default_status, "default_status", DefaultSensorStatus)

        self._run_status_sensor_fn = check.callable_param(
            run_status_sensor_fn, "run_status_sensor_fn"
        )

        def _wrapped_fn(context: SensorEvaluationContext):
            # initialize the cursor to (most recent event id, current timestamp) when:
            # * it's the first time starting the sensor
            # * or, the cursor isn't in a valid format (backcompat)
            if context.cursor is None or not RunStatusSensorCursor.is_valid(context.cursor):
                most_recent_event_records = list(
                    context.instance.get_event_records(ascending=False, limit=1)
                )
                most_recent_event_id = (
                    most_recent_event_records[0].storage_id
                    if len(most_recent_event_records) == 1
                    else -1
                )

                new_cursor = RunStatusSensorCursor(
                    update_timestamp=pendulum.now("UTC").isoformat(),
                    record_id=most_recent_event_id,
                )
                context.update_cursor(new_cursor.to_json())
                yield SkipReason(f"Initiating {name}. Set cursor to {new_cursor}")
                return

            record_id, update_timestamp = RunStatusSensorCursor.from_json(context.cursor)

            # Fetch events after the cursor id
            # * we move the cursor forward to the latest visited event's id to avoid revisits
            # * when the daemon is down, because we persist the cursor info, we can go back to
            #   where we left off and backfill alerts for the qualified events (up to 5 at a time)
            #   during the downtime
            # Note: this is a cross-run query which requires extra handling in sqlite, see details
            # in SqliteEventLogStorage.
            event_records = context.instance.get_event_records(
                EventRecordsFilter(
                    after_cursor=RunShardedEventsCursor(
                        id=record_id,
                        run_updated_after=cast(datetime, pendulum.parse(update_timestamp)),
                    ),
                    event_type=PIPELINE_RUN_STATUS_TO_EVENT_TYPE[pipeline_run_status],
                ),
                ascending=True,
                limit=5,
            )

            for event_record in event_records:
                event_log_entry = event_record.event_log_entry
                storage_id = event_record.storage_id

                # get run info
                run_records = context.instance.get_run_records(
                    filters=RunsFilter(run_ids=[event_log_entry.run_id])
                )

                # skip if we couldn't find the right run
                if len(run_records) != 1:
                    # because we couldn't find the run, we use the event timestamp as the
                    # approximate run update timestamp
                    approximate_update_timestamp = utc_datetime_from_timestamp(
                        event_log_entry.timestamp
                    )
                    context.update_cursor(
                        RunStatusSensorCursor(
                            record_id=storage_id,
                            update_timestamp=approximate_update_timestamp.isoformat(),
                        ).to_json()
                    )
                    continue

                pipeline_run = run_records[0].pipeline_run
                update_timestamp = run_records[0].update_timestamp

                # skip if any of the following happens:
                if (
                    # the pipeline does not have a repository (manually executed)
                    not pipeline_run.external_pipeline_origin
                    or
                    # the pipeline does not belong to the current repository
                    pipeline_run.external_pipeline_origin.external_repository_origin.repository_name
                    != context.repository_name
                    or
                    # the pipeline is not selected
                    (pipeline_selection and pipeline_run.pipeline_name not in pipeline_selection)
                    or
                    # the job is not selected
                    (
                        job_selection
                        and pipeline_run.pipeline_name not in map(lambda x: x.name, job_selection)
                    )
                ):
                    context.update_cursor(
                        RunStatusSensorCursor(
                            record_id=storage_id, update_timestamp=update_timestamp.isoformat()
                        ).to_json()
                    )
                    continue

                serializable_error = None

                try:
                    with user_code_error_boundary(
                        RunStatusSensorExecutionError,
                        lambda: f'Error occurred during the execution of sensor "{name}".',
                    ):
                        # one user code invocation maps to one failure event
                        run_status_sensor_fn(
                            RunStatusSensorContext(
                                sensor_name=name,
                                dagster_run=pipeline_run,
                                dagster_event=event_log_entry.dagster_event,
                                instance=context.instance,
                            )
                        )
                except RunStatusSensorExecutionError as run_status_sensor_execution_error:
                    # when the user code errors, we report the error to the sensor tick,
                    # not the original run
                    serializable_error = serializable_error_info_from_exc_info(
                        run_status_sensor_execution_error.original_exc_info
                    )

                context.update_cursor(
                    RunStatusSensorCursor(
                        record_id=storage_id, update_timestamp=update_timestamp.isoformat()
                    ).to_json()
                )

                # Yield PipelineRunReaction to indicate the execution success/failure.
                # The sensor machinery will:
                # * report back to the original run on success
                # * update the cursor and job state
                yield PipelineRunReaction(
                    pipeline_run=pipeline_run,
                    run_status=pipeline_run_status,
                    error=serializable_error,
                )

        super(RunStatusSensorDefinition, self).__init__(
            name=name,
            evaluation_fn=_wrapped_fn,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
        )

    def __call__(self, *args, **kwargs):
        context_provided = is_context_provided(get_function_params(self._run_status_sensor_fn))

        if context_provided:
            if len(args) + len(kwargs) == 0:
                raise DagsterInvalidInvocationError(
                    "Run status sensor function expected context argument, but no context argument "
                    "was provided when invoking."
                )
            if len(args) + len(kwargs) > 1:
                raise DagsterInvalidInvocationError(
                    "Run status sensor invocation received multiple arguments. Only a first "
                    "positional context parameter should be provided when invoking."
                )

            context_param_name = get_function_params(self._run_status_sensor_fn)[0].name

            if args:
                context = check.opt_inst_param(args[0], context_param_name, RunStatusSensorContext)
            else:
                if context_param_name not in kwargs:
                    raise DagsterInvalidInvocationError(
                        f"Run status sensor invocation expected argument '{context_param_name}'."
                    )
                context = check.opt_inst_param(
                    kwargs[context_param_name], context_param_name, RunStatusSensorContext
                )

            if not context:
                raise DagsterInvalidInvocationError(
                    "Context must be provided for direct invocation of run status sensor."
                )

            return self._run_status_sensor_fn(context)

        else:
            if len(args) + len(kwargs) > 0:
                raise DagsterInvalidInvocationError(
                    "Run status sensor decorated function has no arguments, but arguments were "
                    "provided to invocation."
                )

            return self._run_status_sensor_fn()
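
# Testing sketch (illustrative, not part of the library source): because
# `__call__` forwards a context to the wrapped function, a sensor built by
# `@run_status_sensor` can be unit-tested with `build_run_status_sensor_context`
# (defined above); `my_job` and `my_sensor` are assumed to be defined elsewhere:
#
#     instance = DagsterInstance.ephemeral()
#     result = my_job.execute_in_process(instance=instance, raise_on_error=False)
#     context = build_run_status_sensor_context(
#         sensor_name="my_sensor",
#         dagster_instance=instance,
#         dagster_run=result.dagster_run,
#         dagster_event=result.get_job_failure_event(),
#     )
#     my_sensor(context)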


def run_status_sensor(
    pipeline_run_status: PipelineRunStatus,
    pipeline_selection: Optional[List[str]] = None,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job_selection: Optional[List[Union[PipelineDefinition, GraphDefinition]]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [Callable[[RunStatusSensorContext], Union[SkipReason, PipelineRunReaction]]],
    RunStatusSensorDefinition,
]:
    """
    Creates a sensor that reacts to a given status of pipeline execution, where the decorated
    function will be run when a pipeline is at the given status.

    Takes a :py:class:`~dagster.RunStatusSensorContext`.

    Args:
        pipeline_run_status (PipelineRunStatus): The status of pipeline execution which will be
            monitored by the sensor.
        pipeline_selection (Optional[List[str]]): Names of the pipelines that will be monitored by
            this sensor. Defaults to None, which means the sensor will monitor all pipelines in
            the repository.
        name (Optional[str]): The name of the sensor. Defaults to the name of the decorated
            function.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job_selection (Optional[List[Union[PipelineDefinition, GraphDefinition]]]): Jobs that will
            be monitored by this sensor. Defaults to None, which means the sensor will monitor
            all jobs in the repository.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def inner(
        fn: Callable[["RunStatusSensorContext"], Union[SkipReason, PipelineRunReaction]]
    ) -> RunStatusSensorDefinition:
        check.callable_param(fn, "fn")
        sensor_name = name or fn.__name__

        def _wrapped_fn(context: RunStatusSensorContext):
            fn(context)

        return RunStatusSensorDefinition(
            name=sensor_name,
            pipeline_run_status=pipeline_run_status,
            run_status_sensor_fn=_wrapped_fn,
            pipeline_selection=pipeline_selection,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            job_selection=job_selection,
            default_status=default_status,
        )

    return inner
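
# Usage sketch (illustrative, not part of the library source): reacting to
# successful runs of any job in the repository:
#
#     @run_status_sensor(pipeline_run_status=PipelineRunStatus.SUCCESS)
#     def report_run_success(context: RunStatusSensorContext):
#         print(f"Run {context.dagster_run.run_id} succeeded")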