Source code for dagster.core.definitions.decorators.repository

from functools import update_wrapper
from typing import Any, Callable, Optional, Union, overload

from dagster import check
from dagster.core.errors import DagsterInvalidDefinitionError

from ..graph_definition import GraphDefinition
from ..partition import PartitionSetDefinition
from ..pipeline_definition import PipelineDefinition
from ..repository_definition import (
    VALID_REPOSITORY_DATA_DICT_KEYS,
    CachingRepositoryData,
    RepositoryData,
    RepositoryDefinition,
)
from ..schedule_definition import ScheduleDefinition
from ..sensor_definition import SensorDefinition


class _Repository:
    def __init__(self, name: Optional[str] = None, description: Optional[str] = None):
        self.name = check.opt_str_param(name, "name")
        self.description = check.opt_str_param(description, "description")

    def __call__(self, fn: Callable[[], Any]) -> RepositoryDefinition:
        from dagster.core.asset_defs import AssetGroup

        check.callable_param(fn, "fn")

        if not self.name:
            self.name = fn.__name__

        repository_definitions = fn()

        repository_data: Union[CachingRepositoryData, RepositoryData]
        if isinstance(repository_definitions, list):
            bad_definitions = []
            for i, definition in enumerate(repository_definitions):
                if not (
                    isinstance(definition, PipelineDefinition)
                    or isinstance(definition, PartitionSetDefinition)
                    or isinstance(definition, ScheduleDefinition)
                    or isinstance(definition, SensorDefinition)
                    or isinstance(definition, GraphDefinition)
                    or isinstance(definition, AssetGroup)
                ):
                    bad_definitions.append((i, type(definition)))
            if bad_definitions:
                bad_definitions_str = ", ".join(
                    [
                        "value of type {type_} at index {i}".format(type_=type_, i=i)
                        for i, type_ in bad_definitions
                    ]
                )
                raise DagsterInvalidDefinitionError(
                    "Bad return value from repository construction function: all elements of list "
                    "must be of type JobDefinition, GraphDefinition, PipelineDefinition, "
                    "PartitionSetDefinition, ScheduleDefinition, or SensorDefinition. "
                    f"Got {bad_definitions_str}."
                )
            repository_data = CachingRepositoryData.from_list(repository_definitions)

        elif isinstance(repository_definitions, dict):
            if not set(repository_definitions.keys()).issubset(VALID_REPOSITORY_DATA_DICT_KEYS):
                raise DagsterInvalidDefinitionError(
                    "Bad return value from repository construction function: dict must not contain "
                    "keys other than {{'pipelines', 'partition_sets', 'schedules', 'jobs'}}: found "
                    "{bad_keys}".format(
                        bad_keys=", ".join(
                            [
                                "'{key}'".format(key=key)
                                for key in repository_definitions.keys()
                                if key not in VALID_REPOSITORY_DATA_DICT_KEYS
                            ]
                        )
                    )
                )
            repository_data = CachingRepositoryData.from_dict(repository_definitions)
        elif isinstance(repository_definitions, RepositoryData):
            repository_data = repository_definitions
        else:
            raise DagsterInvalidDefinitionError(
                "Bad return value of type {type_} from repository construction function: must "
                "return list, dict, or RepositoryData. See the @repository decorator docstring for "
                "details and examples".format(type_=type(repository_definitions)),
            )

        repository_def = RepositoryDefinition(
            name=self.name, description=self.description, repository_data=repository_data
        )

        update_wrapper(repository_def, fn)
        return repository_def


@overload
def repository(name: Callable[..., Any]) -> RepositoryDefinition:
    ...


@overload
def repository(name: Optional[str] = ..., description: Optional[str] = ...) -> _Repository:
    ...


[docs]def repository( name: Optional[Union[str, Callable[..., Any]]] = None, description: Optional[str] = None ) -> Union[RepositoryDefinition, _Repository]: """Create a repository from the decorated function. The decorated function should take no arguments and its return value should one of: 1. ``List[Union[JobDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition]]``. Use this form when you have no need to lazy load pipelines or other definitions. This is the typical use case. 2. A dict of the form: .. code-block:: python { 'jobs': Dict[str, Callable[[], JobDefinition]], 'pipelines': Dict[str, Callable[[], PipelineDefinition]], 'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]], 'schedules': Dict[str, Callable[[], ScheduleDefinition]] 'sensors': Dict[str, Callable[[], SensorDefinition]] } This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly. 3. A :py:class:`RepositoryData`. Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory. Args: name (Optional[str]): The name of the repository. Defaults to the name of the decorated function. description (Optional[str]): A string description of the repository. Example: .. code-block:: python ###################################################################### # A simple repository using the first form of the decorated function ###################################################################### @op(config_schema={n: Field(Int)}) def return_n(context): return context.op_config['n'] @job def simple_job(): return_n() @job def some_job(): ... @sensor(job=some_job) def some_sensor(): if foo(): yield RunRequest( run_key= ..., run_config={ 'ops': {'return_n': {'config': {'n': bar()}}} } ) @job def my_job(): ... my_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", job=my_job) @repository def simple_repository(): return [simple_job, some_sensor, my_schedule] ###################################################################### # A lazy-loaded repository ###################################################################### def make_expensive_job(): @job def expensive_job(): for i in range(10000): return_n.alias(f'return_n_{i}')() return expensive_job def make_expensive_schedule(): @job def other_expensive_job(): for i in range(11000): return_n.alias(f'my_return_n_{i}')() return ScheduleDefinition(cron_schedule="0 0 * * *", job=other_expensive_job) @repository def lazy_loaded_repository(): return { 'jobs': {'expensive_job': make_expensive_job}, 'schedules': {'expensive_schedule: make_expensive_schedule} } ###################################################################### # A complex repository that lazily constructs jobs from a directory # of files in a bespoke YAML format ###################################################################### class ComplexRepositoryData(RepositoryData): def __init__(self, yaml_directory): self._yaml_directory = yaml_directory def get_all_pipelines(self): return [ self._construct_job_def_from_yaml_file( self._yaml_file_for_job_name(file_name) ) for file_name in os.listdir(self._yaml_directory) ] ... @repository def complex_repository(): return ComplexRepositoryData('some_directory') """ if callable(name): check.invariant(description is None) return _Repository()(name) return _Repository(name=name, description=description)