from functools import update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Mapping,
Optional,
Tuple,
Type,
Union,
cast,
)

from dagster import check
from dagster.core.definitions.composition import MappedInputPlaceholder
from dagster.core.definitions.dependency import (
DependencyDefinition,
DynamicCollectDependencyDefinition,
IDependencyDefinition,
MultiDependencyDefinition,
Node,
NodeHandle,
NodeInvocation,
SolidOutputHandle,
)
from dagster.core.definitions.node_definition import NodeDefinition
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError
from dagster.core.selector.subset_selector import (
LeafNodeSelection,
OpSelectionData,
parse_op_selection,
)
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.core.storage.tags import PARTITION_NAME_TAG
from dagster.core.utils import str_format_set

from .executor_definition import ExecutorDefinition
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .hook_definition import HookDefinition
from .mode import ModeDefinition
from .partition import PartitionSetDefinition
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
from .run_request import RunRequest
from .version_strategy import VersionStrategy

if TYPE_CHECKING:
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster.core.instance import DagsterInstance
from dagster.core.snap import PipelineSnapshot


class JobDefinition(PipelineDefinition):
def __init__(
self,
mode_def: ModeDefinition,
graph_def: GraphDefinition,
name: Optional[str] = None,
description: Optional[str] = None,
preset_defs: Optional[List[PresetDefinition]] = None,
tags: Optional[Dict[str, Any]] = None,
hook_defs: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
_op_selection_data: Optional[OpSelectionData] = None,
):
self._cached_partition_set: Optional["PartitionSetDefinition"] = None
self._op_selection_data = check.opt_inst_param(
_op_selection_data, "_op_selection_data", OpSelectionData
)
super(JobDefinition, self).__init__(
name=name,
description=description,
mode_defs=[mode_def],
preset_defs=preset_defs,
tags=tags,
hook_defs=hook_defs,
solid_retry_policy=op_retry_policy,
graph_def=graph_def,
version_strategy=version_strategy,
)

    @property
    def target_type(self) -> str:
        return "job"

    @property
    def is_job(self) -> bool:
        return True

    def describe_target(self):
        return f"{self.target_type} '{self.name}'"

    @property
    def executor_def(self) -> ExecutorDefinition:
        return self.mode_definitions[0].executor_defs[0]

    @property
    def resource_defs(self) -> Mapping[str, ResourceDefinition]:
        return self.mode_definitions[0].resource_defs

    def execute_in_process(
self,
run_config: Optional[Dict[str, Any]] = None,
instance: Optional["DagsterInstance"] = None,
partition_key: Optional[str] = None,
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
) -> "ExecuteInProcessResult":
"""
Execute the Job in-process, gathering results in-memory.
The `executor_def` on the Job will be ignored, and replaced with the in-process executor.
If using the default `io_manager`, it will switch from filesystem to in-memory.
Args:
run_config (Optional[Dict[str, Any]]:
The configuration for the run
instance (Optional[DagsterInstance]):
The instance to execute against, an ephemeral one will be used if none provided.
partition_key: (Optional[str])
The string partition key that specifies the run config to execute. Can only be used
to select run config for jobs with partitioned config.
raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
Defaults to ``True``.
op_selection (Optional[List[str]]): A list of op selection queries (including single op
names) to execute. For example:
* ``['some_op']``: selects ``some_op`` itself.
* ``['*some_op']``: select ``some_op`` and all its ancestors (upstream dependencies).
* ``['*some_op+++']``: select ``some_op``, all its ancestors, and its descendants
(downstream dependencies) within 3 levels down.
* ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.

        Returns:
            :py:class:`~dagster.ExecuteInProcessResult`
        """
from dagster.core.definitions.executor_definition import execute_in_process_executor
from dagster.core.execution.execute_in_process import core_execute_in_process
run_config = check.opt_dict_param(run_config, "run_config")
op_selection = check.opt_list_param(op_selection, "op_selection", str)
partition_key = check.opt_str_param(partition_key, "partition_key")
check.invariant(
len(self._mode_definitions) == 1,
"execute_in_process only supported on job / single mode pipeline",
)
base_mode = self.get_mode_definition()
        # create an ephemeral in-process mode by replacing the executor_def and
        # switching the default filesystem io_manager to in-memory, if another was not set
in_proc_mode = ModeDefinition(
name="in_process",
executor_defs=[execute_in_process_executor],
resource_defs=_swap_default_io_man(base_mode.resource_defs, self),
logger_defs=base_mode.loggers,
_config_mapping=base_mode.config_mapping,
_partitioned_config=base_mode.partitioned_config,
)
ephemeral_job = JobDefinition(
name=self._name,
graph_def=self._graph_def,
mode_def=in_proc_mode,
hook_defs=self.hook_defs,
tags=self.tags,
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
).get_job_def_for_op_selection(op_selection)
if partition_key:
if not base_mode.partitioned_config:
check.failed(
f"Provided partition key `{partition_key}` for job `{self._name}` without a partitioned config"
)
check.invariant(
not run_config,
"Cannot provide both run_config and partition_key arguments to `execute_in_process`",
)
run_config = base_mode.partitioned_config.get_run_config(partition_key)
return core_execute_in_process(
node=self._graph_def,
ephemeral_pipeline=ephemeral_job,
run_config=run_config,
instance=instance,
output_capturing_enabled=True,
raise_on_error=raise_on_error,
run_tags={PARTITION_NAME_TAG: partition_key} if partition_key else None,
)

    @property
    def op_selection_data(self) -> Optional[OpSelectionData]:
        return self._op_selection_data

    def get_job_def_for_op_selection(
self,
op_selection: Optional[List[str]] = None,
) -> "JobDefinition":
if not op_selection:
return self
op_selection = check.opt_list_param(op_selection, "op_selection", str)
resolved_op_selection_dict = parse_op_selection(self, op_selection)
sub_graph = get_subselected_graph_definition(self.graph, resolved_op_selection_dict)
return JobDefinition(
name=self.name,
description=self.description,
mode_def=self.get_mode_definition(),
preset_defs=self.preset_defs,
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
graph_def=sub_graph,
version_strategy=self.version_strategy,
_op_selection_data=OpSelectionData(
op_selection=op_selection,
resolved_op_selection=set(
resolved_op_selection_dict.keys()
), # equivalent to solids_to_execute. currently only gets top level nodes.
parent_job_def=self, # used by pipeline snapshot lineage
),
)

    def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:
if not self.is_single_mode:
return None
mode = self.get_mode_definition()
if not mode.partitioned_config:
return None
if not self._cached_partition_set:
self._cached_partition_set = PartitionSetDefinition(
job_name=self.name,
name=f"{self.name}_partition_set",
partitions_def=mode.partitioned_config.partitions_def,
run_config_fn_for_partition=mode.partitioned_config.run_config_for_partition_fn,
mode=mode.name,
)
return self._cached_partition_set

    def run_request_for_partition(self, partition_key: str, run_key: Optional[str]) -> RunRequest:
partition_set = self.get_partition_set_def()
if not partition_set:
check.failed("Called run_request_for_partition on a non-partitioned job")
partition = partition_set.get_partition(partition_key)
run_config = partition_set.run_config_for_partition(partition)
tags = partition_set.tags_for_partition(partition)
return RunRequest(run_key=run_key, run_config=run_config, tags=tags)

    def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "JobDefinition":
"""Apply a set of hooks to all op instances within the job."""
hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)
job_def = JobDefinition(
name=self.name,
graph_def=self._graph_def,
mode_def=self.mode_definitions[0],
preset_defs=self.preset_defs,
tags=self.tags,
hook_defs=hook_defs | self.hook_defs,
description=self._description,
op_retry_policy=self._solid_retry_policy,
_op_selection_data=self._op_selection_data,
)
update_wrapper(job_def, self, updated=())
return job_def

    def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
return (
self.op_selection_data.parent_job_def.get_pipeline_snapshot()
if self.op_selection_data
else None
)


def _swap_default_io_man(resources: Dict[str, ResourceDefinition], job: PipelineDefinition):
    """
    Used to create the user-facing experience of the default io_manager
    switching to in-memory when using execute_in_process.
    """
from dagster.core.storage.mem_io_manager import mem_io_manager
from .graph_definition import default_job_io_manager
if (
# pylint: disable=comparison-with-callable
resources.get("io_manager") in [default_job_io_manager, fs_asset_io_manager]
and job.version_strategy is None
):
updated_resources = dict(resources)
updated_resources["io_manager"] = mem_io_manager
return updated_resources
return resources


def _dep_key_of(node: Node) -> NodeInvocation:
return NodeInvocation(
name=node.definition.name,
alias=node.name,
tags=node.tags,
hook_defs=node.hook_defs,
retry_policy=node.retry_policy,
)


def get_subselected_graph_definition(
graph: GraphDefinition,
resolved_op_selection_dict: Dict,
parent_handle: Optional[NodeHandle] = None,
) -> SubselectedGraphDefinition:
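    # `resolved_op_selection_dict` maps each selected node name to either LeafNodeSelection
    # (the node is selected as a whole) or a nested dict of the same shape (only some nodes
    # inside a subgraph are selected), e.g.:
    #     {"op_a": LeafNodeSelection, "outer_graph": {"inner_op": LeafNodeSelection}}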
deps: Dict[
Union[str, NodeInvocation],
Dict[str, IDependencyDefinition],
] = {}
selected_nodes: List[Tuple[str, NodeDefinition]] = []
for node in graph.solids_in_topological_order:
node_handle = NodeHandle(node.name, parent=parent_handle)
# skip if the node isn't selected
if node.name not in resolved_op_selection_dict:
continue
# rebuild graph if any nodes inside the graph are selected
if node.is_graph and resolved_op_selection_dict[node.name] is not LeafNodeSelection:
definition = get_subselected_graph_definition(
node.definition,
resolved_op_selection_dict[node.name],
parent_handle=node_handle,
)
# use definition if the node as a whole is selected. this includes selecting the entire graph
else:
definition = node.definition
selected_nodes.append((node.name, definition))
# build dependencies for the node. we do it for both cases because nested graphs can have
# inputs and outputs too
deps[_dep_key_of(node)] = {}
for input_handle in node.input_handles():
if graph.dependency_structure.has_direct_dep(input_handle):
output_handle = graph.dependency_structure.get_direct_dep(input_handle)
if output_handle.solid.name in resolved_op_selection_dict:
deps[_dep_key_of(node)][input_handle.input_def.name] = DependencyDefinition(
solid=output_handle.solid.name, output=output_handle.output_def.name
)
elif graph.dependency_structure.has_dynamic_fan_in_dep(input_handle):
output_handle = graph.dependency_structure.get_dynamic_fan_in_dep(input_handle)
if output_handle.solid.name in resolved_op_selection_dict:
deps[_dep_key_of(node)][
input_handle.input_def.name
] = DynamicCollectDependencyDefinition(
solid_name=output_handle.solid.name,
output_name=output_handle.output_def.name,
)
elif graph.dependency_structure.has_fan_in_deps(input_handle):
output_handles = graph.dependency_structure.get_fan_in_deps(input_handle)
multi_dependencies = [
DependencyDefinition(
solid=output_handle.solid.name, output=output_handle.output_def.name
)
for output_handle in output_handles
if (
isinstance(output_handle, SolidOutputHandle)
and output_handle.solid.name in resolved_op_selection_dict
)
]
deps[_dep_key_of(node)][input_handle.input_def.name] = MultiDependencyDefinition(
cast(
List[Union[DependencyDefinition, Type[MappedInputPlaceholder]]],
multi_dependencies,
)
)
# else input is unconnected
# filter out unselected input/output mapping
new_input_mappings = list(
filter(
lambda input_mapping: input_mapping.maps_to.solid_name
in [name for name, _ in selected_nodes],
graph._input_mappings, # pylint: disable=protected-access
)
)
new_output_mappings = list(
filter(
lambda output_mapping: output_mapping.maps_from.solid_name
in [name for name, _ in selected_nodes],
graph._output_mappings, # pylint: disable=protected-access
)
)
try:
return SubselectedGraphDefinition(
parent_graph_def=graph,
dependencies=deps,
node_defs=[definition for _, definition in selected_nodes],
input_mappings=new_input_mappings,
output_mappings=new_output_mappings,
)
except DagsterInvalidDefinitionError as exc:
# This handles the case when you construct a subset such that an unsatisfied
# input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
# we re-raise a DagsterInvalidSubsetError.
raise DagsterInvalidSubsetError(
f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
f"{graph.name} results in an invalid graph."
) from exc