Source code for dagster.core.definitions.decorators.graph

from functools import update_wrapper
from typing import Any, Callable, Dict, List, Optional, Union, overload

from dagster import check
from dagster.core.decorator_utils import format_docstring_for_description

from ..config import ConfigMapping
from ..graph_definition import GraphDefinition
from ..input import GraphIn, InputDefinition
from ..output import GraphOut, OutputDefinition


class _Graph:
    """Decorator object returned by ``graph(...)`` when it is called with arguments.

    The constructor validates and stores the decorator arguments; calling the
    instance with a composition function builds the :py:class:`GraphDefinition`.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        input_defs: Optional[List[InputDefinition]] = None,
        output_defs: Optional[List[OutputDefinition]] = None,
        ins: Optional[Dict[str, GraphIn]] = None,
        out: Optional[Union[GraphOut, Dict[str, GraphOut]]] = None,
        tags: Optional[Dict[str, Any]] = None,
        config_mapping: Optional[ConfigMapping] = None,
    ):
        """Validate and store the decorator arguments.

        Args:
            name: Explicit graph name. When falsy, the decorated function's
                ``__name__`` is used at call time.
            description: Explicit description. When falsy, one is derived from
                the decorated function's docstring at call time.
            input_defs: Explicit input definitions; only used when ``ins`` is None.
            output_defs: Explicit output definitions; only used when ``out`` is None.
            ins: Mapping of input name to ``GraphIn``; takes precedence over
                ``input_defs`` when provided.
            out: A single ``GraphOut`` or a mapping of output name to
                ``GraphOut``; takes precedence over ``output_defs`` when provided.
            tags: Passed through unchanged to the ``GraphDefinition``.
            config_mapping: Passed to ``do_composition`` when the graph is built.
        """
        self.name = check.opt_str_param(name, "name")
        self.description = check.opt_str_param(description, "description")
        self.input_defs = check.opt_list_param(input_defs, "input_defs", of_type=InputDefinition)
        # Records whether any output information was supplied explicitly.
        # Not read anywhere inside this class.
        self.did_pass_outputs = output_defs is not None or out is not None
        self.output_defs = check.opt_nullable_list_param(
            output_defs, "output_defs", of_type=OutputDefinition
        )
        # ins / out / tags are stored as given; ``out`` is type-checked in __call__
        # when it is a dict.
        self.ins = ins
        self.out = out
        self.tags = tags
        self.config_mapping = check.opt_inst_param(config_mapping, "config_mapping", ConfigMapping)

    def __call__(self, fn: Callable[..., Any]) -> GraphDefinition:
        """Build a ``GraphDefinition`` from the composition function ``fn``.

        Args:
            fn: The decorated composition function.

        Returns:
            The constructed ``GraphDefinition``, with ``fn``'s wrapper metadata
            copied onto it via ``functools.update_wrapper``.
        """
        check.callable_param(fn, "fn")

        # Resolve the name locally instead of writing it back onto ``self``:
        # storing the inferred name on the instance would leak the first
        # function's name into every later use of the same decorator object.
        graph_name = self.name or fn.__name__

        if self.ins is not None:
            input_defs = [inp.to_definition(name) for name, inp in self.ins.items()]
        else:
            input_defs = check.opt_list_param(
                self.input_defs, "input_defs", of_type=InputDefinition
            )

        if self.out is None:
            output_defs = self.output_defs
        elif isinstance(self.out, GraphOut):
            # A single GraphOut is converted with no explicit name.
            output_defs = [self.out.to_definition(name=None)]
        else:
            check.dict_param(self.out, "out", key_type=str, value_type=GraphOut)
            output_defs = [out.to_definition(name=name) for name, out in self.out.items()]

        # NOTE(review): imported at call time, presumably to avoid a circular
        # import with composite_solid -- confirm before hoisting to module level.
        from dagster.core.definitions.decorators.composite_solid import do_composition

        (
            input_mappings,
            output_mappings,
            dependencies,
            solid_defs,
            config_mapping,
            positional_inputs,
        ) = do_composition(
            decorator_name="@graph",
            graph_name=graph_name,
            fn=fn,
            provided_input_defs=input_defs,
            provided_output_defs=output_defs,
            ignore_output_from_composition_fn=False,
            config_mapping=self.config_mapping,
        )

        graph_def = GraphDefinition(
            name=graph_name,
            dependencies=dependencies,
            node_defs=solid_defs,
            # An explicit description wins; otherwise fall back to fn's docstring.
            description=self.description or format_docstring_for_description(fn),
            input_mappings=input_mappings,
            output_mappings=output_mappings,
            config=config_mapping,
            positional_inputs=positional_inputs,
            tags=self.tags,
        )
        update_wrapper(graph_def, fn)
        return graph_def


@overload
def graph(name: Callable[..., Any]) -> GraphDefinition:
    """Typing overload for bare ``@graph`` usage.

    The decorated function itself arrives as ``name`` and a
    ``GraphDefinition`` is returned directly.
    """
    ...


@overload
def graph(
    name: Optional[str] = ...,
    description: Optional[str] = ...,
    input_defs: Optional[List[InputDefinition]] = ...,
    output_defs: Optional[List[OutputDefinition]] = ...,
    ins: Optional[Dict[str, GraphIn]] = ...,
    out: Optional[Union[GraphOut, Dict[str, GraphOut]]] = ...,
    tags: Optional[Dict[str, Any]] = ...,
    config: Optional[Union[ConfigMapping, Dict[str, Any]]] = ...,
) -> _Graph:
    """Typing overload for parameterized ``@graph(...)`` usage.

    Called with configuration arguments, ``graph`` returns a ``_Graph``
    decorator object that is then applied to the composition function.
    """
    ...


def graph(
    name: Optional[Union[Callable[..., Any], str]] = None,
    description: Optional[str] = None,
    input_defs: Optional[List[InputDefinition]] = None,
    output_defs: Optional[List[OutputDefinition]] = None,
    ins: Optional[Dict[str, GraphIn]] = None,
    out: Optional[Union[GraphOut, Dict[str, GraphOut]]] = None,
    tags: Optional[Dict[str, Any]] = None,
    config: Optional[Union[ConfigMapping, Dict[str, Any]]] = None,
) -> Union[GraphDefinition, _Graph]:
    """Create a graph with the specified parameters from the decorated composition function.

    Using this decorator allows you to build up a dependency graph by writing a
    function that invokes ops (or other graphs) and passes the output to subsequent invocations.

    Args:
        name (Optional[str]):
            The name of the graph. Must be unique within any :py:class:`RepositoryDefinition`
            containing the graph. When the decorator is applied bare (``@graph`` with no
            parentheses), the decorated function is received in this position instead.
        description (Optional[str]):
            A human-readable description of the graph.
        input_defs (Optional[List[InputDefinition]]):
            Information about the inputs that this graph maps. Information provided here
            will be combined with what can be inferred from the function signature, with these
            explicit InputDefinitions taking precedence.

            Uses of inputs in the body of the decorated composition function will determine
            the :py:class:`InputMappings <InputMapping>` passed to the underlying
            :py:class:`GraphDefinition`.
        output_defs (Optional[List[OutputDefinition]]):
            Output definitions for the graph. If not provided explicitly, these will be inferred
            from typehints.

            Uses of these outputs in the body of the decorated composition function, as well as the
            return value of the decorated function, will be used to infer the appropriate set of
            :py:class:`OutputMappings <OutputMapping>` for the underlying
            :py:class:`GraphDefinition`.

            To map multiple outputs, return a dictionary from the composition function.
        ins (Optional[Dict[str, GraphIn]]):
            Information about the inputs that this graph maps. Information provided here
            will be combined with what can be inferred from the function signature, with these
            explicit GraphIn taking precedence.
        out (Optional[Union[GraphOut, Dict[str, GraphOut]]]):
            Information about the outputs that this graph maps. Information provided here will be
            combined with what can be inferred from the return type signature if the function does
            not use yield.

            To map multiple outputs, return a dictionary from the composition function.
        tags (Optional[Dict[str, Any]]):
            Arbitrary metadata for any execution run of the graph. Values that are not strings
            will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`. These tag values may be overwritten by tag
            values provided at invocation time.
        config (Optional[Union[ConfigMapping, Dict[str, Any]]]):
            Configuration for the graph. A :py:class:`ConfigMapping` is used as given. A
            dictionary (with string keys) is wrapped in a :py:class:`ConfigMapping` that has no
            config schema and whose config function always returns that dictionary.

    Returns:
        Union[GraphDefinition, _Graph]: A :py:class:`GraphDefinition` when applied bare to a
        function, otherwise a decorator object that produces one when applied to a function.
    """
    # Bare usage: ``@graph`` directly on a function, so ``name`` is the function itself.
    if callable(name):
        check.invariant(description is None)
        return _Graph()(name)

    config_mapping = None
    # Case 1: a dictionary of config is provided, convert to config mapping.
    if config is not None and not isinstance(config, ConfigMapping):
        config = check.dict_param(config, "config", key_type=str)
        config_mapping = ConfigMapping(config_fn=lambda _: config, config_schema=None)
    # Case 2: actual config mapping is provided.
    else:
        config_mapping = config

    return _Graph(
        name=name,
        description=description,
        input_defs=input_defs,
        output_defs=output_defs,
        ins=ins,
        out=out,
        tags=tags,
        config_mapping=config_mapping,
    )