Source code for dagster.core.definitions.partitioned_schedule

from datetime import time
from typing import Optional, Union, cast

from dagster import check

from .job_definition import JobDefinition
from .partition import (
    Partition,
    PartitionSetDefinition,
    PartitionedConfig,
    ScheduleTimeBasedPartitionsDefinition,
    ScheduleType,
)
from .run_request import SkipReason
from .schedule_definition import (
    DefaultScheduleStatus,
    ScheduleDefinition,
    ScheduleEvaluationContext,
)
from .time_window_partitions import TimeWindow, TimeWindowPartitionsDefinition


[docs]def build_schedule_from_partitioned_job( job: JobDefinition, description: Optional[str] = None, name: Optional[str] = None, minute_of_hour: Optional[int] = None, hour_of_day: Optional[int] = None, day_of_week: Optional[int] = None, day_of_month: Optional[int] = None, default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED, ) -> ScheduleDefinition: """ Creates a schedule from a time window-partitioned job. The schedule executes at the cadence specified by the partitioning of the given job. """ check.invariant(len(job.mode_definitions) == 1, "job must only have one mode") check.invariant( job.mode_definitions[0].partitioned_config is not None, "job must be a partitioned job" ) check.invariant( not (day_of_week and day_of_month), "Cannot provide both day_of_month and day_of_week parameter to build_schedule_from_partitioned_job.", ) partitioned_config = cast(PartitionedConfig, job.mode_definitions[0].partitioned_config) partition_set = cast(PartitionSetDefinition, job.get_partition_set_def()) check.inst(partitioned_config.partitions_def, TimeWindowPartitionsDefinition) partitions_def = cast(TimeWindowPartitionsDefinition, partitioned_config.partitions_def) minute_of_hour = cast(int, check.opt_int_param(minute_of_hour, "minute_of_hour", default=0)) if partitions_def.schedule_type == ScheduleType.HOURLY: check.invariant(hour_of_day is None, "Cannot set hour parameter with hourly partitions.") hour_of_day = cast(int, check.opt_int_param(hour_of_day, "hour_of_day", default=0)) execution_time = time(minute=minute_of_hour, hour=hour_of_day) if partitions_def.schedule_type == ScheduleType.DAILY: check.invariant( day_of_week is None, "Cannot set day of week parameter with daily partitions." ) check.invariant( day_of_month is None, "Cannot set day of month parameter with daily partitions." ) if partitions_def.schedule_type == ScheduleType.MONTHLY: execution_day = check.opt_int_param(day_of_month, "day_of_month", default=1) elif partitions_def.schedule_type == ScheduleType.WEEKLY: execution_day = check.opt_int_param(day_of_week, "day_of_week", default=0) else: execution_day = 0 schedule_partitions = ScheduleTimeBasedPartitionsDefinition( schedule_type=partitions_def.schedule_type, start=partitions_def.start, execution_time=execution_time, execution_day=execution_day, offset=1, ) schedule_def = partition_set.create_schedule_definition( schedule_name=check.opt_str_param(name, "name", f"{job.name}_schedule"), cron_schedule=schedule_partitions.get_cron_schedule(), partition_selector=latest_window_partition_selector, execution_timezone=partitions_def.timezone, description=description, job=job, default_status=default_status, ) return schedule_def
schedule_from_partitions = build_schedule_from_partitioned_job def latest_window_partition_selector( context: ScheduleEvaluationContext, partition_set_def: PartitionSetDefinition[TimeWindow] ) -> Union[SkipReason, Partition[TimeWindow]]: """Creates a selector for partitions that are time windows. Selects the latest partition that exists as of the schedule tick time. """ partitions = partition_set_def.get_partitions(context.scheduled_execution_time) if len(partitions) == 0: return SkipReason() else: return partitions[-1]