Source code for dagster.core.execution.context.hook

import warnings
from typing import AbstractSet, Any, Dict, Optional, Set, Union

from dagster import check

from ...definitions.composition import PendingNodeInvocation
from ...definitions.decorators.graph import graph
from ...definitions.dependency import Node
from ...definitions.hook_definition import HookDefinition
from ...definitions.mode import ModeDefinition
from ...definitions.op_definition import OpDefinition
from ...definitions.resource_definition import IContainsGenerator, Resources
from ...definitions.solid_definition import SolidDefinition
from ...errors import DagsterInvalidPropertyError, DagsterInvariantViolationError
from ...log_manager import DagsterLogManager
from ..plan.step import ExecutionStep
from .system import StepExecutionContext


def _property_msg(prop_name: str, method_name: str) -> str:
    """Build the error text for a member that a test-built ``HookContext`` cannot provide.

    Args:
        prop_name: Name of the unavailable member (e.g. ``"step_key"``).
        method_name: Kind of member being described (e.g. ``"property"`` or ``"method"``).

    Returns:
        The formatted, user-facing error message.
    """
    subject = f"{prop_name} {method_name}"
    return (
        f"The {subject} is not set when a `HookContext` is constructed from "
        "`build_hook_context`."
    )


def _check_property_on_test_context(
    context: "HookContext", attr_str: str, user_facing_name: str, param_on_builder: str
):
    """Return ``getattr(context, attr_str)``, raising if that attribute is ``None``.

    Args:
        context: The hook context to read the attribute from.
        attr_str: Name of the (private) attribute holding the value.
        user_facing_name: Name of the attribute as shown to the user in the error message.
        param_on_builder: Name of the ``build_hook_context`` parameter the user should set.

    Returns:
        The attribute value, whenever it is not ``None`` (falsy values such as ``0`` or ``""``
        are returned as-is).

    Raises:
        DagsterInvalidPropertyError: If the attribute is ``None``; the message points the user
            at the ``build_hook_context`` parameter that supplies it.
    """
    found = getattr(context, attr_str)
    if found is not None:
        return found

    raise DagsterInvalidPropertyError(
        f"Attribute '{user_facing_name}' was not provided when "
        f"constructing context. Provide a value for the '{param_on_builder}' parameter on "
        "'build_hook_context'. To learn more, check out the testing hooks section of Dagster's "
        "concepts docs: https://docs.dagster.io/concepts/ops-jobs-graphs/op-hooks#testing-hooks"
    )


class HookContext:
    """The ``context`` object handed to a hook function when it fires on a DagsterEvent.

    Every value is read from the wrapped step execution context, except the resources, which
    are built once at construction time and scoped to the hook's required resource keys.

    Attributes:
        log (DagsterLogManager): Centralized log dispatch from user code.
        hook_def (HookDefinition): The hook that the context object belongs to.
        solid (Solid): The solid instance associated with the hook.
        op (Op): The op instance associated with the hook.
        step_key (str): The key for the step where this hook is being triggered.
        required_resource_keys (Set[str]): Resources required by this hook.
        resources (Resources): Resources available in the hook context.
        solid_config (Any): The parsed config specific to this solid.
        op_config (Any): The parsed config specific to this op.
        pipeline_name (str): The name of the pipeline where this hook is being triggered.
        job_name (str): The name of the job where this hook is being triggered.
        run_id (str): The id of the run where this hook is being triggered.
        mode_def (ModeDefinition): The mode with which the pipeline is being run.
        op_exception (Optional[BaseException]): The thrown exception in a failed op.
        op_output_values (Dict): Computed output values in an op.
    """

    def __init__(
        self,
        step_execution_context: StepExecutionContext,
        hook_def: HookDefinition,
    ):
        self._step_execution_context = step_execution_context
        self._hook_def = check.inst_param(hook_def, "hook_def", HookDefinition)

        # Only the resources the hook declares are built for it.
        required_keys = hook_def.required_resource_keys
        self._required_resource_keys = required_keys
        self._resources = step_execution_context.scoped_resources_builder.build(required_keys)

    @property
    def pipeline_name(self) -> str:
        """Legacy alias of ``job_name``."""
        return self.job_name

    @property
    def job_name(self) -> str:
        step_context = self._step_execution_context
        return step_context.job_name

    @property
    def run_id(self) -> str:
        step_context = self._step_execution_context
        return step_context.run_id

    @property
    def hook_def(self) -> HookDefinition:
        return self._hook_def

    @property
    def solid(self) -> Node:
        """Legacy alias of ``op``."""
        return self.op

    @property
    def op(self) -> Node:
        step_context = self._step_execution_context
        return step_context.solid

    @property
    def step(self) -> ExecutionStep:
        """The execution step; deprecated, emits a warning on every access."""
        warnings.warn(
            "The step property of HookContext has been deprecated, and will be removed "
            "in a future release."
        )
        step_context = self._step_execution_context
        return step_context.step

    @property
    def step_key(self) -> str:
        current_step = self._step_execution_context.step
        return current_step.key

    @property
    def mode_def(self) -> Optional[ModeDefinition]:
        step_context = self._step_execution_context
        return step_context.mode_def

    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        return self._required_resource_keys

    @property
    def resources(self) -> "Resources":
        return self._resources

    @property
    def solid_config(self) -> Any:
        """Config of the node this hook is attached to, or ``None`` when no entry is found."""
        step_context = self._step_execution_context
        per_node_config = step_context.resolved_run_config.solids
        # Entries are keyed by the string form of the step's solid handle.
        handle_key = str(step_context.step.solid_handle)
        node_config = per_node_config.get(handle_key)
        if not node_config:
            return None
        return node_config.config

    @property
    def op_config(self) -> Any:
        """Alias of ``solid_config``."""
        return self.solid_config

    # Because of the fact that we directly use the log manager of the step, if a user calls
    # hook_context.log.with_tags, then they will end up mutating the step's logging tags as well.
    # This is not problematic because the hook only runs after the step has been completed.
    @property
    def log(self) -> DagsterLogManager:
        step_context = self._step_execution_context
        return step_context.log

    @property
    def solid_exception(self) -> Optional[BaseException]:
        """The thrown exception in a failed solid.

        Returns:
            Optional[BaseException]: the exception object, None if the solid execution succeeds.
        """
        return self.op_exception

    @property
    def op_exception(self):
        step_context = self._step_execution_context
        return step_context.step_exception

    @property
    def solid_output_values(self) -> Dict[str, Union[Any, Dict[str, Any]]]:
        """The computed output values.

        Returns a dictionary where keys are output names and the values are:
            * the output values in the normal case
            * a dictionary from mapping key to corresponding value in the mapped case
        """
        captured = self._step_execution_context.step_output_capture
        if captured is None:
            check.failed("Outputs were unexpectedly not captured for hook")

        # make the returned values more user-friendly
        results: Dict[str, Union[Any, Dict[str, Any]]] = {}
        for handle, output_value in captured.items():
            output_name = handle.output_name
            mapping_key = handle.mapping_key

            if not mapping_key:
                # Plain output: stored directly under its name.
                results[output_name] = output_value
                continue

            # Mapped output: gather every mapping key under the same output name.
            per_key = results.get(output_name)
            if per_key is None:
                results[output_name] = {mapping_key: output_value}
            else:
                per_key[mapping_key] = output_value

        return results

    @property
    def op_output_values(self):
        """Alias of ``solid_output_values``."""
        return self.solid_output_values
class UnboundHookContext(HookContext):
    """Hook context built from explicitly supplied values instead of a step execution.

    This is the type returned by ``build_hook_context``. It carries no ``HookDefinition`` and no
    execution step, so members that depend on them (``hook_def``, ``step``, ``step_key``,
    ``required_resource_keys``, ``solid_config``, ``solid_output_values``) raise
    ``DagsterInvalidPropertyError``. Optional values that were not supplied (``op``, ``run_id``,
    ``job_name``) raise the same error on access, with a hint naming the builder parameter.

    The resources context manager is opened in ``__init__``. When any resource is a generator,
    the context must be used in a ``with`` block before ``resources`` is accessed.
    """

    def __init__(
        self,
        resources: Dict[str, Any],
        mode_def: Optional[ModeDefinition],
        op: Optional[Union[SolidDefinition, PendingNodeInvocation]],
        run_id: Optional[str],
        job_name: Optional[str],
        op_exception: Optional[Exception],
    ):  # pylint: disable=super-init-not-called
        # Imported locally, as in the original code (presumably to avoid import cycles).
        from ..build_resources import build_resources
        from ..context_creation_pipeline import initialize_console_manager

        self._mode_def = mode_def

        self._op = None
        if op is not None:
            # Invoke the definition inside a throwaway graph so that a Node instance exists
            # to hand back from the `op` property.
            @graph(name="hook_context_container")
            def temp_graph():
                op()

            self._op = temp_graph.solids[0]

        # Open resource context manager
        self._resources_cm = build_resources(resources)
        self._resources = self._resources_cm.__enter__()  # pylint: disable=no-member
        self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)

        self._run_id = run_id
        self._job_name = job_name
        self._op_exception = op_exception

        self._log = initialize_console_manager(None)

        # Flipped by __enter__; consulted by `resources` and `__del__`.
        self._cm_scope_entered = False

    def __enter__(self):
        self._cm_scope_entered = True
        return self

    def __exit__(self, *exc):
        self._resources_cm.__exit__(*exc)  # pylint: disable=no-member

    def __del__(self):
        # __init__ can raise before these attributes are assigned (e.g. while building the
        # temporary graph); read them defensively so finalization of a half-built instance
        # does not raise a secondary AttributeError.
        contains_cm = getattr(self, "_resources_contain_cm", False)
        scope_entered = getattr(self, "_cm_scope_entered", False)
        if contains_cm and not scope_entered:
            self._resources_cm.__exit__(None, None, None)  # pylint: disable=no-member

    @property
    def job_name(self) -> str:
        # Must read the stored value directly: the inherited `pipeline_name` delegates back to
        # `job_name`, so routing through it would recurse without end.
        return _check_property_on_test_context(
            self, attr_str="_job_name", user_facing_name="job_name", param_on_builder="job_name"
        )

    @property
    def run_id(self) -> str:
        return _check_property_on_test_context(
            self, attr_str="_run_id", user_facing_name="run_id", param_on_builder="run_id"
        )

    @property
    def hook_def(self) -> HookDefinition:
        raise DagsterInvalidPropertyError(_property_msg("hook_def", "property"))

    @property
    def op(self) -> Node:
        return _check_property_on_test_context(
            self, attr_str="_op", user_facing_name="op", param_on_builder="op"
        )

    @property
    def step(self) -> ExecutionStep:
        raise DagsterInvalidPropertyError(_property_msg("step", "property"))

    @property
    def step_key(self) -> str:
        raise DagsterInvalidPropertyError(_property_msg("step_key", "property"))

    @property
    def mode_def(self) -> Optional[ModeDefinition]:
        return self._mode_def

    @property
    def required_resource_keys(self) -> Set[str]:
        # The message names the property actually being accessed.
        raise DagsterInvalidPropertyError(_property_msg("required_resource_keys", "property"))

    @property
    def resources(self) -> "Resources":
        """The built resources.

        Raises:
            DagsterInvariantViolationError: If a generator resource was provided and the context
                has not been entered via ``with``.
        """
        if self._resources_contain_cm and not self._cm_scope_entered:
            raise DagsterInvariantViolationError(
                "At least one provided resource is a generator, but attempting to access "
                "resources outside of context manager scope. You can use the following syntax to "
                "open a context manager: `with build_hook_context(...) as context:`"
            )
        return self._resources

    @property
    def solid_config(self) -> Any:
        raise DagsterInvalidPropertyError(_property_msg("solid_config", "property"))

    @property
    def log(self) -> DagsterLogManager:
        return self._log

    @property
    def op_exception(self) -> Optional[BaseException]:
        return self._op_exception

    @property
    def solid_output_values(self) -> Dict[str, Union[Any, Dict[str, Any]]]:
        """The computed output values.

        Returns a dictionary where keys are output names and the values are:
            * the output values in the normal case
            * a dictionary from mapping key to corresponding value in the mapped case

        Always raises ``DagsterInvalidPropertyError`` on this class: no outputs are captured
        for a context that was built outside of a step execution.
        """
        raise DagsterInvalidPropertyError(_property_msg("solid_output_values", "method"))


class BoundHookContext(HookContext):
    """Hook context whose hook definition, resources and log manager are supplied explicitly.

    Unlike ``UnboundHookContext`` it does carry a ``HookDefinition``, so ``hook_def`` and
    ``required_resource_keys`` are available. It still has no execution step, so ``step``,
    ``step_key``, ``solid_config`` and ``solid_output_values`` raise
    ``DagsterInvalidPropertyError``.

    NOTE(review): presumably created when an ``UnboundHookContext`` is bound to a hook at
    invocation time; the construction site is not visible in this module, so confirm.
    """

    def __init__(
        self,
        hook_def: HookDefinition,
        resources: Resources,
        op: Optional[Node],
        mode_def: Optional[ModeDefinition],
        log_manager: DagsterLogManager,
        run_id: Optional[str],
        job_name: Optional[str],
        op_exception: Optional[Exception],
    ):  # pylint: disable=super-init-not-called
        self._hook_def = hook_def
        self._resources = resources
        self._op = op
        self._mode_def = mode_def
        self._log_manager = log_manager
        self._run_id = run_id
        self._job_name = job_name
        self._op_exception = op_exception

    @property
    def job_name(self) -> str:
        return _check_property_on_test_context(
            self, attr_str="_job_name", user_facing_name="job_name", param_on_builder="job_name"
        )

    @property
    def run_id(self) -> str:
        return _check_property_on_test_context(
            self, attr_str="_run_id", user_facing_name="run_id", param_on_builder="run_id"
        )

    @property
    def hook_def(self) -> HookDefinition:
        return self._hook_def

    @property
    def op(self) -> Node:
        return _check_property_on_test_context(
            self, attr_str="_op", user_facing_name="op", param_on_builder="op"
        )

    @property
    def step(self) -> ExecutionStep:
        raise DagsterInvalidPropertyError(_property_msg("step", "property"))

    @property
    def step_key(self) -> str:
        raise DagsterInvalidPropertyError(_property_msg("step_key", "property"))

    @property
    def mode_def(self) -> Optional[ModeDefinition]:
        return self._mode_def

    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        return self._hook_def.required_resource_keys

    @property
    def resources(self) -> "Resources":
        return self._resources

    @property
    def solid_config(self) -> Any:
        raise DagsterInvalidPropertyError(_property_msg("solid_config", "property"))

    @property
    def log(self) -> DagsterLogManager:
        return self._log_manager

    @property
    def op_exception(self):
        return self._op_exception

    @property
    def solid_output_values(self) -> Dict[str, Union[Any, Dict[str, Any]]]:
        """The computed output values.

        Returns a dictionary where keys are output names and the values are:
            * the output values in the normal case
            * a dictionary from mapping key to corresponding value in the mapped case

        Always raises ``DagsterInvalidPropertyError`` on this class: no outputs are captured
        for a context that was built outside of a step execution.
        """
        raise DagsterInvalidPropertyError(_property_msg("solid_output_values", "method"))
def build_hook_context(
    resources: Optional[Dict[str, Any]] = None,
    mode_def: Optional[ModeDefinition] = None,
    solid: Optional[Union[SolidDefinition, PendingNodeInvocation]] = None,
    op: Optional[Union[OpDefinition, PendingNodeInvocation]] = None,
    run_id: Optional[str] = None,
    job_name: Optional[str] = None,
    op_exception: Optional[Exception] = None,
) -> UnboundHookContext:
    """Builds hook context from provided parameters.

    ``build_hook_context`` can be used as either a function or a context manager. If there is a
    provided resource to ``build_hook_context`` that is a context manager, then it must be used
    as a context manager. This function can be used to provide the context argument to the
    invocation of a hook definition.

    Args:
        resources (Optional[Dict[str, Any]]): The resources to provide to the context. These can
            either be values or resource definitions.
        mode_def (Optional[ModeDefinition]): The mode definition used with the context.
        op (Optional[Union[OpDefinition, PendingNodeInvocation]]): The op definition which the
            hook may be associated with.
        solid (Optional[Union[SolidDefinition, PendingNodeInvocation]]): (legacy) The solid
            definition which the hook may be associated with.
        run_id (Optional[str]): The id of the run in which the hook is invoked (provided for
            mocking purposes).
        job_name (Optional[str]): The name of the job in which the hook is used (provided for
            mocking purposes).
        op_exception (Optional[Exception]): The exception that caused the hook to be triggered.

    Examples:
        .. code-block:: python

            context = build_hook_context()
            hook_to_invoke(context)

            with build_hook_context(resources={"foo": context_manager_resource}) as context:
                hook_to_invoke(context)
    """
    # `op` and `solid` name the same thing; only one may be supplied.
    check.invariant(not (solid and op), "cannot set both `solid` and `op` on `build_hook_context`.")

    checked_op = check.opt_inst_param(op, "op", (OpDefinition, PendingNodeInvocation))
    checked_solid = check.opt_inst_param(
        solid, "solid", (SolidDefinition, PendingNodeInvocation)
    )
    node_def = checked_op or checked_solid

    # Validate the remaining arguments in the same order in which they are passed on.
    checked_resources = check.opt_dict_param(resources, "resources", key_type=str)
    checked_mode_def = check.opt_inst_param(mode_def, "mode_def", ModeDefinition)
    checked_run_id = check.opt_str_param(run_id, "run_id")
    checked_job_name = check.opt_str_param(job_name, "job_name")
    checked_exception = check.opt_inst_param(op_exception, "op_exception", Exception)

    return UnboundHookContext(
        resources=checked_resources,
        mode_def=checked_mode_def,
        op=node_def,
        run_id=checked_run_id,
        job_name=checked_job_name,
        op_exception=checked_exception,
    )