Source code for dagster.core.definitions.version_strategy

import hashlib
import inspect
from typing import TYPE_CHECKING, Any, NamedTuple, Optional

if TYPE_CHECKING:
    from .op_definition import OpDefinition
    from .resource_definition import ResourceDefinition
    from .solid_definition import SolidDefinition


class OpVersionContext(NamedTuple):
    """Provides execution-time information for computing the version for an op.
    Attributes:
        op_def (OpDefinition): The definition of the op to compute a version for.
        op_config (Any): The parsed config to be passed to the op during execution.
    """

    op_def: "OpDefinition"
    op_config: Any

    @property
    def solid_def(self) -> "SolidDefinition":
        return self.op_def

    @property
    def solid_config(self) -> Any:
        return self.op_config


SolidVersionContext = OpVersionContext


class ResourceVersionContext(NamedTuple):
    """Version-specific resource context.

    Attributes:
        resource_def (ResourceDefinition): The definition of the resource whose version will be computed.
        resource_config (Any): The parsed config to be passed to the resource during execution.
    """

    resource_def: "ResourceDefinition"
    resource_config: Any


[docs]class VersionStrategy: """Abstract class for defining a strategy to version solids and resources. When subclassing, `get_solid_version` must be implemented, and `get_resource_version` can be optionally implemented. `get_solid_version` should ingest a SolidVersionContext, and `get_resource_version` should ingest a ResourceVersionContext. From that, each synthesize a unique string called a `version`, which will be tagged to outputs of that solid in the pipeline. Providing a `VersionStrategy` instance to a job will enable memoization on that job, such that only steps whose outputs do not have an up-to-date version will run. """ def get_solid_version(self, context: SolidVersionContext) -> str: pass def get_op_version(self, context: OpVersionContext) -> str: return self.get_solid_version(context) def get_resource_version( self, context: ResourceVersionContext # pylint: disable=unused-argument ) -> Optional[str]: return None
[docs]class SourceHashVersionStrategy(VersionStrategy): def _get_source_hash(self, fn): code_as_str = inspect.getsource(fn) return hashlib.sha1(code_as_str.encode("utf-8")).hexdigest() def get_op_version(self, context: OpVersionContext) -> str: compute_fn = context.op_def.compute_fn if callable(compute_fn): return self._get_source_hash(compute_fn) else: return self._get_source_hash(compute_fn.decorated_fn) def get_resource_version(self, context: ResourceVersionContext) -> Optional[str]: return self._get_source_hash(context.resource_def.resource_fn)