Source code for dagster.core.asset_defs.source_asset
from typing import NamedTuple, Optional, Sequence, Union
import dagster.check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import (
MetadataEntry,
MetadataMapping,
MetadataUserInput,
PartitionMetadataEntry,
normalize_metadata,
)
from dagster.core.definitions.partition import PartitionsDefinition
[docs]class SourceAsset(
NamedTuple(
"_SourceAsset",
[
("key", AssetKey),
("metadata_entries", Sequence[Union[MetadataEntry, PartitionMetadataEntry]]),
("io_manager_key", str),
("description", Optional[str]),
("partitions_def", Optional[PartitionsDefinition]),
],
)
):
"""A SourceAsset represents an asset that is not generated by any Dagster op in the repository
that it's referenced from.
Attributes:
key (AssetKey): The key of the asset.
metadata_entries (List[MetadataEntry]): Metadata associated with the asset.
io_manager_key (str): The key for the IOManager that will be used to load the contents of
the asset when it's used as an input to other assets inside a job.
description (Optional[str]): The description of the asset.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
"""
def __new__(
cls,
key: AssetKey,
metadata: Optional[MetadataUserInput] = None,
io_manager_key: str = "io_manager",
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
):
metadata = check.opt_dict_param(metadata, "metadata", key_type=str)
metadata_entries = normalize_metadata(metadata, [], allow_invalid=True)
return super().__new__(
cls,
key=check.inst_param(key, "key", AssetKey),
metadata_entries=metadata_entries,
io_manager_key=check.str_param(io_manager_key, "io_manager_key"),
description=check.opt_str_param(description, "description"),
partitions_def=check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
),
)
@property
def metadata(self) -> MetadataMapping:
# PartitionMetadataEntry (unstable API) case is unhandled
return {entry.label: entry.entry_data for entry in self.metadata_entries} # type: ignore