Solids, Pipelines, Modes, Presets, Partition Sets, and Composite Solids were a set of Dagster core abstractions that preceded Dagster's current core abstractions: Ops, Jobs, and Graphs. This guide describes how to migrate code that uses the old abstractions to code that uses the present abstractions.
You can also check out the more detailed API reference docs for Jobs, Ops, and Graphs.
Migrating a pipeline to jobs does not require migrating all your other pipelines to jobs. Graphs, jobs, and pipelines can co-exist peacefully in Python.
In Dagit, we display both jobs and pipelines, where pipelines are marked with the Legacy tag in the left nav.
For simple pipelines, the new APIs just have different names. Here's a pipeline containing a single solid:
from dagster import pipeline, solid
@solid
def do_something():
    ...
@pipeline
def do_it_all():
    do_something()
And here's an equivalent job containing a single op:
from dagster import job, op
@op
def do_something():
    return "foo"
@job
def do_it_all():
    do_something()
Note that jobs can be built out of solids as well as ops. So you don't need to convert all your solids to ops in order to use @job.
In both cases, you can load the pipeline / job in Dagit with:
dagit -f <path/to/file>
If you want to execute the job in process, instead of using execute_pipeline, you can invoke its JobDefinition.execute_in_process method.
result = do_it_all.execute_in_process()
A call to GraphDefinition.execute_in_process creates an InProcessResult object. Outputs can be retrieved using the InProcessResult.output_for_node method, and events can be retrieved using the InProcessResult.events_for_node method.
output = result.output_for_node("do_something")
assert output == "foo"
events = result.events_for_node("do_something")
With pipelines, supplying resources involves defining a mode:
from dagster import ModeDefinition, pipeline, resource, solid
@resource
def external_service():
    ...
@solid(required_resource_keys={"external_service"})
def do_something():
    ...
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"external_service": external_service})]
)
def do_it_all():
    do_something()
With the new APIs, resources are supplied directly to a job:
from dagster import job, op, resource
@resource
def external_service():
    ...
@op(required_resource_keys={"external_service"})
def do_something():
    ...
@job(resource_defs={"external_service": external_service})
def do_it_all():
    do_something()
With pipelines, if you want to execute a pipeline with resources for a test, you need to include a mode for those resources on the pipeline definition:
from unittest.mock import MagicMock
from dagster import (
    ModeDefinition,
    ResourceDefinition,
    execute_pipeline,
    pipeline,
    resource,
    solid,
)
@resource
def external_service():
    ...
@solid(required_resource_keys={"external_service"})
def do_something():
    ...
@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"external_service": external_service}),
        ModeDefinition(
            "test",
            resource_defs={
                "external_service": ResourceDefinition.hardcoded_resource(MagicMock())
            },
        ),
    ]
)
def do_it_all():
    do_something()
def test_do_it_all():
    execute_pipeline(do_it_all, mode="test")
This made it difficult to construct mock resources that were specific to a particular test. It also used string indirection instead of object pointers for referring to the mode (mode="test"), which is error-prone. With the new APIs, you can override a job's resources at the time you execute it:
from unittest.mock import MagicMock
from dagster import job, op, resource
@resource
def external_service():
    ...
@op(required_resource_keys={"external_service"})
def do_something():
    ...
@job(resource_defs={"external_service": external_service})
def do_it_all():
    do_something()
def test_do_it_all():
    result = do_it_all.graph.execute_in_process(
        resources={"external_service": MagicMock()}
    )
    assert result.success
It's common for pipelines to have modes corresponding to different environments - e.g. production and development. Here's a pair of resources: one for production and one for development.
from dagster import resource
@resource
def prod_external_service():
    ...
@resource
def dev_external_service():
    ...
And here's a pipeline that includes these resources inside a pair of modes, along with a repository for loading it from Dagit:
from dagster import ModeDefinition, pipeline, repository
@pipeline(
    mode_defs=[
        ModeDefinition(
            "prod", resource_defs={"external_service": prod_external_service}
        ),
        ModeDefinition("dev", resource_defs={"external_service": dev_external_service}),
    ]
)
def do_it_all():
    do_something()
@repository
def repo():
    return [do_it_all]
These modes typically correspond to different Dagster Instances. E.g. the pipeline will be executed using the development mode on a local machine and using the production mode from a production instance deployed in the cloud. To point the instances to your repository, you would typically make a workspace yaml that looks something like this:
load_from:
  - python_package:
      package_name: prod_dev_modes
      attribute: repo
With this setup, both modes show up on both instances. This is awkward because, even though the production mode shows up on the development instance, it's not meant to be executed there. And vice versa with the dev mode and the production instance.
With the new APIs, the convention is to include development jobs and production jobs in separate repositories. Then the workspace for the production instance can point to the repository with the production jobs, and the workspace for the development instance can point to the development jobs.
So that we don't have to re-write the computation graph that underlies each job, we can use the graph decorator to define the computation graph independently, and then create two different jobs from it using the GraphDefinition.to_job method.
from dagster import graph, repository
@graph
def do_it_all():
    do_something()
@repository
def prod_repo():
    return [do_it_all.to_job(resource_defs={"external_service": prod_external_service})]
@repository
def dev_repo():
    return [do_it_all.to_job(resource_defs={"external_service": dev_external_service})]
workspace.yaml for the prod instance:
load_from:
  - python_package:
      package_name: prod_dev_jobs
      attribute: prod_repo
workspace.yaml for the dev instance:
load_from:
  - python_package:
      package_name: prod_dev_jobs
      attribute: dev_repo
With the pipeline APIs, if you want to specify config at definition time (as opposed to at the time you're running the pipeline), you use a PresetDefinition.
from dagster import PresetDefinition, pipeline, solid
@solid(config_schema={"param": str})
def do_something(_):
    ...
@pipeline(
    preset_defs=[
        PresetDefinition(
            "my_preset",
            run_config={"solids": {"do_something": {"config": {"param": "some_val"}}}},
        )
    ]
)
def do_it_all():
    do_something()
With the new APIs, you can accomplish this by supplying config to the job.
from dagster import job, op
@op(config_schema={"param": str})
def do_something(_):
    ...
@job(
    config={
        "ops": {
            "do_something": {
                "config": {
                    "param": "some_val",
                }
            }
        }
    }
)
def do_it_all():
    do_something()
Unlike with presets, this config will be used, by default, any time the job is launched, not just when it's launched from the Dagit Launchpad. That is, it will also be used when launching the job via a schedule, a sensor, do_it_all.execute_in_process, or the GraphQL API. In all these cases, it can instead be overridden with config specified at runtime.
Alternatively, you can provide "partial" config via a ConfigMapping.
from dagster import config_mapping, job
@config_mapping(config_schema={"arg": str})
def my_config_mapping(conf):
    return {"ops": {"do_something": {"config": {"param": conf["arg"]}}}}
@job(config=my_config_mapping)
def do_it_all():
    do_something()
Then, when running the job, you only need to supply more compact config:
do_it_all.execute_in_process(run_config={"arg": "some_value"})
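The pipeline examples in this guide key their run config by "solids", while the job example that supplies config directly keys it by "ops". When porting stored config such as a preset, a small helper can do the renaming. The sketch below is a hypothetical helper, not part of Dagster: the name solids_to_ops is made up for illustration, and it assumes that config for nested composite solids uses the same "solids" key one level down. Values under a node's "config" key are copied untouched.

```python
from copy import deepcopy
from typing import Any, Dict


def solids_to_ops(run_config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of run_config with structural "solids" keys renamed to "ops".

    Hypothetical migration helper (not a Dagster API). Assumes nested
    composite-solid config repeats the "solids" key under each node.
    """
    converted: Dict[str, Any] = {}
    for key, value in run_config.items():
        if key == "solids" and isinstance(value, dict):
            # Each entry maps a node name to that node's config block,
            # which may itself contain a nested "solids" section.
            converted["ops"] = {
                name: solids_to_ops(node) if isinstance(node, dict) else deepcopy(node)
                for name, node in value.items()
            }
        else:
            # User-defined config and other sections are copied as-is.
            converted[key] = deepcopy(value)
    return converted


legacy_config = {"solids": {"do_something": {"config": {"param": "some_val"}}}}
job_config = solids_to_ops(legacy_config)
print(job_config)  # {'ops': {'do_something': {'config': {'param': 'some_val'}}}}
```

Because the helper returns a new dictionary, the original preset config stays usable by any pipelines that have not been migrated yet.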
With the pipeline APIs, a schedule targets a pipeline by referencing the pipeline name in the schedule definition.
Both the schedule and its targeted pipeline need to be provided to a repository in order to make use of the schedule.
from dagster import ScheduleDefinition, pipeline, repository, solid
@solid
def do_something():
    ...
@pipeline
def do_it_all():
    do_something()
do_it_all_schedule = ScheduleDefinition(
    cron_schedule="0 0 * * *", pipeline_name="do_it_all"
)
@repository
def do_it_all_repository():
    return [do_it_all, do_it_all_schedule]
With the new APIs, a schedule targets a job or graph by referencing the job/graph object.
from dagster import ScheduleDefinition, job, op
@op
def do_something():
    ...
@job
def do_it_all():
    do_something()
do_it_all_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", job=do_it_all)
This makes it easier to catch errors and makes it possible to reference the job/graph from the schedule.
The same pattern works for sensors.
With the pipeline APIs, if you want to schedule runs with config that depends on the scheduled execution time, you need to do something like this:
from dagster import pipeline, schedule, solid
@solid(config_schema={"date": str})
def do_something(_):
    ...
@pipeline
def do_it_all():
    do_something()
@schedule(
    cron_schedule="0 0 * * *",
    pipeline_name="do_it_all",
    execution_timezone="US/Central",
)
def do_it_all_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"do_something": {"config": {"date": date}}}}
With the new APIs, you can simply reference the graph or job object:
from dagster import job, op, schedule
@op(config_schema={"date": str})
def do_something(_):
    ...
@job
def do_it_all():
    do_something()
@schedule(cron_schedule="0 0 * * *", job=do_it_all, execution_timezone="US/Central")
def do_it_all_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"do_something": {"config": {"date": date}}}}
With the pipeline APIs, if you want to have the same config available for manual runs of a pipeline and used in a schedule, you need to do something like this:
from dagster import PresetDefinition, pipeline, schedule, solid
@solid(config_schema={"param": str})
def do_something(_):
    ...
do_it_all_preset = PresetDefinition(
    "my_preset",
    run_config={"solids": {"do_something": {"config": {"param": "some_val"}}}},
)
@pipeline(preset_defs=[do_it_all_preset])
def do_it_all():
    do_something()
@schedule(cron_schedule="0 0 * * *", pipeline_name="do_it_all")
def do_it_all_schedule():
    return do_it_all_preset.run_config
With the new APIs, you can supply the config in one place (the job), and it will both show up in the Launchpad and be used by the schedule.
from dagster import ScheduleDefinition, job, op
@op(config_schema={"param": str})
def do_something(_):
    ...
config = {"ops": {"do_something": {"config": {"param": "some_val"}}}}
@job(config=config)
def do_it_all():
    do_something()
do_it_all_schedule = ScheduleDefinition(job=do_it_all, cron_schedule="0 0 * * *")
In order to declare a partitioned schedule using the pipeline-centric APIs, you need to use one of the partitioned schedule decorators, such as daily_schedule or hourly_schedule. The created schedule targets a specific pipeline via pipeline_name.
import datetime
from dagster import daily_schedule, pipeline, repository, solid
@solid(config_schema={"date": str})
def do_something_with_config(context):
    return context.solid_config["date"]
@pipeline
def do_it_all():
    do_something_with_config()
@daily_schedule(pipeline_name="do_it_all", start_date=datetime.datetime(2020, 1, 1))
def do_it_all_schedule(date):
    return {
        "solids": {
            "do_something_with_config": {
                "config": {"date": date.strftime("%Y-%m-%d %H")}
            }
        }
    }
@repository
def do_it_all_repo():
    return [do_it_all, do_it_all_schedule]
With the job APIs, you can first define a graph, and then a job with partitioning. These partitions exist independently of any schedule, and can be used for backfills. A schedule can then be derived from that partitioning, while providing execution time information. The resulting schedule can be independently passed to a repository, and the underlying job will be inferred.
from dagster import (
    build_schedule_from_partitioned_job,
    daily_partitioned_config,
    job,
    op,
    repository,
)
@op(config_schema={"date": str})
def do_something_with_config(context):
    return context.op_config["date"]
@daily_partitioned_config(start_date="2020-01-01")
def do_it_all_config(start, _end):
    return {"ops": {"do_something_with_config": {"config": {"date": str(start)}}}}
@job(config=do_it_all_config)
def do_it_all():
    do_something_with_config()
do_it_all_schedule = build_schedule_from_partitioned_job(do_it_all)
@repository
def do_it_all_repo():
    return [do_it_all_schedule]
With the solid API, you define inputs and outputs using the InputDefinition and OutputDefinition classes.
from dagster import InputDefinition, OutputDefinition, solid
@solid(
    input_defs=[InputDefinition("arg1", metadata={"a": "b"})],
    output_defs=[OutputDefinition(metadata={"c": "d"})],
)
def do_something(arg1: str) -> int:
    return int(arg1)
The op API supports a less verbose way to define inputs and outputs: with the In and Out classes.
from dagster import In, Out, op
@op(ins={"arg1": In(metadata={"a": "b"})}, out=Out(metadata={"c": "d"}))
def do_something(arg1: str) -> int:
    return int(arg1)
For single outputs, you can just supply an Out object, and for multiple outputs, you can use a dictionary:
from typing import Tuple
from dagster import In, Out, op
@op(
    ins={"arg1": In(metadata={"a": "b"})},
    out={"out1": Out(metadata={"c": "d"}), "out2": Out(metadata={"e": "f"})},
)
def do_something(arg1: str) -> Tuple[int, int]:
    return int(arg1), int(arg1) + 1
With the pipeline APIs, if you want multiple layers of composition, you define a composite solid.
from dagster import composite_solid, pipeline, solid
@solid
def do_something():
    pass
@solid
def do_something_else():
    return 5
@composite_solid
def do_two_things():
    do_something()
    return do_something_else()
@solid
def do_yet_more(arg1):
    assert arg1 == 5
@pipeline
def do_it_all():
    do_yet_more(do_two_things())
With the graph APIs, you can just use graphs at both layers:
from dagster import graph, op
@op
def do_something():
    pass
@op
def do_something_else():
    return 5
@graph
def do_two_things():
    do_something()
    return do_something_else()
@op
def do_yet_more(arg1):
    assert arg1 == 5
@graph
def do_it_all():
    do_yet_more(do_two_things())
When using GraphDefinition.execute_in_process on a graph that has internal graphs, you can retrieve results from a nested node by prefixing the node's name with the name of its enclosing graph and a period (.):
result = do_it_all.execute_in_process()
nested_output = result.output_for_node("do_two_things.do_something_else")
With the pipeline APIs, if you want to pass inputs into and return outputs from a composite solid, you declare the input and output definitions that it maps.
from dagster import InputDefinition, OutputDefinition, composite_solid, pipeline, solid
@solid
def do_something():
    pass
@solid
def one():
    return 1
@solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_something_else(arg1):
    return arg1
@composite_solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)
@solid
def do_yet_more(arg1):
    assert arg1 == 1
@pipeline
def do_it_all():
    do_yet_more(do_two_things(one()))
With the graph APIs, you can specify the ins and out properties to do so:
from dagster import GraphOut, In, Out, graph, op
@op
def do_something():
    pass
@op
def one():
    return 1
@op(ins={"arg1": In(int)}, out=Out(int))
def do_something_else(arg1):
    return arg1
@graph(out=GraphOut())
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)
@op
def do_yet_more(arg1):
    assert arg1 == 1
@graph
def do_it_all():
    do_yet_more(do_two_things(one()))
Note that graphs no longer require information about the inputs they map to, but they still need information about the outputs they map to. In addition, the inputs and outputs of a graph now take their types from the underlying op.
With the pipeline APIs, if you want to have multiple outputs from a composite solid, you define the output definitions it maps and return a dictionary, where the keys are the output names and the values are the output values.
from dagster import Output, OutputDefinition, composite_solid, pipeline, solid
@solid
def do_something():
    pass
@solid(output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")])
def return_multi():
    yield Output(1, "one")
    yield Output(2, "two")
@composite_solid(
    output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")]
)
def do_two_things():
    do_something()
    one, two = return_multi()
    return {"one": one, "two": two}
@solid
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2
@pipeline
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)
With the graph APIs, you can specify the out property:
from dagster import GraphOut, Out, graph, op
@op
def do_something():
    pass
@op(out={"one": Out(int), "two": Out(int)})
def return_multi():
    return 1, 2
@graph(out={"one": GraphOut(), "two": GraphOut()})
def do_two_things():
    do_something()
    one, two = return_multi()
    return (one, two)
@op
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2
@graph
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)
Each mode on a pipeline could have multiple executors, but each job can only have one executor configured. This can be added via the executor_def argument to to_job and @job. Since there is only one executor per job, there is no need to specify the name of the executor in config anymore.
from dagster import job, multiprocess_executor
@job(
    executor_def=multiprocess_executor,
    config={"execution": {"config": {"max_concurrent": 5}}},
)
def do_it_all():
    ...
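To make the config change concrete, here is a hedged sketch of converting pipeline-style execution config, where the executor name sits between "execution" and "config", into the job-style layout used above. The helper name drop_executor_name is hypothetical and not part of Dagster, and the sketch assumes the legacy config nests exactly one executor name under "execution".

```python
from typing import Any, Dict


def drop_executor_name(run_config: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten {"execution": {<executor name>: {...}}} to {"execution": {...}}.

    Hypothetical migration helper (not a Dagster API). Assumes the legacy
    layout has exactly one executor name under "execution".
    """
    execution = run_config.get("execution")
    if not isinstance(execution, dict) or "config" in execution or len(execution) != 1:
        # Nothing to flatten, or the config already uses the job-style layout.
        return dict(run_config)
    (executor_config,) = execution.values()
    converted = dict(run_config)
    converted["execution"] = dict(executor_config or {})
    return converted


legacy_config = {"execution": {"multiprocess": {"config": {"max_concurrent": 5}}}}
print(drop_executor_name(legacy_config))  # {'execution': {'config': {'max_concurrent': 5}}}
```

The executor itself is then chosen in code through executor_def, so the name no longer needs to appear in config.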
The default executor for a pipeline was the in-process executor. For a job, the default executor can switch between multi-process and in-process configurations. By default, multi-process is enabled. Multi-process and in-process can be switched via config.
from dagster import job
# This job will run with multiprocessing execution
@job
def do_it_all():
    ...
# This job will run with in-process execution
@job(config={"execution": {"config": {"in_process": {}}}})
def do_it_all_in_proc():
    ...