Source code for dagster.core.storage.event_log.base

import warnings
from abc import ABC, abstractmethod
from datetime import datetime
from typing import (
    Callable,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.events import DagsterEventType
from dagster.core.events.log import EventLogEntry
from dagster.core.execution.stats import (
    RunStepKeyStatsSnapshot,
    build_run_stats_from_events,
    build_run_step_stats_from_events,
)
from dagster.core.instance import MayHaveInstanceWeakref
from dagster.core.storage.pipeline_run import PipelineRunStatsSnapshot
from dagster.serdes import whitelist_for_serdes


class RunShardedEventsCursor(NamedTuple):
    """Pairs an id-based event log cursor with a timestamp-based run cursor, for improved
    performance on run-sharded event log storages (e.g. the default SqliteEventLogStorage). For
    run-sharded storages, the id field is ignored, since ids may not be unique across shards.
    """

    id: int
    run_updated_after: datetime
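
# Usage sketch (illustrative, not part of the original module): a run-sharded cursor
# pairing a placeholder storage id with a run-updated timestamp. On run-sharded
# storages such as SqliteEventLogStorage, only run_updated_after is consulted.
def _example_run_sharded_cursor() -> RunShardedEventsCursor:
    # -1 is a hypothetical placeholder; the id field is ignored by run-sharded storages.
    return RunShardedEventsCursor(id=-1, run_updated_after=datetime(2021, 1, 1))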

class EventLogRecord(NamedTuple):
    """Internal representation of an event record, as stored in a
    :py:class:`~dagster.core.storage.event_log.EventLogStorage`.
    """

    storage_id: int
    event_log_entry: EventLogEntry
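
# Usage sketch (illustrative, not part of the original module): consuming records
# returned by `get_event_records`. Each record's storage_id can be fed back as an
# `after_cursor` (see EventRecordsFilter below) to page through subsequent events.
def _example_consume_records(records: Iterable[EventLogRecord]) -> Optional[int]:
    last_storage_id = None
    for record in records:
        # Each record pairs the raw EventLogEntry with the id under which it is stored.
        print(record.storage_id, record.event_log_entry)
        last_storage_id = record.storage_id
    return last_storage_id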

@whitelist_for_serdes
class EventRecordsFilter(
    NamedTuple(
        "_EventRecordsFilter",
        [
            ("event_type", Optional[DagsterEventType]),
            ("asset_key", Optional[AssetKey]),
            ("asset_partitions", Optional[List[str]]),
            ("after_cursor", Optional[Union[int, RunShardedEventsCursor]]),
            ("before_cursor", Optional[Union[int, RunShardedEventsCursor]]),
            ("after_timestamp", Optional[float]),
            ("before_timestamp", Optional[float]),
        ],
    )
):
    """Defines a set of filter fields for fetching a set of event log entries or event log records.

    Args:
        event_type (Optional[DagsterEventType]): Filter argument for dagster event type
        asset_key (Optional[AssetKey]): Asset key for which to get asset materialization event
            entries / records.
        asset_partitions (Optional[List[str]]): Filter parameter such that only asset
            materialization events with a partition value matching one of the provided values are
            returned. Only valid when the `asset_key` parameter is provided.
        after_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that
            only records with storage_id greater than the provided value are returned. Using a
            run-sharded events cursor will result in a significant performance gain when run
            against a SqliteEventLogStorage implementation (which is run-sharded).
        before_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that
            only records with storage_id less than the provided value are returned. Using a
            run-sharded events cursor will result in a significant performance gain when run
            against a SqliteEventLogStorage implementation (which is run-sharded).
        after_timestamp (Optional[float]): Filter parameter such that only event records for
            events with timestamp greater than the provided value are returned.
        before_timestamp (Optional[float]): Filter parameter such that only event records for
            events with timestamp less than the provided value are returned.
    """

    def __new__(
        cls,
        event_type: Optional[DagsterEventType] = None,
        asset_key: Optional[AssetKey] = None,
        asset_partitions: Optional[List[str]] = None,
        after_cursor: Optional[Union[int, RunShardedEventsCursor]] = None,
        before_cursor: Optional[Union[int, RunShardedEventsCursor]] = None,
        after_timestamp: Optional[float] = None,
        before_timestamp: Optional[float] = None,
    ):
        check.opt_list_param(asset_partitions, "asset_partitions", of_type=str)
        return super(EventRecordsFilter, cls).__new__(
            cls,
            event_type=check.opt_inst_param(event_type, "event_type", DagsterEventType),
            asset_key=check.opt_inst_param(asset_key, "asset_key", AssetKey),
            asset_partitions=asset_partitions,
            after_cursor=check.opt_inst_param(
                after_cursor, "after_cursor", (int, RunShardedEventsCursor)
            ),
            before_cursor=check.opt_inst_param(
                before_cursor, "before_cursor", (int, RunShardedEventsCursor)
            ),
            after_timestamp=check.opt_float_param(after_timestamp, "after_timestamp"),
            before_timestamp=check.opt_float_param(before_timestamp, "before_timestamp"),
        )
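
# Usage sketch (illustrative, not part of the original module): a filter selecting
# asset materialization records for a single, hypothetical asset key, resuming after a
# known storage id. `instance` is assumed to be a DagsterInstance, whose
# `get_event_records` method delegates to the event log storage defined below.
def _example_materialization_query(instance) -> Iterable[EventLogRecord]:
    records_filter = EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey(["my_table"]),  # hypothetical asset key
        after_cursor=1000,  # hypothetical storage id lower bound (exclusive)
    )
    return instance.get_event_records(records_filter, limit=10, ascending=False)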

class EventLogStorage(ABC, MayHaveInstanceWeakref):
    """Abstract base class for storing structured event logs from pipeline runs.

    Note that event log storages using SQL databases as backing stores should implement
    :py:class:`~dagster.core.storage.event_log.SqlEventLogStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the
    ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class
    should be done by setting values in that file.
    """

    @abstractmethod
    def get_logs_for_run(
        self,
        run_id: str,
        cursor: Optional[int] = -1,
        of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> Iterable[EventLogEntry]:
        """Get all of the logs corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[int]): Zero-indexed logs will be returned starting from cursor + 1,
                i.e., if cursor is -1, all logs will be returned. (default: -1)
            of_type (Optional[DagsterEventType]): the dagster event type to filter the logs.
            limit (Optional[int]): the maximum number of logs to return.
        """

    def get_stats_for_run(self, run_id: str) -> PipelineRunStatsSnapshot:
        """Get a summary of events that have occurred in a run."""
        return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))

    def get_step_stats_for_run(self, run_id: str, step_keys=None) -> List[RunStepKeyStatsSnapshot]:
        """Get per-step stats for a pipeline run."""
        logs = self.get_logs_for_run(run_id)
        if step_keys:
            logs = [
                event
                for event in logs
                if event.is_dagster_event and event.get_dagster_event().step_key in step_keys
            ]
        return build_run_step_stats_from_events(run_id, logs)

    @abstractmethod
    def store_event(self, event: EventLogEntry):
        """Store an event corresponding to a pipeline run.

        Args:
            event (EventLogEntry): The event to store.
        """

    @abstractmethod
    def delete_events(self, run_id: str):
        """Remove events for a given run id."""

    @abstractmethod
    def upgrade(self):
        """This method should perform any schema migrations necessary to bring an
        out-of-date instance of the storage up to date.
        """

    @abstractmethod
    def reindex_events(self, print_fn: Callable = lambda _: None, force: bool = False):
        """Call this method to run any data migrations across the event_log tables."""

    @abstractmethod
    def reindex_assets(self, print_fn: Callable = lambda _: None, force: bool = False):
        """Call this method to run any data migrations across the asset tables."""

    @abstractmethod
    def wipe(self):
        """Clear the log storage."""

    @abstractmethod
    def watch(self, run_id: str, start_cursor: int, callback: Callable):
        """Call this method to start watching."""

    @abstractmethod
    def end_watch(self, run_id: str, handler: Callable):
        """Call this method to stop watching."""

    @property
    @abstractmethod
    def is_persistent(self) -> bool:
        """bool: Whether the storage is persistent."""

    def dispose(self):
        """Explicit lifecycle management."""

    def optimize_for_dagit(self, statement_timeout: int):
        """Allows for optimizing database connection / use in the context of a long-lived dagit
        process.
        """

    @abstractmethod
    def get_event_records(
        self,
        event_records_filter: Optional[EventRecordsFilter] = None,
        limit: Optional[int] = None,
        ascending: bool = False,
    ) -> Iterable[EventLogRecord]:
        pass

    @abstractmethod
    def has_asset_key(self, asset_key: AssetKey) -> bool:
        pass

    @abstractmethod
    def all_asset_keys(self) -> Iterable[AssetKey]:
        pass

    def get_asset_keys(
        self,
        prefix: Optional[List[str]] = None,
        limit: Optional[int] = None,
        cursor: Optional[str] = None,
    ) -> Iterable[AssetKey]:
        # base implementation of get_asset_keys, using the existing `all_asset_keys` and doing the
        # filtering in-memory
        asset_keys = sorted(self.all_asset_keys(), key=str)
        if prefix:
            asset_keys = [
                asset_key for asset_key in asset_keys if asset_key.path[: len(prefix)] == prefix
            ]
        if cursor:
            cursor_asset = AssetKey.from_db_string(cursor)
            if cursor_asset and cursor_asset in asset_keys:
                idx = asset_keys.index(cursor_asset)
                asset_keys = asset_keys[idx + 1 :]
        if limit:
            asset_keys = asset_keys[:limit]
        return asset_keys

    @abstractmethod
    def get_latest_materialization_events(
        self, asset_keys: Sequence[AssetKey]
    ) -> Mapping[AssetKey, Optional[EventLogEntry]]:
        pass

    @abstractmethod
    def get_asset_events(
        self,
        asset_key: AssetKey,
        partitions: Optional[List[str]] = None,
        before_cursor: Optional[int] = None,
        after_cursor: Optional[int] = None,
        limit: Optional[int] = None,
        ascending: bool = False,
        include_cursor: bool = False,
        before_timestamp=None,
        cursor: Optional[int] = None,  # deprecated
    ) -> Union[Iterable[EventLogEntry], Iterable[Tuple[int, EventLogEntry]]]:
        pass

    @abstractmethod
    def get_asset_run_ids(self, asset_key: AssetKey) -> Iterable[str]:
        pass

    @abstractmethod
    def wipe_asset(self, asset_key: AssetKey):
        """Remove asset index history from event log for given asset_key."""

    @abstractmethod
    def get_materialization_count_by_partition(
        self, asset_keys: Sequence[AssetKey]
    ) -> Mapping[AssetKey, Mapping[str, int]]:
        pass

def extract_asset_events_cursor(cursor, before_cursor, after_cursor, ascending):
    if cursor:
        warnings.warn(
            "Parameter `cursor` is deprecated for `get_asset_events`. Use `before_cursor` or "
            "`after_cursor` instead"
        )
        if ascending and after_cursor is None:
            after_cursor = cursor
        if not ascending and before_cursor is None:
            before_cursor = cursor

    if after_cursor is not None:
        try:
            after_cursor = int(after_cursor)
        except ValueError:
            after_cursor = None

    if before_cursor is not None:
        try:
            before_cursor = int(before_cursor)
        except ValueError:
            before_cursor = None

    return before_cursor, after_cursor
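
# Example (illustrative, not part of the original module): how the deprecated `cursor`
# argument maps onto the before_cursor / after_cursor pair. Descending queries treat
# the cursor as an exclusive upper bound; ascending queries, as an exclusive lower bound.
def _example_cursor_extraction():
    before, after = extract_asset_events_cursor(
        cursor=100, before_cursor=None, after_cursor=None, ascending=False
    )
    assert (before, after) == (100, None)

    before, after = extract_asset_events_cursor(
        cursor=100, before_cursor=None, after_cursor=None, ascending=True
    )
    assert (before, after) == (None, 100)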