Partitioned Jobs#

Relevant APIs#

  • PartitionedConfig - Determines a set of partitions and how to generate run config for a partition.
  • @daily_partitioned_config - Decorator for constructing partitioned config where each partition is a date.
  • @hourly_partitioned_config - Decorator for constructing partitioned config where each partition is an hour of a date.
  • @weekly_partitioned_config - Decorator for constructing partitioned config where each partition is a week.
  • @monthly_partitioned_config - Decorator for constructing partitioned config where each partition is a month.
  • @static_partitioned_config - Decorator for constructing partitioned config for a static set of partition keys.
  • @dynamic_partitioned_config - Decorator for constructing partitioned config for a set of partition keys that can grow over time.
  • build_schedule_from_partitioned_job - A function that constructs a schedule whose interval matches the partitioning of a partitioned job.

Overview#

A partitioned job is a job where each run corresponds to a "partition", and the choice of partition determines the run's config. Most commonly, each partition is a time window, so each run processes the data that falls within one of those time windows.

Having defined a partitioned job, you can:

  • View runs by partition in Dagit.
  • Define a schedule that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
  • Launch backfills, which are sets of runs that each process a different partition. For example, after making a code change, you might want to re-run your job on every date that it has run on in the past.

Defining Partitioned Jobs#

You define a partitioned job by constructing a PartitionedConfig object and supplying it when you construct your job.

Defining a Job with Time Partitions#

The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.

Non-Partitioned Job with Date Config#

Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:

from dagster import job, op


@op(config_schema={"date": str})
def process_data_for_date(context):
    date = context.op_config["date"]
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()

The job takes a string date as config, which determines the date to compute data for. For example, if you wanted to compute data for May 5th, 2020, you would execute the job with the following run config:

ops:
  process_data_for_date:
    config:
      date: "2020-05-05"
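If you want to try this out from Python rather than through the Launchpad, a minimal sketch passing the equivalent run config to execute_in_process:

# execute the job in process with the same config as the YAML above
result = do_stuff.execute_in_process(
    run_config={
        "ops": {"process_data_for_date": {"config": {"date": "2020-05-05"}}}
    }
)
assert result.success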

Date-Partitioned Job#

With the job above, it's possible to supply any value for the date config, which means that, if you wanted to launch a backfill, Dagster wouldn't know which dates to run it on. You can instead build a partitioned job that operates on a defined set of dates.

First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines both the full set of partitions (every date between the start date and the current date) and how to determine the run config for a given partition.

from dagster import daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
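To see exactly which partitions this defines, you can inspect the PartitionedConfig's underlying partitions definition. A quick sketch, assuming the partitions_def attribute and its get_partition_keys method:

# only fully elapsed days produce partitions, so with a current time
# partway through 2020-01-05 the keys run from the start date to 2020-01-04
keys = my_partitioned_config.partitions_def.get_partition_keys(
    current_time=datetime(2020, 1, 5, 12, 0)
)
assert keys == ["2020-01-01", "2020-01-02", "2020-01-03", "2020-01-04"]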

Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job:

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    process_data_for_date()

In addition to the @daily_partitioned_config decorator, Dagster also provides @monthly_partitioned_config, @weekly_partitioned_config, and @hourly_partitioned_config.
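For instance, a sketch of an hourly variant of the config above (the hour-carrying date string here is an assumption, not part of the example op's contract):

from dagster import hourly_partitioned_config
from datetime import datetime


# each partition's start falls on the hour
@hourly_partitioned_config(start_date=datetime(2020, 1, 1, 0))
def my_hourly_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {
                "config": {"date": start.strftime("%Y-%m-%d %H:00")}
            }
        }
    }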

Defining a Job with Static Partitions#

Not all jobs are partitioned by time. Here's a partitioned job where the partitions are continents:

from dagster import job, op, static_partitioned_config

CONTINENTS = [
    "Africa",
    "Antarctica",
    "Asia",
    "Europe",
    "North America",
    "Oceania",
    "South America",
]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
    return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}


@op(config_schema={"continent_name": str})
def continent_op(context):
    context.log.info(context.op_config["continent_name"])


@job(config=continent_config)
def continent_job():
    continent_op()
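As with time partitions, you can select a partition when launching a run. A minimal in-process sketch:

# run the job on a single static partition key
result = continent_job.execute_in_process(partition_key="Africa")
assert result.success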

Creating Schedules from Partitioned Jobs#

When you have a partitioned job, it's common to want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.

The build_schedule_from_partitioned_job function constructs a schedule from a date-partitioned job, with an interval that matches the spacing of the job's partitions. If you wanted to create a schedule for the do_stuff_partitioned job defined above, you could write:

from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)
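By default, the schedule for a daily partitioned job fires at midnight, targeting the previous day's partition. A sketch of offsetting the daily tick, assuming the hour_of_day and minute_of_hour keyword arguments (the variable name is just for illustration):

# run each day's partition at 01:30 instead of midnight
do_stuff_partitioned_offset_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
    hour_of_day=1,
    minute_of_hour=30,
)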

Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:

from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
        yield request

Or you can define a schedule that runs a subselection of the partitions:

@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
    request = continent_job.run_request_for_partition(
        partition_key="Antarctica", run_key=None
    )
    yield request

The Schedules concept page describes how to construct both kinds of schedules in more detail.

Testing#

Testing Partitioned Config#

Invoking a PartitionedConfig object will directly invoke the decorated function.

If you want to check whether the generated run config is valid for the config of the job, you can use the validate_run_config function.

from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
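For example, a test along these lines invokes the config function on a sample time window and then validates the result against the do_stuff_partitioned job defined earlier:

def test_my_partitioned_config():
    # invoking the PartitionedConfig invokes the decorated function directly
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # validate_run_config raises if the config doesn't satisfy the job's schema
    assert validate_run_config(do_stuff_partitioned, run_config)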

Testing Partitioned Jobs#

To run a partitioned job in-process on a particular partition, you can supply a value for the partition_key argument of JobDefinition.execute_in_process:

def test_do_stuff_partitioned():
    assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success

Partitions in Dagit#

The Partitions Tab#

In Dagit, you can view runs by partition in the Partitions tab of a Job page.

In the "Run Matrix", each column corresponds to one of the partitions in the job. Each row corresponds to one of the steps in the job.

[Screenshot: the Partitions tab, showing the Run Matrix]

You can click on an individual box to navigate to logs and run information for the step.

Launching Partitioned Runs from the Launchpad#

You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.

In the screenshot below, we select the 2020-05-01 partition, and we can see that the run config for the partition has been populated in the editor.

[Screenshot: selecting a partition in the Dagit Launchpad]