Run Configuration#

Run configuration allows providing parameters to jobs at the time they're executed.

Relevant APIs#

Name | Description
ConfigSchema | See details with code examples in the API documentation.

Overview#

It's often useful to configure jobs at run time. For example, you might want someone to manually operate a deployed job and choose what dataset it operates on when they run it. In general, you should use Dagster's config system when you want the person or software that is executing a job to be able to make choices about what the job does, without needing to modify the job definition.

The objects that compose a job - ops and resources - are each individually configurable. When executing a job, you can supply "run configuration" that specifies the configuration for each of the objects in the job. When you execute a job with Dagster's Python API, you supply run configuration as a Python dictionary. When you execute a job from Dagit or the CLI, you can provide config in a YAML document.

A common use of configuration is for a schedule or sensor to provide configuration to the job run it is launching. For example, a daily schedule might provide the day it's running on to one of the ops as a config value, and that op might use that config value to decide what day's data to read.

Dagster includes a system for gradually-typed configuration schemas. These make it easy to catch configuration errors before job execution, as well as to learn what configuration is required to execute a job.

Using Configuration Inside an Op#

This example shows how to write an op whose behavior is based on values that are passed in via configuration:

from dagster import job, op


@op
def uses_config(context):
    for _ in range(context.op_config["iterations"]):
        context.log.info("hello")


@job
def config_example():
    uses_config()

Providing Run Configuration#

How you specify config values depends on how you're running your job:

Python API#

When executing a job with JobDefinition.execute_in_process, you can specify the config values through the run_config argument:

result = config_example.execute_in_process(
    run_config={"ops": {"uses_config": {"config": {"iterations": 1}}}}
)

You can also build config into jobs, as described in this section of the Jobs concept page.

Dagster CLI#

When executing a job from the command line, the easiest way to provide config is to put it into a YAML file, like:

ops:
  uses_config:
    config:
      iterations: 1

When you invoke dagster job execute, you can point to that YAML file using the --config option:

dagster job execute --config my_config.yaml
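The YAML document carries the same structure as the dictionary passed to the Python API. The sketch below parses the YAML shown above with PyYAML (an assumption: the `yaml` package is installed) and compares it with the dictionary from the Python API example:

```python
import yaml  # PyYAML; assumed to be available in the environment

CONFIG_YAML = """
ops:
  uses_config:
    config:
      iterations: 1
"""

# Parsing the YAML yields an ordinary nested dictionary.
run_config = yaml.safe_load(CONFIG_YAML)

# The same configuration written as a Python dictionary.
expected = {"ops": {"uses_config": {"config": {"iterations": 1}}}}
matches = run_config == expected
```

The parsed dictionary could then be passed as the run_config argument of execute_in_process.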

Dagit#

When executing a job from Dagit's Launchpad, you can supply config as YAML using the config editor:

[Image: Config in Dagit]

Config Schema#

Dagster includes a system for gradually-typed configuration schemas. For example, you can specify that a particular op accepts configuration for a particular set of keys, and that values provided for a particular key must be integers. Before executing a job, Dagster will compare the provided run configuration to the config schema for the objects in the job and fail early if they don't match.

Configuration schema helps:

  • Catch configuration errors before job execution.
  • Make deployed jobs self-documenting, so that it's easy to learn what configuration is required to launch them.

The full range of config types and ways to specify config schema are documented in the API Reference with examples.

The most common objects to specify ConfigSchema for are OpDefinition and ResourceDefinition (see example code in Configuring a Resource).

Here's an example of an op that defines a config schema:

from dagster import job, op


@op(config_schema={"iterations": int, "word": str})
def configurable_with_schema(context):
    for _ in range(context.op_config["iterations"]):
        context.log.info(context.op_config["word"])


@job
def nests_configurable():
    configurable_with_schema()

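Dagster validates the run_config against the config_schema before any op runs. If the values violate the schema, the run fails with an error instead of starting. For example, the following raises a DagsterInvalidConfigError because the run config omits iterations and supplies a key that the schema does not define: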

result = nests_configurable.execute_in_process(
    run_config={
        "ops": {
            "configurable_with_schema": {"config": {"nonexistent_config_value": 1}}
        }
    }
)

The config editor in Dagit comes with typeahead, schema validation, and schema documentation. You can also click the "Scaffold Missing Config" button to generate dummy values based on the config schema.

Examples#

Configuring a Resource#

You can also configure a ResourceDefinition:

from dagster import resource


@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""

You then supply the resource's configuration at run time through run config, where key stands for the resource key under which the resource is attached to the job:

resources:
  key:
    config:
      region: us-east-1
      use_unsigned_session: False

Passing Configuration to Multiple Ops in a Job#

If you want multiple ops to share values, you can use make_values_resource to pass the values via a resource and reference that resource from any op that needs it.

When called with no arguments, make_values_resource uses the Any type for its config, meaning Dagster will accept any config value provided for the resource:

from dagster import job, make_values_resource, op


@op(required_resource_keys={"value"})
def needs_value(context):
    context.log.info(f"value: {context.resources.value}")


@op(required_resource_keys={"value"})
def also_needs_value(context):
    context.log.info(f"value: {context.resources.value}")


@job(resource_defs={"value": make_values_resource()})
def basic_job():
    needs_value()
    also_needs_value()


basic_result = basic_job.execute_in_process(
    run_config={"resources": {"value": {"config": "some_value"}}}
)

You can also specify a schema for the values by passing keyword arguments to make_values_resource:

from dagster import job, make_values_resource, op


@op(required_resource_keys={"values"})
def needs_value(context):
    context.log.info(f"my str: {context.resources.values['my_str']}")


@op(required_resource_keys={"values"})
def needs_different_value(context):
    context.log.info(f"my int: {context.resources.values['my_int']}")


@job(resource_defs={"values": make_values_resource(my_str=str, my_int=int)})
def different_values_job():
    needs_value()
    needs_different_value()


result = different_values_job.execute_in_process(
    run_config={"resources": {"values": {"config": {"my_str": "foo", "my_int": 1}}}}
)

The same values can be passed in a YAML run config like so:

resources:
  values:
    config:
      my_str: foo
      my_int: 1