Jobs & Graphs#

A job is an executable graph of ops, with optional resources and configuration.

Relevant APIs#

NameDescription
@jobThe decorator used to create a job.
JobDefinitionA job definition. Jobs are the main unit of execution and monitoring in Dagster. Typically constructed using the @job decorator.
@graphThe decorator used to define a graph, which can form the basis for multiple jobs.
GraphDefinitionA graph definition, which is a set of ops (or sub-graphs) wired together. Forms the core of a job. Typically constructed using the @job decorator.

Jobs are the main unit of execution and monitoring in Dagster. The core of a job is a graph of ops - connected via data dependencies. Jobs can also contain resources and configuration, which determine some of the behavior of those ops.

Ops are linked together by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that, in Dagster, op dependencies are expressed as data dependencies, not just execution dependencies. This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile and run-time checks.

You can run a job in a variety of ways - in the Python process where it's defined, via the command line, from Dagit, or via a GraphQL API.

The Dagit UI centers on jobs. Dagit makes it easy to view all the historical runs of each job in one place, as well as to manually kick off runs for a job.

You can define schedules to execute jobs at fixed intervals or define sensors to trigger jobs when external changes occur.

Creating a job#

From scratch#

The simplest way to create a job is to use the job decorator.

Within the decorated function body, you can use function calls to indicate the dependency structure between the ops/graphs. In this example, the add_one op depends on the return_five op's output. Because this data dependency exists, the add_one op executes after return_five runs successfully and emits the required output.

from dagster import job, op


@op
def return_five():
    return 5


@op
def add_one(arg):
    return arg + 1


@job
def do_stuff():
    add_one(return_five())

When a defining job, you can provide resources, configuration, hooks, tags, and an executor (follow the links for explanation of how to use each of these).

From a graph#

Dagster is built around the observation that any data DAG typically contains a logical core of data transformation, which is reusable across a set of environments (e.g. prod, local, staging) or scenarios. The graph usually needs to be "customized" for each environment, by plugging in configuration and services that are specific to that environment.

You can model this by building multiple jobs that use the same underlying graph of ops. The graph represents the logical core of data transformation, and the configuration and resources on each job customize the behavior of that job for its environment.

To do this, you first define a graph with the @graph decorator.

from dagster import graph, op


@op(required_resource_keys={"server"})
def interact_with_server(context):
    context.resources.server.ping_server()


@graph
def do_stuff():
    interact_with_server()

Then you build jobs from it using the GraphDefinition.to_job method:

from dagster import ResourceDefinition

prod_server = ResourceDefinition.mock_resource()
local_server = ResourceDefinition.mock_resource()

prod_job = do_stuff.to_job(resource_defs={"server": prod_server}, name="do_stuff_prod")
local_job = do_stuff.to_job(
    resource_defs={"local": local_server}, name="do_stuff_local"
)

to_job accepts the same arguments as the @job decorator: you can provide resources, configuration, hooks, tags, and an executor.

Including a job in a repository#

You make jobs available to Dagit, GraphQLs, and the command line by including them inside repositories. If you include schedules or sensors in a repository, the repository will automatically include jobs that those schedules or sensors target.

from dagster import repository

from .jobs import do_it_all


@repository
def my_repo():
    return [do_it_all]

Advanced job configuration#

Ops and resources often accept configuration that determines how they behave. By default, you supply configuration for these ops and resources at the time you launch the job.

When constructing a job, you can customize how that configuration will be satisfied, by passing a value to the config parameter of the GraphDefinition.to_job method or the @job decorator. The options are discussed below:

Hardcoded configuration#

You can supply a config dictionary. The supplied dictionary will be used to configure the job whenever the job is launched. It will show up in the Dagit Launchpad and can be overridden.

from dagster import job, op


@op(config_schema={"config_param": str})
def do_something(context):
    context.log.info("config_param: " + context.op_config["config_param"])


default_config = {"ops": {"do_something": {"config": {"config_param": "stuff"}}}}


@job(config=default_config)
def do_it_all_with_default_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_default_config.execute_in_process()

Partitioned jobs#

You can supply a PartitionedConfig. It defines a discrete set of "partitions", along with a function for generating config for a partition. You can configure a job run by selecting a partition.

For more information on how this works, take a look at the Partitions concept page.

Config mapping#

You can supply a ConfigMapping. This allows you to expose a narrower config interface to your job. Instead of needing to configure every op and resource individually when launching the job, you can supply a smaller number of values to the outer config, and the ConfigMapping can translate it into config for all the job's ops and resources.

from dagster import config_mapping, job, op


@op(config_schema={"config_param": str})
def do_something(context):
    context.log.info("config_param: " + context.op_config["config_param"])


@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
    return {
        "ops": {"do_something": {"config": {"config_param": val["simplified_param"]}}}
    }


@job(config=simplified_config)
def do_it_all_with_simplified_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_simplified_config.execute_in_process(
        run_config={"simplified_param": "stuff"}
    )

Examples#

This section covers a few basic patterns you can use to build more complex graphs and jobs.

Linear Dependencies#

The simplest graph structure is the linear graph. We return one output from the root op, and pass along data through single inputs and outputs.

Linear Graph
from dagster import job, op


@op
def return_one(context) -> int:
    return 1


@op
def add_one(context, number: int) -> int:
    return number + 1


@job
def linear():
    add_one(add_one(add_one(return_one())))

Multiple Inputs#

Multiple Inputs

A single output can be passed to multiple inputs on downstream ops. In this example, the output from the first op is passed to two different ops. The outputs of those ops are combined and passed to the final op.

from dagster import job, op


@op
def return_one(context) -> int:
    return 1


@op
def add_one(context, number: int):
    return number + 1


@op
def adder(context, a: int, b: int) -> int:
    return a + b


@job
def inputs_and_outputs():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)

Conditional Branching#

Conditional Branch

An op only starts to execute once all of its inputs have been resolved. We can use this behavior to model conditional execution of ops.

In this example, the branching_op outputs either the branch_1 result or branch_2 result. Since op execution is skipped for ops that have unresolved inputs, only one of the downstream ops will execute.

import random

from dagster import Out, Output, job, op


@op(out={"branch_1": Out(is_required=False), "branch_2": Out(is_required=False)})
def branching_op():
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@op
def branch_1_op(_input):
    pass


@op
def branch_2_op(_input):
    pass


@job
def branching():
    branch_1, branch_2 = branching_op()
    branch_1_op(branch_1)
    branch_2_op(branch_2)

Fixed Fan-in#

Fixed Fan-in

If you have a fixed set of op that all return the same output type, you can collect all the outputs into a list and pass them into a single downstream op.

The downstream op executes only if all of the outputs were successfully created by the upstream op.

from typing import List

from dagster import job, op


@op
def return_one() -> int:
    return 1


@op
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)


@job
def fan_in():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f"return_one_{i}")())
    sum_fan_in(fan_outs)

In this example, we have 10 op that all output the number 1. The sum_fan_in op takes all of these outputs as a list and sums them.

Dynamic Mapping & Collect#

In most cases, the structure of a graph is pre-determined before execution. Dagster also has support for creating graphs where the final structure is not determined until run-time. This is useful for graph structures where you want to execute a separate instance of an op for each entry in a certain output.

In this example, we have an op files_in_directory that defines a DynamicOut. We map over the dynamic output which will cause the downstream dependencies to be cloned for each DynamicOutput that is yielded. The downstream copies can be identified by the mapping_key supplied to DynamicOutput. Once that's all complete, we collect over the results of process_file and pass that in to summarize_directory.

import os
from typing import List

from dagster import DynamicOut, DynamicOutput, Field, job, op
from dagster.utils import file_relative_path


@op(
    config_schema={
        "path": Field(str, default_value=file_relative_path(__file__, "sample"))
    },
    out=DynamicOut(str),
)
def files_in_directory(context):
    path = context.op_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@op
def process_file(path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@op
def summarize_directory(sizes: List[int]) -> int:
    # simple example of totalling sizes
    return sum(sizes)


@job
def process_directory():
    file_results = files_in_directory().map(process_file)
    summarize_directory(file_results.collect())

Order-based Dependencies (Nothing dependencies)#

Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of an op depends on the output of an upstream op.

If you have an op, say Op A, that does not depend on any outputs of another op, say Op B, there theoretically shouldn't be a reason for Op A to run after Op B. In most cases, these two ops should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.

If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream op. This type specifies that you are passing "nothing" via Dagster between the ops, while still uses inputs and outputs to model the dependency between the two ops.

from dagster import In, Nothing, job, op


@op
def create_table_1():
    get_database_connection().execute(
        "create table_1 as select * from some_source_table"
    )


@op(ins={"start": In(Nothing)})
def create_table_2():
    get_database_connection().execute("create table_2 as select * from table_1")


@job
def nothing_dependency():
    create_table_2(start=create_table_1())

In this example, create_table_2 has an input of type Nothing meaning that it doesn't expect any data to be provided by the upstream op. This lets us connect them in the graph definition so that create_table_2 executes only after create_table_1 successfully executes.

Nothing type inputs do not have a corresponding parameter in the function since there is no data to pass. When connecting the dependencies, it is recommended to use keyword args to prevent mix-ups with other positional inputs.

Note that in most cases, it is usually possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the ops, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name can be used in create_table_2 to more accurately model the data dependency.

Dagster also provides more advanced abstractions to handle dependencies and IO. If you find that you are finding it difficult to model data dependencies when using external storages, check out IOManagers.

Using the Same Op Twice#

You can use the same op definition multiple times in the same graph/job.

@job
def multiple_usage():
    add_one(add_one(return_one()))

To differentiate between the two invocations of add_one, Dagster automatically aliases the op names to be add_one and add_one_2.

You can also manually define the alias by using the .alias method on the op invocation.

@job
def alias():
    add_one.alias("second_addition")(add_one(return_one()))

Patterns#

Constructing GraphDefinitions#

You may run into a situation where you need to programmatically construct the dependencies for a graph. In that case, you can directly define the GraphDefinition object.

To construct a GraphDefinition, you need to pass the constructor a graph name, a list of op or graph definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each op’s inputs on the outputs of other ops in the graph. The top-level keys of the dependency dictionary are the string names of ops or graphs. If you are using op aliases, be sure to use the aliased name. Values of the top-level keys are also dictionary, which maps input names to a DependencyDefinition.

one_plus_one_from_constructor = GraphDefinition(
    name="one_plus_one",
    node_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
).to_job()

Graph DSL#

Sometimes you may want to construct the dependencies of a graph definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.

For example, you can have a YAML like this:

name: some_example
description: blah blah blah
ops:
  - def: add_one
    alias: A
  - def: add_one
    alias: B
    deps:
      num:
        op: A
  - def: add_two
    alias: C
    deps:
      num:
        op: A
  - def: subtract
    deps:
      left:
        op: B
      right:
        op: C

You can programmatically generate a GraphDefinition from this YAML:

import os

from dagster import DependencyDefinition, GraphDefinition, NodeInvocation, op
from dagster.utils.yaml_utils import load_yaml_from_path


@op
def add_one(num: int) -> int:
    return num + 1


@op
def add_two(num: int) -> int:
    return num + 2


@op
def subtract(left: int, right: int) -> int:
    return left + right


def construct_graph_with_yaml(yaml_file, op_defs) -> GraphDefinition:
    yaml_data = load_yaml_from_path(yaml_file)

    deps = {}

    for op_yaml_data in yaml_data["ops"]:
        def_name = op_yaml_data["def"]
        alias = op_yaml_data.get("alias", def_name)
        op_deps_entry = {}
        for input_name, input_data in op_yaml_data.get("deps", {}).items():
            op_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["op"], output=input_data.get("output", "result")
            )
        deps[NodeInvocation(name=def_name, alias=alias)] = op_deps_entry

    return GraphDefinition(
        name=yaml_data["name"],
        description=yaml_data.get("description"),
        node_defs=op_defs,
        dependencies=deps,
    )


def define_dep_dsl_graph() -> GraphDefinition:
    path = os.path.join(os.path.dirname(__file__), "my_graph.yaml")
    return construct_graph_with_yaml(path, [add_one, add_two, subtract])