Source code for dagster.core.definitions.time_window_partitions

from datetime import datetime
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Union

import pendulum

from dagster import check
from dagster.utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster.utils.schedules import schedule_execution_time_iterator

from .partition import (
    DEFAULT_DATE_FORMAT,
    Partition,
    PartitionedConfig,
    PartitionsDefinition,
    ScheduleType,
    get_cron_schedule,
)


class TimeWindow(NamedTuple):
    """An interval that is closed at the start and open at the end"""

    start: datetime
    end: datetime


class TimeWindowPartitionsDefinition(
    PartitionsDefinition[TimeWindow],  # pylint: disable=unsubscriptable-object
    NamedTuple(
        "_TimeWindowPartitions",
        [
            ("schedule_type", ScheduleType),
            ("start", datetime),
            ("timezone", str),
            ("fmt", str),
            ("end_offset", int),
        ],
    ),
):
    def __new__(
        cls,
        schedule_type: ScheduleType,
        start: Union[datetime, str],
        timezone: Optional[str],
        fmt: str,
        end_offset: int,
    ):
        if isinstance(start, str):
            start_dt = datetime.strptime(start, fmt)
        else:
            start_dt = start

        return super(TimeWindowPartitionsDefinition, cls).__new__(
            cls, schedule_type, start_dt, timezone or "UTC", fmt, end_offset
        )

    def get_partitions(
        self, current_time: Optional[datetime] = None
    ) -> List[Partition[TimeWindow]]:
        current_timestamp = (
            pendulum.instance(current_time, tz=self.timezone)
            if current_time
            else pendulum.now(self.timezone)
        ).timestamp()

        start_timestamp = pendulum.instance(self.start, tz=self.timezone).timestamp()
        iterator = schedule_execution_time_iterator(
            start_timestamp=start_timestamp,
            cron_schedule=get_cron_schedule(schedule_type=self.schedule_type),
            execution_timezone=self.timezone,
        )

        partitions: List[Partition[TimeWindow]] = []
        prev_time = next(iterator)
        while prev_time.timestamp() < start_timestamp:
            prev_time = next(iterator)

        end_offset = self.end_offset
        partitions_past_current_time = 0
        while True:
            next_time = next(iterator)
            if (
                next_time.timestamp() <= current_timestamp
                or partitions_past_current_time < end_offset
            ):
                partitions.append(
                    Partition(
                        value=TimeWindow(prev_time, next_time),
                        name=prev_time.strftime(self.fmt),
                    )
                )

                if next_time.timestamp() > current_timestamp:
                    partitions_past_current_time += 1
            else:
                break

            prev_time = next_time

        if end_offset < 0:
            partitions = partitions[:end_offset]

        return partitions

    def __str__(self) -> str:
        partition_def_str = f"{self.schedule_type.value.capitalize()}, starting {self.start.strftime(self.fmt)} {self.timezone}."
        if self.end_offset != 0:
            partition_def_str += f" End offsetted by {self.end_offset} partition{'' if self.end_offset == 1 else 's'}."
        return partition_def_str

    def time_window_for_partition_key(self, partition_key: str) -> TimeWindow:
        start = self.start_time_for_partition_key(partition_key)
        iterator = schedule_execution_time_iterator(
            start_timestamp=start.timestamp(),
            cron_schedule=get_cron_schedule(schedule_type=self.schedule_type),
            execution_timezone=self.timezone,
        )
        next(iterator)
        return TimeWindow(start, next(iterator))

    def start_time_for_partition_key(self, partition_key: str) -> datetime:
        return pendulum.instance(datetime.strptime(partition_key, self.fmt), tz=self.timezone)

    def get_default_partition_mapping(self):
        from dagster.core.asset_defs.time_window_partition_mapping import TimeWindowPartitionMapping

        return TimeWindowPartitionMapping()


class DailyPartitionsDefinition(TimeWindowPartitionsDefinition):
    def __new__(
        cls,
        start_date: Union[datetime, str],
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
    ):
        """A set of daily partitions.

        The first partition in the set will start at the start_date. The last partition in the set will
        end before the current time, unless the end_offset argument is set to a positive number.

        Args:
            start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can
                provide in either a datetime or string format.
            timezone (Optional[str]): The timezone in which each date should exist.
                Supported strings for timezones are the ones provided by the
                `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles".
            fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
            end_offset (int): Extends the partition set by a number of partitions equal to the value
                passed. If end_offset is 0 (the default), the last partition ends before the current
                time. If end_offset is 1, the second-to-last partition ends before the current time,
                and so on.
        """
        _fmt = fmt or DEFAULT_DATE_FORMAT

        return super(DailyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=ScheduleType.DAILY,
            start=start_date,
            timezone=timezone,
            fmt=_fmt,
            end_offset=end_offset,
        )


[docs]def daily_partitioned_config( start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]: """Defines run config over a set of daily partitions. The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig: check.callable_param(fn, "fn") return PartitionedConfig( run_config_for_partition_fn=lambda partition: fn( partition.value[0], partition.value[1] ), partitions_def=DailyPartitionsDefinition( start_date=start_date, timezone=timezone, fmt=fmt, end_offset=end_offset ), decorated_fn=fn, ) return inner
class HourlyPartitionsDefinition(TimeWindowPartitionsDefinition): def __new__( cls, start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): """A set of hourly partitions. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ _fmt = fmt or DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE return super(HourlyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.HOURLY, start=start_date, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def hourly_partitioned_config( start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]: """Defines run config over a set of hourly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig: check.callable_param(fn, "fn") return PartitionedConfig( run_config_for_partition_fn=lambda partition: fn( partition.value[0], partition.value[1] ), partitions_def=HourlyPartitionsDefinition( start_date=start_date, timezone=timezone, fmt=fmt, end_offset=end_offset ), decorated_fn=fn, ) return inner
class MonthlyPartitionsDefinition(TimeWindowPartitionsDefinition): def __new__( cls, start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): """A set of monthly partitions. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ _fmt = fmt or DEFAULT_DATE_FORMAT return super(MonthlyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.MONTHLY, start=start_date, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def monthly_partitioned_config( start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]: """Defines run config over a set of monthly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig: check.callable_param(fn, "fn") return PartitionedConfig( run_config_for_partition_fn=lambda partition: fn( partition.value[0], partition.value[1] ), partitions_def=MonthlyPartitionsDefinition( start_date=start_date, timezone=timezone, fmt=fmt, end_offset=end_offset, ), decorated_fn=fn, ) return inner
class WeeklyPartitionsDefinition(TimeWindowPartitionsDefinition): def __new__( cls, start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): """Defines a set of weekly partitions. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ _fmt = fmt or DEFAULT_DATE_FORMAT return super(WeeklyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.WEEKLY, start=start_date, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def weekly_partitioned_config( start_date: Union[datetime, str], timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]: """Defines run config over a set of weekly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig: check.callable_param(fn, "fn") return PartitionedConfig( run_config_for_partition_fn=lambda partition: fn( partition.value[0], partition.value[1] ), partitions_def=WeeklyPartitionsDefinition( start_date=start_date, timezone=timezone, fmt=fmt, end_offset=end_offset, ), decorated_fn=fn, ) return inner