Software-Defined Assets (Experimental)

Software-defined assets sit on top of the graph/job/op APIs and enable a novel way of constructing Dagster jobs that puts assets at the forefront.

Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets, each of which knows how to compute its contents from upstream assets.

A software-defined asset combines:
  • An asset key, e.g. the name of a table.
  • A function, which can be run to compute the contents of the asset.
  • A set of upstream assets that are provided as inputs to the function when computing the asset.

@dagster.asset(name=None, namespace=None, ins=None, non_argument_deps=None, metadata=None, description=None, required_resource_keys=None, io_manager_key=None, compute_kind=None, dagster_type=None, partitions_def=None, partition_mappings=None)

Create a definition for how to compute an asset.

A software-defined asset is the combination of:
  1. An asset key, e.g. the name of a table.
  2. A function, which can be run to compute the contents of the asset.
  3. A set of upstream assets that are provided as inputs to the function when computing the asset.

Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset.
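To make the inference rule concrete, here is a minimal standard-library sketch of reading upstream asset names from a function's arguments. It is not Dagster's implementation: the helper infer_upstream_asset_names is hypothetical, and skipping a leading context argument is an assumption based on the AssetGroup example in this document.

```python
import inspect


def infer_upstream_asset_names(fn):
    """Hypothetical helper: list the argument names that would name upstream assets."""
    params = list(inspect.signature(fn).parameters)
    # Assumption: a leading "context" argument is not an upstream asset.
    if params and params[0] == "context":
        params = params[1:]
    return params


def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1


def start_asset(context, source):
    ...


print(infer_upstream_asset_names(my_asset))     # ['my_upstream_asset']
print(infer_upstream_asset_names(start_asset))  # ['source']
```

The point of the sketch is only that the argument name, not the argument position or type, identifies the upstream asset.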

Parameters
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function.

  • namespace (Optional[Sequence[str]]) – The namespace that the asset resides in. The namespace + the name forms the asset key.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to their metadata and namespaces.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata entries for the asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”).

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • dagster_type (Optional[DagsterType]) – Allows specifying type validation functions that will be executed on the output of the decorated function after it runs.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary corresponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.
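The fallback described for partition_mappings can be sketched in plain Python. This is an illustration only: a partition mapping is modelled here as a plain callable, the function resolve_upstream_partition_key and the input names are hypothetical, and the real PartitionMapping class has a richer interface.

```python
from datetime import date, timedelta


def resolve_upstream_partition_key(input_name, partition_key, partition_mappings=None):
    """Hypothetical helper: pick the upstream partition key for one input."""
    mapping = (partition_mappings or {}).get(input_name)
    if mapping is None:
        # No entry for this input: fall back to the same partition key upstream.
        return partition_key
    return mapping(partition_key)


def previous_day(partition_key):
    """Example mapping (an assumption): read yesterday's upstream partition."""
    return (date.fromisoformat(partition_key) - timedelta(days=1)).isoformat()


mappings = {"daily_events": previous_day}
print(resolve_upstream_partition_key("daily_events", "2022-03-02", mappings))  # 2022-03-01
print(resolve_upstream_partition_key("users", "2022-03-02", mappings))         # 2022-03-02
```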

Examples

from dagster import asset

@asset
def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1

class dagster.AssetGroup(assets, source_assets=None, resource_defs=None, executor_def=None)

Defines a group of assets, along with environment information in the form of resources and an executor.

An AssetGroup can be provided to a RepositoryDefinition. When provided to a repository, the constituent assets can be materialized from Dagit. The AssetGroup also provides an interface for creating jobs from subselections of assets, which can then be provided to a ScheduleDefinition or SensorDefinition.

There can only be one AssetGroup per repository.

Parameters
  • assets (Sequence[AssetsDefinition]) – The set of software-defined assets to group.

  • source_assets (Optional[Sequence[SourceAsset]]) – The set of source assets that the software-defined assets may depend on.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions. When the AssetGroup is constructed, if there are any unsatisfied resource requirements from the assets, it will result in an error. Note that the root_manager key is a reserved resource key, and will result in an error if provided by the user.

  • executor_def (Optional[ExecutorDefinition]) – The executor definition to use when re-materializing assets in this group.
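The resource checks described for resource_defs can be approximated as follows. This is a sketch under stated assumptions, not AssetGroup's actual validation: check_resources is a hypothetical helper, assets are reduced to a mapping from asset name to required resource keys, and any resources that AssetGroup may supply by default are not modelled.

```python
RESERVED_RESOURCE_KEYS = {"root_manager"}


def check_resources(required_keys_by_asset, resource_defs):
    """Hypothetical helper: raise ValueError on reserved or missing resource keys."""
    reserved = RESERVED_RESOURCE_KEYS & set(resource_defs)
    if reserved:
        raise ValueError(f"reserved resource keys provided: {sorted(reserved)}")
    for asset_name, required in required_keys_by_asset.items():
        missing = set(required) - set(resource_defs)
        if missing:
            raise ValueError(f"asset {asset_name!r} is missing resources: {sorted(missing)}")


requirements = {"start_asset": {"foo"}, "next_asset": set()}

check_resources(requirements, {"foo": object()})  # passes silently

try:
    check_resources(requirements, {})
except ValueError as err:
    print(err)
```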

Examples

from dagster import AssetGroup, asset, AssetIn, AssetKey, SourceAsset, resource

source_asset = SourceAsset("source")

@asset(required_resource_keys={"foo"})
def start_asset(context, source):
    ...

@asset
def next_asset(start_asset):
    ...

@resource
def foo_resource():
    ...

asset_group = AssetGroup(
    assets=[start_asset, next_asset],
    source_assets=[source_asset],
    resource_defs={"foo": foo_resource},
)
...

@dagster.multi_asset(outs, name=None, ins=None, non_argument_deps=None, description=None, required_resource_keys=None, compute_kind=None, internal_asset_deps=None)

Create a combined definition of multiple assets that are computed using the same op and same upstream assets.

Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset.

Parameters
  • name (Optional[str]) – The name of the op.

  • outs (Optional[Dict[str, Out]]) – The Outs representing the produced assets.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to their metadata and namespaces.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”).

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by a multi_asset depend on all assets that are consumed by that multi_asset. If this default is not correct, you can pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this map must be either used as input to the asset or produced within the op.
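The default and the override described for internal_asset_deps can be illustrated with plain sets. This sketch is not Dagster code: resolve_output_deps is a hypothetical helper, asset keys are modelled as strings, and output names are assumed to coincide with the keys of the produced assets.

```python
def resolve_output_deps(input_keys, output_names, internal_asset_deps=None):
    """Hypothetical helper: map each output to the set of keys it depends on."""
    input_keys = set(input_keys)
    overrides = internal_asset_deps or {}
    allowed = input_keys | set(output_names)
    resolved = {}
    for name in output_names:
        # Default: every output depends on every input.
        deps = set(overrides.get(name, input_keys))
        unknown = deps - allowed
        if unknown:
            raise ValueError(f"{name!r} depends on unknown keys: {sorted(unknown)}")
        resolved[name] = deps
    return resolved


# Without overrides, both outputs depend on the single input.
print(resolve_output_deps({"raw"}, ["cleaned", "summary"]))
# With an override, "summary" depends on "cleaned" instead of "raw".
print(resolve_output_deps({"raw"}, ["cleaned", "summary"], {"summary": {"cleaned"}}))
```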

dagster.build_assets_job(name, assets, source_assets=None, resource_defs=None, description=None, config=None, tags=None, executor_def=None)

Builds a job that materializes the given assets.

The dependencies between the ops in the job are determined by the asset dependencies defined in the metadata on the provided asset nodes.
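One way to picture how asset dependencies fix the order of work is a topological sort over the upstream mapping. The sketch below uses the standard-library graphlib module (Python 3.9+); materialization_order is a hypothetical helper and says nothing about how build_assets_job is actually implemented.

```python
from graphlib import TopologicalSorter


def materialization_order(upstream_by_asset):
    """Hypothetical helper: order assets so each one follows its upstream assets."""
    # TopologicalSorter expects a mapping from node to its predecessors.
    return list(TopologicalSorter(upstream_by_asset).static_order())


upstream_by_asset = {
    "asset1": set(),
    "asset2": {"asset1"},
}
print(materialization_order(upstream_by_asset))  # ['asset1', 'asset2']
```

A cycle in the mapping makes static_order raise graphlib.CycleError, which mirrors the fact that asset dependencies must form a directed acyclic graph.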

Parameters
  • name (str) – The name of the job.

  • assets (List[AssetsDefinition]) – A list of assets or multi-assets - usually constructed using the @asset() or @multi_asset() decorator.

  • source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]) – A list of assets that are not materialized by this job, but that assets in this job depend on.

  • resource_defs (Optional[Dict[str, ResourceDefinition]]) – Resource defs to be included in this job.

  • description (Optional[str]) – A description of the job.

Examples

from dagster import asset, build_assets_job

@asset
def asset1():
    return 5

@asset
def asset2(asset1):
    # The argument name refers to the upstream asset "asset1".
    return asset1 + 1

my_assets_job = build_assets_job("my_assets_job", assets=[asset1, asset2])

Returns

A job that materializes the given assets.

Return type

JobDefinition

class dagster.AssetIn(asset_key=None, metadata=None, namespace=None)

class dagster.SourceAsset(key, metadata=None, io_manager_key='io_manager', description=None, partitions_def=None)

A SourceAsset represents an asset that is not generated by any Dagster op in the repository that it’s referenced from.

key

The key of the asset.

Type

AssetKey

metadata_entries

Metadata associated with the asset.

Type

List[MetadataEntry]

io_manager_key

The key for the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type

str

description

The description of the asset.

Type

Optional[str]

partitions_def

Defines the set of partition keys that compose the asset.

Type

Optional[PartitionsDefinition]

dagster.fs_asset_io_manager IOManagerDefinition

Config Schema:
base_dir (dagster.StringSource, optional)

IO manager that stores values on the local filesystem, serializing them with pickle.

Each asset is assigned to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset.

If not provided via configuration, the base dir is the local_artifact_storage in your dagster.yaml file. That will be a temporary directory if not explicitly set.

So, with a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
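The path layout and overwrite behaviour described above can be reproduced with pathlib and pickle. This is a minimal sketch of the storage scheme, not the IO manager's source: asset_path, store_asset and load_asset are hypothetical helpers, and an asset key is modelled as a list of strings.

```python
import pickle
import tempfile
from pathlib import Path


def asset_path(base_dir, key_components):
    """Hypothetical helper: last component is the file name, the rest are directories."""
    return Path(base_dir).joinpath(*key_components)


def store_asset(base_dir, key_components, value):
    path = asset_path(base_dir, key_components)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Opening with "wb" replaces any previous materialization at this path.
    with path.open("wb") as f:
        pickle.dump(value, f)
    return path


def load_asset(base_dir, key_components):
    with asset_path(base_dir, key_components).open("rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as base_dir:
    key = ["one", "two", "three"]
    store_asset(base_dir, key, [1, 2, 3])
    store_asset(base_dir, key, [4, 5, 6])  # overwrites the first value
    print(load_asset(base_dir, key))       # [4, 5, 6]
```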

Example usage:

1. Specify a collection-level IO manager using the reserved resource key "io_manager", which will set the given IO manager on all assets in the collection.

from dagster import AssetGroup, asset, fs_asset_io_manager

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    # asset1 is the value loaded from the upstream asset.
    return asset1[:5]

asset_group = AssetGroup(
    [asset1, asset2],
    resource_defs={
        "io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
    },
)

2. Specify an IO manager on the asset, which allows the user to set different IO managers on different assets.

from dagster import AssetGroup, asset, fs_asset_io_manager

@asset(io_manager_key="my_io_manager")
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    # asset1 is the value loaded from the upstream asset.
    return asset1[:5]

asset_group = AssetGroup(
    [asset1, asset2],
    resource_defs={
        "my_io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
    },
)