Advanced: Scheduling Job Runs

You can find the code for this example on GitHub.

Dagster includes a scheduler that allows you to run jobs at regular intervals, e.g. daily or hourly.

Defining schedules

As before, we've defined an op and a job.

import csv
from datetime import datetime

import requests

from dagster import get_dagster_logger, job, op, repository, schedule


@op
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    date = context.op_config["date"]
    get_dagster_logger().info(f"Today is {date}. Found {len(cereals)} cereals.")


@job
def hello_cereal_job():
    hello_cereal()

Suppose that we need to run our simple cereal job every morning before breakfast, at 6:45 AM. To do this, we'll write a ScheduleDefinition to define our schedule. We can either directly construct a ScheduleDefinition, or use one of the included schedule decorators.

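In this example, we'll use the @schedule decorator, which takes a cron string describing when the schedule should run. The cron string used below, "45 6 * * *", means minute 45 of hour 6 on every day, i.e. once a day at 6:45 AM.

The decorated function should return the run_config needed to run the job at the specified execution time. The function is passed a context object whose scheduled_execution_time attribute holds the datetime for which the schedule is running.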

@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
)
def good_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}
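
Because the body of good_morning_schedule is a plain mapping from the scheduled time to a run_config dictionary, that mapping can be checked without starting the scheduler. The sketch below pulls it into a standalone helper; cereal_run_config is a hypothetical name introduced here for illustration and is not part of Dagster.

```python
from datetime import datetime


def cereal_run_config(scheduled_time: datetime) -> dict:
    """Build the run_config for hello_cereal_job (hypothetical helper, not a Dagster API)."""
    # Same formatting as in good_morning_schedule: year-month-day.
    date = scheduled_time.strftime("%Y-%m-%d")
    # The "config" key is what the hello_cereal op reads through context.op_config.
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}


if __name__ == "__main__":
    print(cereal_run_config(datetime(2021, 6, 1, 6, 45)))
```

With a helper like this, the schedule function could simply return cereal_run_config(context.scheduled_execution_time).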

To complete the picture, we'll need to add the schedule definition to the list of definitions returned from our repository.

@repository
def hello_cereal_repository():
    return [hello_cereal_job, good_morning_schedule]

Starting schedules

Now, we can load Dagit to view the schedule, start and stop it, and monitor the runs it creates:

dagit -f scheduler.py

The left sidebar of Dagit displays a clock icon next to each job with a schedule.

schedules.png

Clicking on the clock icon will take us to the Schedules view. From here, we can turn on the schedule by pressing the toggle button, at which point Dagit will show us that the schedule is running and will next execute tomorrow at 6:45 AM.

good_morning_schedule.png
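
To see where "tomorrow at 6:45 AM" comes from, here is a minimal sketch of the arithmetic for a once-a-day schedule. It is an illustration only, not how Dagster evaluates cron strings; next_daily_run is a hypothetical helper, and the datetimes are assumed to already be expressed in the schedule's execution timezone.

```python
from datetime import datetime, time, timedelta


def next_daily_run(now: datetime, run_at: time = time(6, 45)) -> datetime:
    """Return the first occurrence of run_at strictly after now (hypothetical helper)."""
    # Start with today's occurrence of the run time.
    candidate = datetime.combine(now.date(), run_at, tzinfo=now.tzinfo)
    # If that moment has already passed (or is exactly now), move to tomorrow.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    # Turning the schedule on at 9:00 AM means the next run is the following morning.
    print(next_daily_run(datetime(2021, 6, 1, 9, 0)))
```

If the schedule is turned on before 6:45 AM instead, the same function returns a time later that same day.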

Running the schedule

Scheduled runs are launched from a long-running dagster-daemon process. To start the dagster-daemon process, run the following command in the same folder as your workspace.yaml file:

dagster-daemon run

This process will periodically check for any running schedules and launch their associated runs. If you leave this process running, it will launch a new run for your schedule each day at the expected time.
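
Conceptually, each check asks which scheduled times have passed since the previous check. The toy model below illustrates that idea for a single once-a-day schedule; it is not the dagster-daemon implementation, and ticks_between is a hypothetical name used only for this sketch.

```python
from datetime import datetime, time, timedelta


def ticks_between(last_checked: datetime, now: datetime, run_at: time = time(6, 45)) -> list:
    """Return every daily tick in the window (last_checked, now] (toy model)."""
    ticks = []
    day = last_checked.date()
    # Walk day by day from the previous check up to the current one.
    while day <= now.date():
        tick = datetime.combine(day, run_at)
        # A tick is due only if it falls after the previous check and not after now.
        if last_checked < tick <= now:
            ticks.append(tick)
        day += timedelta(days=1)
    return ticks


if __name__ == "__main__":
    # A check at 6:44 followed by a check at 6:46 finds one tick to launch.
    print(ticks_between(datetime(2021, 6, 1, 6, 44), datetime(2021, 6, 1, 6, 46)))
```

In this model, a later check on the same day finds nothing new, which is why each scheduled time results in a single run.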

Schedule filters

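If you need to customize the times at which the schedule should execute, you can pass a function as the should_execute argument to ScheduleDefinition or to the @schedule decorator. The function receives the schedule's context and returns True if a run should be launched.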

For example, we can define a filter that only returns True on weekdays:

def weekday_filter(_context):
    weekno = datetime.today().weekday()
    # weekday() is 0 for Monday through 6 for Sunday, so values below 5 are weekdays
    return weekno < 5

If we combine this should_execute filter with a schedule that runs at 6:45 AM every day, then we'll have a schedule that runs at 6:45 AM only on weekdays.

@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
    should_execute=weekday_filter,
)
def good_weekday_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}
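
One thing to note about weekday_filter is that datetime.today() reads the local clock of the process evaluating the filter, which may differ from the time the schedule was meant to fire. A possible alternative is to decide based on the scheduled time carried by the context. The sketch below assumes the context exposes scheduled_execution_time (as used in the schedule functions above) and that it may be missing outside a scheduled tick; SimpleNamespace is only a stand-in for the real context so the example runs without Dagster, and is_weekday is a hypothetical helper.

```python
from datetime import datetime
from types import SimpleNamespace


def is_weekday(moment: datetime) -> bool:
    # weekday() is 0 for Monday through 6 for Sunday.
    return moment.weekday() < 5


def weekday_filter(context) -> bool:
    # Prefer the scheduled time; fall back to the local clock if it is missing.
    scheduled = getattr(context, "scheduled_execution_time", None) or datetime.today()
    return is_weekday(scheduled)


if __name__ == "__main__":
    # Stand-in for a schedule context (an assumption made for this sketch).
    saturday = SimpleNamespace(scheduled_execution_time=datetime(2021, 6, 5, 6, 45))
    print(weekday_filter(saturday))
```

Keeping the date check in a separate function makes it straightforward to test the filter against fixed dates.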