Source code for dagster.core.definitions.op_definition

from typing import Dict

from .input import In
from .output import Out
from .solid_definition import SolidDefinition


class OpDefinition(SolidDefinition):
    """
    Defines an op, the functional unit of user-defined computation.

    For more details on what an op is, refer to the
    `Ops Overview <../../concepts/ops-jobs-graphs/ops>`_ .

    End users should prefer the :func:`@op <op>` decorator. OpDefinition is generally intended to be
    used by framework authors or for programmatically generated ops.

    Args:
        name (str): Name of the op. Must be unique within any :py:class:`GraphDefinition` or
            :py:class:`JobDefinition` that contains the op.
        input_defs (List[InputDefinition]): Inputs of the op.
        compute_fn (Callable): The core of the op, the function that performs the actual
            computation. The signature of this function is determined by ``input_defs``, and
            optionally, an injected first argument, ``context``, a collection of information
            provided by the system.

            This function will be coerced into a generator or an async generator, which must yield
            one :py:class:`Output` for each of the op's ``output_defs``, and additionally may
            yield other types of Dagster events, including :py:class:`AssetMaterialization` and
            :py:class:`ExpectationResult`.
        output_defs (List[OutputDefinition]): Outputs of the op.
        config_schema (Optional[ConfigSchema]): The schema for the config. If set, Dagster will
            check that the config provided for the op matches this schema and will fail if it does
            not. If not set, Dagster will accept any config provided for the op.
        description (Optional[str]): Human-readable description of the op.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the op. Frameworks may
            expect and require certain metadata to be attached to an op. Users should generally
            not set metadata directly. Values that are not strings will be JSON encoded and must
            meet the criteria that `json.loads(json.dumps(value)) == value`.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by this op.
        version (Optional[str]): (Experimental) The version of the op's compute_fn. Two ops should
            have the same version if and only if they deterministically produce the same outputs
            when provided the same inputs.
        retry_policy (Optional[RetryPolicy]): The retry policy for this op.

    Examples:
        .. code-block:: python

            def _add_one(_context, inputs):
                yield Output(inputs["num"] + 1)

            OpDefinition(
                name="add_one",
                input_defs=[InputDefinition("num", Int)],
                output_defs=[OutputDefinition(Int)],  # default name ("result")
                compute_fn=_add_one,
            )
    """

    @property
    def node_type_str(self) -> str:
        return "op"

    @property
    def is_graph_job_op_node(self) -> bool:
        return True

    @property
    def ins(self) -> Dict[str, In]:
        return {input_def.name: In.from_definition(input_def) for input_def in self.input_defs}

    @property
    def outs(self) -> Dict[str, Out]:
        return {output_def.name: Out.from_definition(output_def) for output_def in self.output_defs}
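
The ``tags`` argument documented above requires that non-string values satisfy `json.loads(json.dumps(value)) == value`. The following is a minimal standalone sketch of that check using only the standard library. The helper name `survives_json_round_trip` and the sample tag values are hypothetical and are not part of Dagster; how Dagster itself validates tags is not shown in this module.

```python
import json
from typing import Any


def survives_json_round_trip(value: Any) -> bool:
    """Hypothetical helper (not a Dagster API): True if value is unchanged by a JSON round trip."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as sets.
        return False


# Illustrative tag values only; these keys are made up for the example.
candidate_tags = {
    "owner": "data-team",      # plain string
    "limits": {"retries": 3},  # string keys and JSON-native values
    "shape": (2, 3),           # a tuple is decoded as a list
    "by_id": {1: "a"},         # an int key is decoded as the string "1"
    "groups": {"a", "b"},      # sets are not JSON serializable
}

for key, value in candidate_tags.items():
    print(key, survives_json_round_trip(value))
```

Under this check, tuples, dictionaries with non-string keys, and sets all fail, so lists and string-keyed dictionaries are the safer choices for structured tag values.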