from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Any, Dict, FrozenSet, List, Optional, Set, Union
from dagster import check
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.solid_definition import NodeDefinition
from dagster.core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from dagster.core.storage.output_manager import IOutputManagerDefinition
from dagster.core.storage.root_input_manager import (
IInputManagerDefinition,
RootInputManagerDefinition,
)
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from dagster.core.types.dagster_type import DagsterType, DagsterTypeKind
from dagster.core.utils import str_format_set
from dagster.utils import frozentags, merge_dicts
from dagster.utils.backcompat import experimental_class_warning
from .dependency import (
DependencyDefinition,
DependencyStructure,
DynamicCollectDependencyDefinition,
IDependencyDefinition,
MultiDependencyDefinition,
Node,
NodeHandle,
NodeInvocation,
SolidInputHandle,
)
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .hook_definition import HookDefinition
from .mode import ModeDefinition
from .node_definition import NodeDefinition
from .preset import PresetDefinition
from .utils import validate_tags
from .version_strategy import VersionStrategy
if TYPE_CHECKING:
from dagster.core.definitions.partition import PartitionSetDefinition
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster.core.host_representation import PipelineIndex
from dagster.core.instance import DagsterInstance
from dagster.core.snap import ConfigSchemaSnapshot, PipelineSnapshot
from .run_config_schema import RunConfigSchema
[docs]class PipelineDefinition:
"""Defines a Dagster pipeline.
A pipeline is made up of
- Solids, each of which is a single functional unit of data computation.
- Dependencies, which determine how the values produced by solids as their outputs flow from
one solid to another. This tells Dagster how to arrange solids, and potentially multiple
aliased instances of solids, into a directed, acyclic graph (DAG) of compute.
- Modes, which can be used to attach resources, custom loggers, custom system storage
options, and custom executors to a pipeline, and to switch between them.
- Presets, which can be used to ship common combinations of pipeline config options in Python
code, and to switch between them.
Args:
solid_defs (List[SolidDefinition]): The set of solids used in this pipeline.
name (str): The name of the pipeline. Must be unique within any
:py:class:`RepositoryDefinition` containing the pipeline.
description (Optional[str]): A human-readable description of the pipeline.
dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]):
A structure that declares the dependencies of each solid's inputs on the outputs of
other solids in the pipeline. Keys of the top level dict are either the string names of
solids in the pipeline or, in the case of aliased solids,
:py:class:`NodeInvocations <NodeInvocation>`. Values of the top level dict are
themselves dicts, which map input names belonging to the solid or aliased solid to
:py:class:`DependencyDefinitions <DependencyDefinition>`.
mode_defs (Optional[List[ModeDefinition]]): The set of modes in which this pipeline can
operate. Modes are used to attach resources, custom loggers, custom system storage
options, and custom executors to a pipeline. Modes can be used, e.g., to vary available
resource and logging implementations between local test and production runs.
preset_defs (Optional[List[PresetDefinition]]): A set of preset collections of configuration
options that may be used to execute a pipeline. A preset consists of an environment
dict, an optional subset of solids to execute, and a mode selection. Presets can be used
to ship common combinations of options to pipeline end users in Python code, and can
be selected by tools like Dagit.
tags (Optional[Dict[str, Any]]): Arbitrary metadata for any execution run of the pipeline.
Values that are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`. These tag values may be overwritten by tag
values provided at invocation time.
hook_defs (Optional[AbstractSet[HookDefinition]]): A set of hook definitions applied to the
pipeline. When a hook is applied to a pipeline, it will be attached to all solid
instances within the pipeline.
solid_retry_policy (Optional[RetryPolicy]): The default retry policy for all solids in
this pipeline. Only used if retry policy is not defined on the solid definition or
solid invocation.
_parent_pipeline_def (INTERNAL ONLY): Used for tracking pipelines created using solid subsets.
Examples:
.. code-block:: python
@solid
def return_one(_):
return 1
@solid(input_defs=[InputDefinition('num')], required_resource_keys={'op'})
def apply_op(context, num):
return context.resources.op(num)
@resource(config_schema=Int)
def adder_resource(init_context):
return lambda x: x + init_context.resource_config
add_mode = ModeDefinition(
name='add_mode',
resource_defs={'op': adder_resource},
description='Mode that adds things',
)
add_three_preset = PresetDefinition(
name='add_three_preset',
run_config={'resources': {'op': {'config': 3}}},
mode='add_mode',
)
pipeline_def = PipelineDefinition(
name='basic',
solid_defs=[return_one, apply_op],
dependencies={'apply_op': {'num': DependencyDefinition('return_one')}},
mode_defs=[add_mode],
preset_defs=[add_three_preset],
)
"""
def __init__(
self,
solid_defs: Optional[List[NodeDefinition]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
dependencies: Optional[
Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]]
] = None,
mode_defs: Optional[List[ModeDefinition]] = None,
preset_defs: Optional[List[PresetDefinition]] = None,
tags: Optional[Dict[str, Any]] = None,
hook_defs: Optional[AbstractSet[HookDefinition]] = None,
solid_retry_policy: Optional[RetryPolicy] = None,
graph_def=None,
_parent_pipeline_def=None, # https://github.com/dagster-io/dagster/issues/2115
version_strategy: Optional[VersionStrategy] = None,
):
# If a graph is specificed directly use it
if check.opt_inst_param(graph_def, "graph_def", GraphDefinition):
self._graph_def = graph_def
self._name = name or graph_def.name
# Otherwise fallback to legacy construction
else:
if name is None:
check.failed("name must be set provided")
self._name = name
if solid_defs is None:
check.failed("solid_defs must be provided")
self._graph_def = GraphDefinition(
name=name,
dependencies=dependencies,
node_defs=solid_defs,
input_mappings=None,
output_mappings=None,
config=None,
description=None,
)
# tags and description can exist on graph as well, but since
# same graph may be in multiple pipelines/jobs, keep separate layer
self._description = check.opt_str_param(description, "description")
self._tags = validate_tags(tags)
self._current_level_node_defs = self._graph_def.node_defs
mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition)
if not mode_definitions:
mode_definitions = [ModeDefinition()]
self._mode_definitions = mode_definitions
seen_modes = set()
for mode_def in mode_definitions:
if mode_def.name in seen_modes:
raise DagsterInvalidDefinitionError(
(
'Two modes seen with the name "{mode_name}" in "{pipeline_name}". '
"Modes must have unique names."
).format(mode_name=mode_def.name, pipeline_name=self.name)
)
seen_modes.add(mode_def.name)
self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition)
self._solid_retry_policy = check.opt_inst_param(
solid_retry_policy, "solid_retry_policy", RetryPolicy
)
self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition)
self._preset_dict: Dict[str, PresetDefinition] = {}
for preset in self._preset_defs:
if preset.name in self._preset_dict:
raise DagsterInvalidDefinitionError(
(
'Two PresetDefinitions seen with the name "{name}" in "{pipeline_name}". '
"PresetDefinitions must have unique names."
).format(name=preset.name, pipeline_name=self.name)
)
if preset.mode not in seen_modes:
raise DagsterInvalidDefinitionError(
(
'PresetDefinition "{name}" in "{pipeline_name}" '
'references mode "{mode}" which is not defined.'
).format(name=preset.name, pipeline_name=self.name, mode=preset.mode)
)
self._preset_dict[preset.name] = preset
self._resource_requirements = {
mode_def.name: _checked_resource_reqs_for_mode(
mode_def,
self._current_level_node_defs,
self._graph_def._dagster_type_dict,
self._graph_def._node_dict,
self._hook_defs,
self._graph_def._dependency_structure,
)
for mode_def in self._mode_definitions
}
# Recursively explore all nodes in the this pipeline
self._all_node_defs = _build_all_node_defs(self._current_level_node_defs)
self._parent_pipeline_def = check.opt_inst_param(
_parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition
)
self._cached_run_config_schemas: Dict[str, "RunConfigSchema"] = {}
self._cached_external_pipeline = None
self.version_strategy = check.opt_inst_param(
version_strategy, "version_strategy", VersionStrategy
)
if self.version_strategy is not None:
experimental_class_warning("VersionStrategy")
@property
def name(self):
return self._name
@property
def target_type(self):
return "pipeline"
@property
def is_job(self) -> bool:
return False
def describe_target(self):
return f"{self.target_type} '{self.name}'"
@property
def tags(self):
return frozentags(**merge_dicts(self._graph_def.tags, self._tags))
@property
def description(self):
return self._description
@property
def graph(self):
return self._graph_def
@property
def dependency_structure(self):
return self._graph_def.dependency_structure
@property
def dependencies(self):
return self._graph_def.dependencies
def get_run_config_schema(self, mode: Optional[str] = None) -> "RunConfigSchema":
check.str_param(mode, "mode")
mode_def = self.get_mode_definition(mode)
if mode_def.name in self._cached_run_config_schemas:
return self._cached_run_config_schemas[mode_def.name]
self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(
self,
mode_def,
self._resource_requirements[mode_def.name],
)
return self._cached_run_config_schemas[mode_def.name]
@property
def mode_definitions(self) -> List[ModeDefinition]:
return self._mode_definitions
@property
def preset_defs(self) -> List[PresetDefinition]:
return self._preset_defs
def _get_mode_definition(self, mode: str) -> Optional[ModeDefinition]:
check.str_param(mode, "mode")
for mode_definition in self._mode_definitions:
if mode_definition.name == mode:
return mode_definition
return None
def get_default_mode(self) -> ModeDefinition:
return self._mode_definitions[0]
@property
def is_single_mode(self) -> bool:
return len(self._mode_definitions) == 1
@property
def is_multi_mode(self) -> bool:
return len(self._mode_definitions) > 1
def is_using_memoization(self, run_tags: Dict[str, str]) -> bool:
tags = merge_dicts(self.tags, run_tags)
# If someone provides a false value for memoized run tag, then they are intentionally
# switching off memoization.
if tags.get(MEMOIZED_RUN_TAG) == "false":
return False
return (
MEMOIZED_RUN_TAG in tags and tags.get(MEMOIZED_RUN_TAG) == "true"
) or self.version_strategy is not None
def has_mode_definition(self, mode: str) -> bool:
check.str_param(mode, "mode")
return bool(self._get_mode_definition(mode))
def get_default_mode_name(self) -> str:
return self._mode_definitions[0].name
def get_mode_definition(self, mode: Optional[str] = None) -> ModeDefinition:
check.opt_str_param(mode, "mode")
if mode is None:
check.invariant(self.is_single_mode)
return self.get_default_mode()
mode_def = self._get_mode_definition(mode)
if mode_def is None:
check.failed(
"Could not find mode {mode} in pipeline {name}".format(mode=mode, name=self.name),
)
return mode_def
@property
def available_modes(self) -> List[str]:
return [mode_def.name for mode_def in self._mode_definitions]
def get_required_resource_defs_for_mode(self, mode: str) -> Dict[str, ResourceDefinition]:
return {
resource_key: resource
for resource_key, resource in self.get_mode_definition(mode).resource_defs.items()
if resource_key in self._resource_requirements[mode]
}
@property
def all_node_defs(self) -> List[NodeDefinition]:
return list(self._all_node_defs.values())
@property
def top_level_solid_defs(self) -> List[NodeDefinition]:
return self._current_level_node_defs
def solid_def_named(self, name: str) -> NodeDefinition:
check.str_param(name, "name")
check.invariant(name in self._all_node_defs, "{} not found".format(name))
return self._all_node_defs[name]
def has_solid_def(self, name: str) -> bool:
check.str_param(name, "name")
return name in self._all_node_defs
def get_solid(self, handle):
return self._graph_def.get_solid(handle)
def has_solid_named(self, name):
return self._graph_def.has_solid_named(name)
def solid_named(self, name):
return self._graph_def.solid_named(name)
@property
def solids(self):
return self._graph_def.solids
@property
def solids_in_topological_order(self):
return self._graph_def.solids_in_topological_order
def all_dagster_types(self):
return self._graph_def.all_dagster_types()
def has_dagster_type(self, name):
return self._graph_def.has_dagster_type(name)
def dagster_type_named(self, name):
return self._graph_def.dagster_type_named(name)
def get_pipeline_subset_def(
self, solids_to_execute: Optional[AbstractSet[str]]
) -> "PipelineDefinition":
return (
self if solids_to_execute is None else _get_pipeline_subset_def(self, solids_to_execute)
)
def has_preset(self, name: str) -> bool:
check.str_param(name, "name")
return name in self._preset_dict
def get_preset(self, name: str) -> PresetDefinition:
check.str_param(name, "name")
if name not in self._preset_dict:
raise DagsterInvariantViolationError(
(
'Could not find preset for "{name}". Available presets '
'for pipeline "{pipeline_name}" are {preset_names}.'
).format(
name=name,
preset_names=list(self._preset_dict.keys()),
pipeline_name=self.name,
)
)
return self._preset_dict[name]
def get_pipeline_snapshot(self) -> "PipelineSnapshot":
return self.get_pipeline_index().pipeline_snapshot
def get_pipeline_snapshot_id(self) -> str:
return self.get_pipeline_index().pipeline_snapshot_id
def get_pipeline_index(self) -> "PipelineIndex":
from dagster.core.host_representation import PipelineIndex
from dagster.core.snap import PipelineSnapshot
return PipelineIndex(
PipelineSnapshot.from_pipeline_def(self), self.get_parent_pipeline_snapshot()
)
def get_config_schema_snapshot(self) -> "ConfigSchemaSnapshot":
return self.get_pipeline_snapshot().config_schema_snapshot
@property
def is_subset_pipeline(self) -> bool:
return False
@property
def parent_pipeline_def(self) -> Optional["PipelineDefinition"]:
return None
def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
return None
@property
def solids_to_execute(self) -> Optional[FrozenSet[str]]:
return None
@property
def hook_defs(self) -> AbstractSet[HookDefinition]:
return self._hook_defs
def get_all_hooks_for_handle(self, handle: NodeHandle) -> FrozenSet[HookDefinition]:
"""Gather all the hooks for the given solid from all places possibly attached with a hook.
A hook can be attached to any of the following objects
* Solid (solid invocation)
* PipelineDefinition
Args:
handle (NodeHandle): The solid's handle
Returns:
FrozenSet[HookDefinition]
"""
check.inst_param(handle, "handle", NodeHandle)
hook_defs: AbstractSet[HookDefinition] = set()
current = handle
lineage = []
while current:
lineage.append(current.name)
current = current.parent
# hooks on top-level solid
name = lineage.pop()
solid = self._graph_def.solid_named(name)
hook_defs = hook_defs.union(solid.hook_defs)
# hooks on non-top-level solids
while lineage:
name = lineage.pop()
solid = solid.definition.solid_named(name)
hook_defs = hook_defs.union(solid.hook_defs)
# hooks applied to a pipeline definition will run on every solid
hook_defs = hook_defs.union(self.hook_defs)
return frozenset(hook_defs)
def get_retry_policy_for_handle(self, handle: NodeHandle) -> Optional[RetryPolicy]:
solid = self.get_solid(handle)
if solid.retry_policy:
return solid.retry_policy
elif solid.definition.retry_policy:
return solid.definition.retry_policy
# could be expanded to look in composite_solid / graph containers
else:
return self._solid_retry_policy
def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "PipelineDefinition":
"""Apply a set of hooks to all solid instances within the pipeline."""
hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)
pipeline_def = PipelineDefinition(
name=self.name,
graph_def=self._graph_def,
mode_defs=self.mode_definitions,
preset_defs=self.preset_defs,
tags=self.tags,
hook_defs=hook_defs | self.hook_defs,
description=self._description,
solid_retry_policy=self._solid_retry_policy,
_parent_pipeline_def=self._parent_pipeline_def,
)
update_wrapper(pipeline_def, self, updated=())
return pipeline_def
# make Callable for decorator reference updates
def __call__(self, *args, **kwargs):
if self.is_job:
msg = (
f"Attempted to call job '{self.name}' directly. Jobs should be invoked by "
"using an execution API function (e.g. `job.execute_in_process`)."
)
else:
msg = (
f"Attempted to call pipeline '{self.name}' directly. Pipelines should be invoked by "
"using an execution API function (e.g. `execute_pipeline`)."
)
raise DagsterInvariantViolationError(msg)
class PipelineSubsetDefinition(PipelineDefinition):
@property
def solids_to_execute(self):
return frozenset(self._graph_def.node_names())
@property
def solid_selection(self) -> List[str]:
# we currently don't pass the real solid_selection (the solid query list) down here.
# so in the short-term, to make the call sites cleaner, we will convert the solids to execute
# to a list
return self._graph_def.node_names()
@property
def parent_pipeline_def(self) -> PipelineDefinition:
return self._parent_pipeline_def
def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
return self._parent_pipeline_def.get_pipeline_snapshot()
@property
def is_subset_pipeline(self) -> bool:
return True
def get_pipeline_subset_def(
self, solids_to_execute: Optional[AbstractSet[str]]
) -> "PipelineSubsetDefinition":
raise DagsterInvariantViolationError("Pipeline subsets may not be subset again.")
def _dep_key_of(solid: Node) -> NodeInvocation:
return NodeInvocation(
name=solid.definition.name,
alias=solid.name,
tags=solid.tags,
hook_defs=solid.hook_defs,
retry_policy=solid.retry_policy,
)
def _get_pipeline_subset_def(
pipeline_def: PipelineDefinition,
solids_to_execute: AbstractSet[str],
) -> "PipelineSubsetDefinition":
"""
Build a pipeline which is a subset of another pipeline.
Only includes the solids which are in solids_to_execute.
"""
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
check.set_param(solids_to_execute, "solids_to_execute", of_type=str)
graph = pipeline_def.graph
for solid_name in solids_to_execute:
if not graph.has_solid_named(solid_name):
raise DagsterInvalidSubsetError(
"{target_type} {pipeline_name} has no {node_type} named {name}.".format(
target_type=pipeline_def.target_type,
pipeline_name=pipeline_def.name,
name=solid_name,
node_type="ops" if pipeline_def.is_job else "solids",
),
)
# go in topo order to ensure deps dict is ordered
solids = list(
filter(lambda solid: solid.name in solids_to_execute, graph.solids_in_topological_order)
)
deps: Dict[
Union[str, NodeInvocation],
Dict[str, IDependencyDefinition],
] = {_dep_key_of(solid): {} for solid in solids}
for solid in solids:
for input_handle in solid.input_handles():
if graph.dependency_structure.has_direct_dep(input_handle):
output_handle = pipeline_def.dependency_structure.get_direct_dep(input_handle)
if output_handle.solid.name in solids_to_execute:
deps[_dep_key_of(solid)][input_handle.input_def.name] = DependencyDefinition(
solid=output_handle.solid.name, output=output_handle.output_def.name
)
elif graph.dependency_structure.has_dynamic_fan_in_dep(input_handle):
output_handle = graph.dependency_structure.get_dynamic_fan_in_dep(input_handle)
if output_handle.solid.name in solids_to_execute:
deps[_dep_key_of(solid)][
input_handle.input_def.name
] = DynamicCollectDependencyDefinition(
solid_name=output_handle.solid.name,
output_name=output_handle.output_def.name,
)
elif graph.dependency_structure.has_fan_in_deps(input_handle):
output_handles = graph.dependency_structure.get_fan_in_deps(input_handle)
deps[_dep_key_of(solid)][input_handle.input_def.name] = MultiDependencyDefinition(
[
DependencyDefinition(
solid=output_handle.solid.name, output=output_handle.output_def.name
)
for output_handle in output_handles
if output_handle.solid.name in solids_to_execute
]
)
# else input is unconnected
try:
sub_pipeline_def = PipelineSubsetDefinition(
name=pipeline_def.name, # should we change the name for subsetted pipeline?
solid_defs=list({solid.definition for solid in solids}),
mode_defs=pipeline_def.mode_definitions,
dependencies=deps,
_parent_pipeline_def=pipeline_def,
tags=pipeline_def.tags,
hook_defs=pipeline_def.hook_defs,
)
return sub_pipeline_def
except DagsterInvalidDefinitionError as exc:
# This handles the case when you construct a subset such that an unsatisfied
# input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
# we re-raise a DagsterInvalidSubsetError.
raise DagsterInvalidSubsetError(
f"The attempted subset {str_format_set(solids_to_execute)} for {pipeline_def.target_type} "
f"{pipeline_def.name} results in an invalid {pipeline_def.target_type}"
) from exc
def _checked_resource_reqs_for_mode(
mode_def: ModeDefinition,
node_defs: List[NodeDefinition],
dagster_type_dict: Dict[str, DagsterType],
solid_dict: Dict[str, Node],
pipeline_hook_defs: AbstractSet[HookDefinition],
dependency_structure: DependencyStructure,
) -> Set[str]:
"""
Calculate the resource requirements for the pipeline in this mode and ensure they are
provided by the mode.
We combine these operations in to one traversal to allow for raising excpetions that provide
as much context as possible about where the unsatisfied resource requirement came from.
"""
resource_reqs: Set[str] = set()
mode_output_managers = set(
key
for key, resource_def in mode_def.resource_defs.items()
if isinstance(resource_def, IOutputManagerDefinition)
)
mode_resources = set(mode_def.resource_defs.keys())
for node_def in node_defs:
for solid_def in node_def.iterate_solid_defs():
for required_resource in solid_def.required_resource_keys:
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=solid_def.describe_node(),
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
for output_def in solid_def.output_defs:
resource_reqs.add(output_def.io_manager_key)
if output_def.io_manager_key not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="IO manager",
resource_key=output_def.io_manager_key,
descriptor=f"output '{output_def.name}' of {solid_def.describe_node()}",
mode_def=mode_def,
resource_defs_of_type=mode_output_managers,
)
raise DagsterInvalidDefinitionError(error_msg)
resource_reqs.update(
_checked_type_resource_reqs_for_mode(
mode_def,
dagster_type_dict,
)
)
# Validate unsatisfied inputs can be materialized from config
resource_reqs.update(
_checked_input_resource_reqs_for_mode(dependency_structure, solid_dict, mode_def)
)
for solid in solid_dict.values():
for hook_def in solid.hook_defs:
for required_resource in hook_def.required_resource_keys:
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"hook '{hook_def.name}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
for hook_def in pipeline_hook_defs:
for required_resource in hook_def.required_resource_keys:
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"hook '{hook_def.name}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
for resource_key, resource in mode_def.resource_defs.items():
for required_resource in resource.required_resource_keys:
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"resource at key '{resource_key}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
# Finally, recursively add any resources that the set of required resources require
while True:
new_resources: Set[str] = set()
for resource_key in resource_reqs:
resource = mode_def.resource_defs[resource_key]
new_resources.update(resource.required_resource_keys - resource_reqs)
if not len(new_resources):
break
resource_reqs.update(new_resources)
return resource_reqs
def _checked_type_resource_reqs_for_mode(
mode_def: ModeDefinition,
dagster_type_dict: Dict[str, DagsterType],
) -> Set[str]:
"""
Calculate all the resource requirements related to DagsterTypes for this mode and ensure the
mode provides those resources.
"""
resource_reqs = set()
mode_resources = set(mode_def.resource_defs.keys())
for dagster_type in dagster_type_dict.values():
for required_resource in dagster_type.required_resource_keys:
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"type '{dagster_type.display_name}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
if dagster_type.loader:
for required_resource in dagster_type.loader.required_resource_keys():
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"the loader on type '{dagster_type.display_name}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
if dagster_type.materializer:
for required_resource in dagster_type.materializer.required_resource_keys():
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
error_msg = _get_missing_resource_error_msg(
resource_type="resource",
resource_key=required_resource,
descriptor=f"the materializer on type '{dagster_type.display_name}'",
mode_def=mode_def,
resource_defs_of_type=mode_resources,
)
raise DagsterInvalidDefinitionError(error_msg)
return resource_reqs
def _checked_input_resource_reqs_for_mode(
dependency_structure: DependencyStructure,
node_dict: Dict[str, Node],
mode_def: ModeDefinition,
outer_dependency_structures: Optional[List[DependencyStructure]] = None,
outer_solids: Optional[List[Node]] = None,
) -> Set[str]:
outer_dependency_structures = check.opt_list_param(
outer_dependency_structures, "outer_dependency_structures", DependencyStructure
)
outer_solids = check.opt_list_param(outer_solids, "outer_solids", Node)
resource_reqs = set()
mode_root_input_managers = set(
key
for key, resource_def in mode_def.resource_defs.items()
if isinstance(resource_def, RootInputManagerDefinition)
)
for node in node_dict.values():
if node.is_graph:
graph_def = node.definition.ensure_graph_def()
# check inner solids
resource_reqs.update(
_checked_input_resource_reqs_for_mode(
dependency_structure=graph_def.dependency_structure,
node_dict=graph_def.node_dict,
mode_def=mode_def,
outer_dependency_structures=outer_dependency_structures
+ [dependency_structure],
outer_solids=outer_solids + [node],
)
)
for handle in node.input_handles():
source_output_handles = None
if dependency_structure.has_deps(handle):
# input is connected to outputs from the same dependency structure
source_output_handles = dependency_structure.get_deps_list(handle)
else:
# input is connected to outputs from outer dependency structure, e.g. first solids
# in a composite
curr_node = node
curr_handle = handle
curr_index = len(outer_solids) - 1
# Checks to see if input is mapped to an outer dependency structure
while curr_index >= 0 and curr_node.container_maps_input(curr_handle.input_name):
curr_handle = SolidInputHandle(
solid=outer_solids[curr_index],
input_def=curr_node.container_mapped_input(
curr_handle.input_name
).definition,
)
if outer_dependency_structures[curr_index].has_deps(curr_handle):
source_output_handles = outer_dependency_structures[
curr_index
].get_deps_list(curr_handle)
break
curr_node = outer_solids[curr_index]
curr_index -= 1
if source_output_handles:
# input is connected to source output handles within the graph
for source_output_handle in source_output_handles:
output_manager_key = source_output_handle.output_def.io_manager_key
output_manager_def = mode_def.resource_defs[output_manager_key]
if not isinstance(output_manager_def, IInputManagerDefinition):
raise DagsterInvalidDefinitionError(
f'Input "{handle.input_def.name}" of {node.describe_node()} is '
f'connected to output "{source_output_handle.output_def.name}" '
f"of {source_output_handle.solid.describe_node()}. That output does not "
"have an output "
f"manager that knows how to load inputs, so we don't know how "
f"to load the input. To address this, assign an IOManager to "
f"the upstream output."
)
else:
# input is unconnected
input_def = handle.input_def
if (
not input_def.dagster_type.loader
and not input_def.dagster_type.kind == DagsterTypeKind.NOTHING
and not input_def.root_manager_key
and not input_def.has_default_value
):
raise DagsterInvalidDefinitionError(
"Input '{input_name}' in {described_node} is not connected to "
"the output of a previous node and can not be loaded from configuration, "
"making it impossible to execute. "
"Possible solutions are:\n"
" * add a dagster_type_loader for the type '{dagster_type}'\n"
" * connect '{input_name}' to the output of another node\n".format(
described_node=node.describe_node(),
input_name=input_def.name,
dagster_type=input_def.dagster_type.display_name,
)
)
# If a root manager is provided, it's always used. I.e. it has priority over
# the other ways of loading unsatisfied inputs - dagster type loaders and
# default values.
if input_def.root_manager_key:
resource_reqs.add(input_def.root_manager_key)
if input_def.root_manager_key not in mode_def.resource_defs:
error_msg = _get_missing_resource_error_msg(
resource_type="root input manager",
resource_key=input_def.root_manager_key,
descriptor=f"unsatisfied input '{input_def.name}' of {node.describe_node()}",
mode_def=mode_def,
resource_defs_of_type=mode_root_input_managers,
)
raise DagsterInvalidDefinitionError(error_msg)
return resource_reqs
def _get_missing_resource_error_msg(
resource_type, resource_key, descriptor, mode_def, resource_defs_of_type
):
if mode_def.name == "default":
return (
f"{resource_type} key '{resource_key}' is required by "
f"{descriptor}, but is not provided. Provide a {resource_type} for key '{resource_key}', "
f"or change '{resource_key}' to one of the provided {resource_type} keys: "
f"{sorted(resource_defs_of_type)}."
)
else:
return (
f"{resource_type} key '{resource_key}' is required by "
f"{descriptor}, but is not provided by mode '{mode_def.name}'. "
f"In mode '{mode_def.name}', provide a {resource_type} for key '{resource_key}', "
f"or change '{resource_key}' to one of the provided root input managers keys: {sorted(resource_defs_of_type)}."
)
def _build_all_node_defs(node_defs: List[NodeDefinition]) -> Dict[str, NodeDefinition]:
all_defs: Dict[str, NodeDefinition] = {}
for current_level_node_def in node_defs:
for node_def in current_level_node_def.iterate_node_defs():
if node_def.name in all_defs:
if all_defs[node_def.name] != node_def:
raise DagsterInvalidDefinitionError(
'Detected conflicting node definitions with the same name "{name}"'.format(
name=node_def.name
)
)
else:
all_defs[node_def.name] = node_def
return all_defs
def _create_run_config_schema(
pipeline_def: PipelineDefinition,
mode_definition: ModeDefinition,
required_resources: Set[str],
) -> "RunConfigSchema":
from .run_config import (
RunConfigSchemaCreationData,
construct_config_type_dictionary,
define_run_config_schema_type,
)
from .run_config_schema import RunConfigSchema
# When executing with a subset pipeline, include the missing solids
# from the original pipeline as ignored to allow execution with
# run config that is valid for the original
if isinstance(pipeline_def.graph, SubselectedGraphDefinition):
ignored_solids = pipeline_def.graph.get_top_level_omitted_nodes()
elif pipeline_def.is_subset_pipeline:
if pipeline_def.parent_pipeline_def is None:
check.failed("Unexpected subset pipeline state")
ignored_solids = [
solid
for solid in pipeline_def.parent_pipeline_def.graph.solids
if not pipeline_def.has_solid_named(solid.name)
]
else:
ignored_solids = []
run_config_schema_type = define_run_config_schema_type(
RunConfigSchemaCreationData(
pipeline_name=pipeline_def.name,
solids=pipeline_def.graph.solids,
graph_def=pipeline_def.graph,
dependency_structure=pipeline_def.graph.dependency_structure,
mode_definition=mode_definition,
logger_defs=mode_definition.loggers,
ignored_solids=ignored_solids,
required_resources=required_resources,
is_using_graph_job_op_apis=pipeline_def.is_job,
)
)
if mode_definition.config_mapping:
outer_config_type = mode_definition.config_mapping.config_schema.config_type
else:
outer_config_type = run_config_schema_type
if outer_config_type is None:
check.failed("Unexpected outer_config_type value of None")
config_type_dict_by_name, config_type_dict_by_key = construct_config_type_dictionary(
pipeline_def.all_node_defs,
outer_config_type,
)
return RunConfigSchema(
run_config_schema_type=run_config_schema_type,
config_type_dict_by_name=config_type_dict_by_name,
config_type_dict_by_key=config_type_dict_by_key,
config_mapping=mode_definition.config_mapping,
)