Source code for dagster.core.definitions.decorators.op

from functools import update_wrapper
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Set,
    Union,
    overload,
)

from dagster import check
from dagster.core.decorator_utils import format_docstring_for_description

from ....seven.typing import get_origin
from ...errors import DagsterInvariantViolationError
from ..inference import InferredOutputProps, infer_output_props
from ..input import In, InputDefinition
from ..output import Out, OutputDefinition
from ..policy import RetryPolicy
from ..solid_definition import SolidDefinition
from .solid import (
    DecoratedSolidFunction,
    NoContextDecoratedSolidFunction,
    resolve_checked_solid_fn_inputs,
)

if TYPE_CHECKING:
    from ..op_definition import OpDefinition


class _Op:
    """Callable that holds the arguments given to ``@op(...)``.

    An instance stores the decorator arguments; calling the instance with the
    decorated function builds and returns the resulting ``OpDefinition``.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        input_defs: Optional[Sequence[InputDefinition]] = None,
        output_defs: Optional[Sequence[OutputDefinition]] = None,
        description: Optional[str] = None,
        required_resource_keys: Optional[Set[str]] = None,
        config_schema: Optional[Union[Any, Dict[str, Any]]] = None,
        tags: Optional[Dict[str, Any]] = None,
        version: Optional[str] = None,
        decorator_takes_context: Optional[bool] = True,
        retry_policy: Optional[RetryPolicy] = None,
        ins: Optional[Dict[str, In]] = None,
        out: Optional[Union[Out, Dict[str, Out]]] = None,
    ):
        """Record the decorator arguments for later use in ``__call__``.

        ``name``, ``input_defs``, ``decorator_takes_context``, ``description``
        and ``ins`` are validated here through ``check``; the remaining
        arguments are stored untouched and handed on when the definition is
        constructed.
        """
        self.name = check.opt_str_param(name, "name")
        self.input_defs = check.opt_nullable_sequence_param(
            input_defs, "input_defs", of_type=InputDefinition
        )
        self.output_defs = output_defs
        self.decorator_takes_context = check.bool_param(
            decorator_takes_context, "decorator_takes_context"
        )

        self.description = check.opt_str_param(description, "description")

        # these will be checked within SolidDefinition
        self.required_resource_keys = required_resource_keys
        self.tags = tags
        self.version = version
        self.retry_policy = retry_policy

        # config will be checked within SolidDefinition
        self.config_schema = config_schema

        self.ins = check.opt_nullable_dict_param(ins, "ins", key_type=str, value_type=In)
        self.out = out

    def __call__(self, fn: Callable[..., Any]) -> "OpDefinition":
        """Build an ``OpDefinition`` from the decorated function ``fn``.

        Args:
            fn: The function being decorated. Its ``__name__`` is used as the
                op name when no explicit name was given, and its docstring is
                used as the description when no explicit description was given.

        Returns:
            The ``OpDefinition`` wrapping ``fn``.

        ``check.failed`` is invoked when both ``input_defs`` and ``ins``, or
        both ``output_defs`` and ``out``, were supplied.
        """
        # Imported here rather than at module level; the module-level import
        # is guarded by TYPE_CHECKING.
        from ..op_definition import OpDefinition

        if self.input_defs is not None and self.ins is not None:
            check.failed("Values cannot be provided for both the 'input_defs' and 'ins' arguments")

        if self.output_defs is not None and self.out is not None:
            check.failed("Values cannot be provided for both the 'output_defs' and 'out' arguments")

        # Inferred once and reused for every output-resolution step below.
        inferred_out = infer_output_props(fn)

        if self.ins is not None:
            input_defs = [inp.to_definition(name) for name, inp in self.ins.items()]
        else:
            input_defs = check.opt_list_param(
                self.input_defs, "input_defs", of_type=InputDefinition
            )

        output_defs_from_out = _resolve_output_defs_from_outs(
            inferred_out=inferred_out, out=self.out
        )
        resolved_output_defs = (
            output_defs_from_out if output_defs_from_out is not None else self.output_defs
        )

        # Resolve the name into a local instead of writing it back to ``self``:
        # otherwise an ``_Op`` instance reused on a second function would keep
        # the first function's name.
        op_name = self.name or fn.__name__

        if resolved_output_defs is None:
            resolved_output_defs = [OutputDefinition.create_from_inferred(inferred_out)]
        elif len(resolved_output_defs) == 1:
            # A single explicit output is merged with what the signature provides.
            resolved_output_defs = [resolved_output_defs[0].combine_with_inferred(inferred_out)]

        compute_fn = (
            DecoratedSolidFunction(decorated_fn=fn)
            if self.decorator_takes_context
            else NoContextDecoratedSolidFunction(decorated_fn=fn)
        )

        resolved_input_defs = resolve_checked_solid_fn_inputs(
            decorator_name="@op",
            fn_name=op_name,
            compute_fn=compute_fn,
            explicit_input_defs=input_defs,
            exclude_nothing=True,
        )

        op_def = OpDefinition(
            name=op_name,
            input_defs=resolved_input_defs,
            output_defs=resolved_output_defs,
            compute_fn=compute_fn,
            config_schema=self.config_schema,
            description=self.description or format_docstring_for_description(fn),
            required_resource_keys=self.required_resource_keys,
            tags=self.tags,
            version=self.version,
            retry_policy=self.retry_policy,
        )
        # Copy the decorated function's metadata onto the definition.
        update_wrapper(op_def, compute_fn.decorated_fn)
        return op_def


def _resolve_output_defs_from_outs(
    inferred_out: InferredOutputProps, out: Optional[Union[Out, dict]]
) -> Optional[List[OutputDefinition]]:
    """Convert the ``out`` argument of ``@op`` into a list of output definitions.

    Args:
        inferred_out: Output properties inferred from the decorated function;
            only its ``annotation`` attribute is read here.
        out: ``None``, a single ``Out``, or a dict mapping output names to
            ``Out`` objects.

    Returns:
        ``None`` when ``out`` is ``None``; otherwise one definition per ``Out``.

    Raises:
        DagsterInvariantViolationError: If several outputs are declared and the
            return annotation is present but is not a tuple annotation, or its
            number of entries differs from the number of declared outputs.
    """
    if out is None:
        return None

    if isinstance(out, Out):
        return [out.to_definition(inferred_out.annotation, name=None)]

    check.dict_param(out, "out", key_type=str, value_type=Out)

    # A dict holding exactly one entry receives the whole return annotation.
    if len(out) == 1:
        sole_name, sole_out = next(iter(out.items()))
        return [sole_out.to_definition(inferred_out.annotation, sole_name)]

    if inferred_out.annotation:
        # Annotation introspection is experimental; comparing the annotation's
        # origin against ``tuple`` is the best available check.
        if get_origin(inferred_out.annotation) != tuple:
            raise DagsterInvariantViolationError(
                "Expected Tuple annotation for multiple outputs, but received non-tuple annotation."
            )
        if len(inferred_out.annotation.__args__) != len(out):
            raise DagsterInvariantViolationError(
                "Expected Tuple annotation to have number of entries matching the "
                f"number of outputs for more than one output. Expected {len(out)} "
                f"outputs but annotation has {len(inferred_out.annotation.__args__)}."
            )

    # Pair each declared output, in dict order, with the tuple entry at the
    # same position (or None when there is no annotation).
    return [
        entry.to_definition(
            inferred_out.annotation.__args__[position] if inferred_out.annotation else None,
            name=key,
        )
        for position, (key, entry) in enumerate(out.items())
    ]


# Typing overload for bare usage (``@op`` with no parentheses): the decorated
# function itself is received as ``name``.
@overload
def op(name: Callable[..., Any]) -> SolidDefinition:
    ...


# Typing overload for parameterized usage (``@op(...)``): the call returns an
# ``_Op`` instance, which is then applied to the decorated function.
@overload
def op(
    name: Optional[str] = ...,
    description: Optional[str] = ...,
    ins: Optional[Dict[str, In]] = ...,
    out: Optional[Union[Out, Dict[str, Out]]] = ...,
    config_schema: Optional[Union[Any, Dict[str, Any]]] = ...,
    required_resource_keys: Optional[Set[str]] = ...,
    tags: Optional[Dict[str, Any]] = ...,
    version: Optional[str] = ...,
    retry_policy: Optional[RetryPolicy] = ...,
    input_defs: Optional[List[InputDefinition]] = ...,
    output_defs: Optional[List[OutputDefinition]] = ...,
) -> _Op:
    ...


def op(
    name: Optional[Union[Callable[..., Any], str]] = None,
    description: Optional[str] = None,
    ins: Optional[Dict[str, In]] = None,
    out: Optional[Union[Out, Dict[str, Out]]] = None,
    config_schema: Optional[Union[Any, Dict[str, Any]]] = None,
    required_resource_keys: Optional[Set[str]] = None,
    tags: Optional[Dict[str, Any]] = None,
    version: Optional[str] = None,
    retry_policy: Optional[RetryPolicy] = None,
    input_defs: Optional[List[InputDefinition]] = None,
    output_defs: Optional[List[OutputDefinition]] = None,
) -> Union[SolidDefinition, _Op]:
    """
    Create an op with the specified parameters from the decorated function.

    Ins and outs will be inferred from the type signature of the decorated function
    if not explicitly provided.

    The decorated function will be used as the op's compute function. The signature of the
    decorated function is more flexible than that of the ``compute_fn`` in the core API; it may:

    1. Return a value. This value will be wrapped in an :py:class:`Output` and yielded by the compute function.
    2. Return an :py:class:`Output`. This output will be yielded by the compute function.
    3. Yield :py:class:`Output` or other :ref:`event objects <events>`. Same as default compute behavior.

    Note that options 1) and 2) are incompatible with yielding other events -- if you would like
    to decorate a function that yields events, it must also wrap its eventual output in an
    :py:class:`Output` and yield it.

    @op supports ``async def`` functions as well, including async generators when yielding multiple
    events or outputs. Note that async ops will generally be run on their own unless using a custom
    :py:class:`Executor` implementation that supports running them together.

    Args:
        name (Optional[str]): Name of op. Must be unique within any :py:class:`GraphDefinition`
            using the op.
        description (Optional[str]): Human-readable description of this op. If not provided, and
            the decorated function has docstring, that docstring will be used as the description.
        ins (Optional[Dict[str, In]]):
            Information about the inputs to the op. Information provided here will be combined
            with what can be inferred from the function signature.
        out (Optional[Union[Out, Dict[str, Out]]]):
            Information about the op outputs. Information provided here will be combined with
            what can be inferred from the return type signature if the function does not use yield.
        config_schema (Optional[ConfigSchema]): The schema for the config. If set, Dagster will check
            that config provided for the op matches this schema and fail if it does not. If not
            set, Dagster will accept any config provided for the op.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by this op.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the op. Frameworks may
            expect and require certain metadata to be attached to an op. Values that are not strings
            will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`.
        version (Optional[str]): (Experimental) The version of the op's compute_fn. Two ops should
            have the same version if and only if they deterministically produce the same outputs
            when provided the same inputs.
        retry_policy (Optional[RetryPolicy]): The retry policy for this op.
        input_defs (Optional[List[InputDefinition]]):
            (legacy) Preserved to ease migration from :py:class:`solid`. Can be used in place of ins argument.
        output_defs (Optional[List[OutputDefinition]]):
            (legacy) Preserved to ease migration from :py:class:`solid`. Can be used in place of out
            argument.

    Returns:
        The op definition when the decorator is applied bare (``@op``), otherwise a callable
        that builds the op definition once it is applied to the decorated function.

    Examples:

        .. code-block:: python

            @op
            def hello_world():
                print('hello')

            @op
            def echo(msg: str) -> str:
                return msg

            @op(
                ins={'msg': In(str)},
                out=Out(str)
            )
            def echo_2(msg): # same as above
                return msg

            @op(
                out={'word': Out(), 'num': Out()}
            )
            def multi_out() -> Tuple[str, int]:
                return 'cool', 4
    """

    # This case is for when decorator is used bare, without arguments. e.g. @op versus @op()
    if callable(name):
        check.invariant(input_defs is None)
        check.invariant(output_defs is None)
        check.invariant(description is None)
        check.invariant(config_schema is None)
        check.invariant(required_resource_keys is None)
        check.invariant(tags is None)
        check.invariant(version is None)
        # The bare path builds a default _Op(), so these arguments would
        # otherwise be discarded without any signal to the caller.
        check.invariant(ins is None)
        check.invariant(out is None)
        check.invariant(retry_policy is None)

        return _Op()(name)

    return _Op(
        name=name,
        description=description,
        input_defs=input_defs,
        output_defs=output_defs,
        config_schema=config_schema,
        required_resource_keys=required_resource_keys,
        tags=tags,
        version=version,
        retry_policy=retry_policy,
        ins=ins,
        out=out,
    )