Ops are the core unit of computation in Dagster. Multiple ops can be connected to create a Graph.
| Name | Description |
| --- | --- |
| `@op` | A decorator used to define ops. Returns an `OpDefinition`. The decorated function is called the "compute function". |
| `In` | An input to an op. Defined on the `ins` argument to the `@op` decorator. |
| `Out` | An output of an op. Defined on the `out` argument to the `@op` decorator. |
| `OpExecutionContext` | An object exposing Dagster system APIs for resource access, logging, and more. Can be injected into an op by specifying `context` as the first argument of the compute function. |
| `OpDefinition` | Class for ops. You will rarely want to instantiate this class directly. Instead, you should use the `@op` decorator. |
Ops are Dagster's core unit of computation. Individual ops should perform relatively simple tasks. Collections of ops can then be assembled into Graphs to perform more complex tasks. Some examples of tasks appropriate for a single op:
The op as computational unit enables many useful features for data orchestration:
- Ops can be written against abstract resources (e.g. `database`), with resource definitions later bound at the Job level. Op logic can thus remain decoupled from any particular implementation of an external system.
- Ops emit events during execution. These can report the creation of a data asset (`AssetMaterialization`), the result of a data quality check (`ExpectationResult`), or other arbitrary information. Event streams can be visualized by Dagster's browser UI Dagit. This rich log of execution facilitates debugging, inspection, and real-time monitoring of running jobs.

To define an op, use the `@op` decorator. The decorated function is called the `compute_fn`.
from dagster import op

@op
def my_op():
    return "hello"
Each op has a set of inputs and outputs, which define the data it consumes and produces. Inputs and outputs are used to define dependencies between ops and to pass data between ops.
Both definitions have a few important properties:
- Each can be linked to an `IOManager`, which defines how the output or input is stored and loaded. See the IOManager concept page for more info.

Inputs are passed as arguments to an op's `compute_fn`. The value of an input can be passed from the output of another op, or stubbed (hardcoded) using config.
The most common way to define inputs is just to add arguments to the decorated function:
@op
def my_input_op(abc, xyz):
    pass
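An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways: the upstream output that the input depends on has been emitted, or the input has been stubbed through config.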
You can use a Dagster Type to provide a function that validates an op's input every time the op runs. In this case, you use a dictionary of `In`s corresponding to the decorated function arguments.
from dagster import DagsterType, In, op

# The type check passes only for even values.
MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)

@op(ins={"abc": In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    pass
Outputs are yielded from an op's `compute_fn`. By default, all ops have a single output called "result".
When you have one output, you can return the output value directly.
@op
def my_output_op():
    return 5
To define multiple outputs, or to use a different output name than "result", you can provide a dictionary of `Out`s to the `@op` decorator.
When you have more than one output, you can return a tuple of values, one for each output.
from dagster import Out, op

@op(out={"first_output": Out(), "second_output": Out()})
def my_multi_output_op():
    return 5, 6
Like inputs, outputs can also have Dagster Types.
When writing an op, users can optionally provide a first parameter, `context`. When this parameter is supplied, Dagster will supply a context object to the body of the op. The context provides access to system information like op configuration, loggers, resources, and the current run id. See `OpExecutionContext` for the full list of properties accessible from the op context.
For example, to read the op's config and log an info message:
@op(config_schema={"name": str})
def context_op(context):
    name = context.op_config["name"]
    context.log.info(f"My name is {name}")
All definitions in Dagster expose a `config_schema`, making them configurable and parameterizable. The configuration system is explained in detail on Config Schema.
Op definitions can specify a `config_schema` for the op's configuration. The configuration is accessible through the op context at runtime. Therefore, op configuration can be used to specify op behavior at runtime, making ops more flexible and reusable.
For example, we can define an op where the API endpoint it queries is defined through its configuration:
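import requests
from dagster import op

@op(config_schema={"api_endpoint": str})
def my_configurable_op(context):
    # Read the endpoint from op config rather than hardcoding it.
    api_endpoint = context.op_config["api_endpoint"]
    data = requests.get(f"{api_endpoint}/data").json()
    return data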
Ops are used within a Job or Graph. You can also execute a single op, usually within a test context, by directly invoking it. More information can be found at Testing ops.
You may find the need to create utilities that help generate ops. In most cases, you should parameterize op behavior by adding op configuration. You should reach for the op factory pattern if you find yourself needing to vary the arguments to the `@op` decorator or `OpDefinition` themselves, since they cannot be modified based on op configuration.
To create an op factory, you define a function that returns an `OpDefinition`, either directly or by decorating a function with the `@op` decorator.
from dagster import In, Nothing, op


def my_op_factory(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.
        **kwargs: Extra keyword arguments forwarded to the @op decorator.

    Returns:
        OpDefinition: The new op.
    """

    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        # Op logic here
        pass

    return my_inner_op