Asset Materializations#

Software-defined assets provide a declarative way to define what assets should exist and how to create them. But when it's not known what asset an op is going to materialize until the op runs, you can still create assets using AssetMaterialization events.

"Asset" is Dagster's word for an entity, external to ops, that is mutated or created by an op. An asset might be a table in a database that an op appends to, an ML model in a model store that an op overwrites, or even a slack channel that an op writes messages to.

Op outputs often correspond to assets. For example, an op might be responsible for recreating a table, and one of its outputs might be a dataframe containing the contents of that table.

Assets can also have partitions, which refer to slices of the overall asset. The simplest example would be a table that has a partition for each day. A given op execution may simply write a single day's worth of data to that table, rather than dropping the entire table and replacing it with new data.

Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit Asset Catalog. Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using AssetMaterialization events. These events can either be logged by the user at runtime, or automatically created by Dagster in cases where an AssetKey has been referenced by an op output.

Relevant APIs#

NameDescription
AssetMaterializationDagster event indicating that an op has materialized an asset.
AssetKeyA unique identifier for a particular external asset

Overview#

There are two general patterns for dealing with assets when using Dagster:

  • Put the logic to write/store assets inside the body of an op.
  • Focus the op purely on business logic, and delegate the logic to write/store assets to an IOManager.

Regardless of which pattern you are using, AssetMaterialization events are used to communicate to Dagster that a materialization has occurred. You can create these events either by explicitly logging them at runtime, or (using an experimental interface), have Dagster automatically generate them by defining that a given op output corresponds to a given AssetKey.

Explicit AssetMaterializations#

One way of recording materialization events is to log AssetMaterialization events at runtime. These events should be co-located with your materialization logic, meaning if you store your object within your op body, then you should log from within that op, and if you store your object using an IOManager, then you should log the event from your manager.

Logging an AssetMaterialization from a Op#

To make Dagster aware that we materialized an asset in our op, we can log an AssetMaterialization event using the method OpExecutionContext.log_event. This would involve changing the following op:

from dagster import op


@op
def my_simple_op():
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path

into something like this:

from dagster import AssetMaterialization, op


@op
def my_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset", description="Persisted result to storage"
        )
    )
    return remote_storage_path

We should now see a materialization event in the event log when we execute a job with this op.

asset-materialization

Logging an AssetMaterialization from an IOManager#

To record that an IOManager has mutated or created an asset, we can log an AssetMaterialization event from its handle_output method. We do this via the method OutputContext.log_event.

from dagster import AssetMaterialization, IOManager


class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
            )
        )

Attaching Metadata to an AssetMaterialization#

There are a variety of types of metadata that can be associated with a materialization event, all through the MetadataEntry class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the Asset Catalog.

Example: Op body#

from dagster import op, AssetMaterialization, MetadataValue


@op
def my_metadata_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset",
            description="Persisted result to storage",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return remote_storage_path

Example: IOManager#

from dagster import AssetMaterialization, IOManager


class PandasCsvIOManagerWithAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
                metadata={
                    "number of rows": obj.shape[0],
                    "some_column mean": obj["some_column"].mean(),
                },
            )
        )

Check our API docs for MetadataEntry for more details on they types of event metadata available.

Specifying a partition for an AssetMaterialization#

If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the partition argument on the object.

from dagster import op, AssetMaterialization


@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
    partition_date = context.op_config["date"]
    df = read_df_for_date(partition_date)
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(asset_key="my_dataset", partition=partition_date)
    )
    return remote_storage_path

Linking Op Outputs to Assets
Experimental
#

It is fairly common for an asset to correspond to an op output. In the following simplified example, our op produces a dataframe, persists it to storage, and then passes the dataframe along as an output:

from dagster import op, Output, AssetMaterialization


@op
def my_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    context.log_event(AssetMaterialization(asset_key="my_dataset"))
    return df

In this case, the AssetMaterialization and the Output events both correspond to the same data, the dataframe that we have created. With this in mind, we can simplify the above code, and provide useful information to the Dagster framework, by making this link between the my_dataset asset and the output of this op explicit.

Just as there are two places in which you can log runtime AssetMaterialization events (within an op body and within an IOManager), we provide two different interfaces for linking an op output to to an asset. Regardless of which you choose, every time the op runs and logs that output, an AssetMaterialization event will automatically be created to record this information.

If you use an Output event to yield your output, and specified any metadata entries on it, (see: Op Event Docs), these entries will automatically be attached to the materialization event for this asset.

Linking assets to an Output Definition
Experimental
#

For cases where you are storing your asset within the body of an op, the easiest way of linking an asset to an op output is with the asset_key parameter on the relevant OutputDefinition in your op. To do this, you may define a constant AssetKey that identifies the asset you are linking.

from dagster import op, Output, Out, AssetKey


@op(out=Out(asset_key=AssetKey("my_dataset")))
def my_constant_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    return df

Linking assets to outputs with an IOManager
Experimental
#

If you've defined a custom IOManager to handle storing your op's outputs, the IOManager will likely be the most natural place to define which asset a particular output will be written to. To do this, you can implement the get_output_asset_key function on your IOManager.

Similar to the above interface, this function takes an OutputContext and returns an AssetKey. The following example functions nearly identically to PandasCsvIOManagerWithMetadata from the runtime example above.

from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)

When an output is linked to an asset in this way, the generated AssetMaterialization event will contain any MetadataEntry information yielded from the handle_output function (in addiition to all of the metadata specified on the corresponding Output event).

See the IOManager docs for more information on yielding these entries from an IOManager.

Specifying partitions for an output-linked asset#

If you are already specifying a get_output_asset_key function on your IOManager, you can optionally specify a set of partitions that this manager will be updating or creating by also defining a get_output_asset_partitions function. If you do this, an AssetMaterialization will be created for each of the specified partitions. One useful pattern to pass this partition information (which will likely vary each run) to the manager, is to specify the set of partitions on the configuration of the output. You can do this by providing per-output configuration on the IOManager.

Then, you can calculate the asset partitions that a particular output will correspond to by reading this output configuration in get_output_asset_partitions:

from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])

Asset Lineage
Experimental
#

When an op output is linked to an AssetKey, Dagster can automatically generate lineage information that describes how this asset relates to other output-linked assets.

As a simplified example, imagine a two-op job that first scrapes some user data from an API, storing it to a table, then trains an ML model on that data, persisting it to a model store:

from dagster import op, job, AssetKey, Out


@op(out=Out(asset_key=AssetKey("my_db.users")))
def scrape_users():
    users_df = some_api_call()
    persist_to_db(users_df)
    return users_df


@op(out=Out(asset_key=AssetKey("ml_models.user_prediction")))
def get_prediction_model(users_df):
    my_ml_model = train_prediction_model(users_df)
    persist_to_model_store(my_ml_model)
    return my_ml_model


@job
def my_user_model_job():
    get_prediction_model(scrape_users())

In this case, it's certainly fair to say that this ML model, which we have assigned the key ml_models.user_prediction, depends on the table that we created, my_db.users (it uses the data in the table to train the model).

Why is that? By specifying the structure of your job, you have already defined data depedencies between these ops. By linking the output of scrape_users to the input of get_prediction_model, we can now infer that whatever this second op outputs will be some function of its input. Furthermore, since we have linked each of these outputs to external assets, we can use this knowledge to say that the asset associated with the output of get_prediction_model depends on the asset associated with the output of scrape_users.

This feature is still in its early stages, but for now, this lineage information is surfaced in the Asset Catalog page for each asset ("Parent assets"):

asset-lineage

Software-defined assets vs. Asset Materializations#

When working with software-defined assets, the assets and their dependencies must be known at definition time. When you look at software-defined assets in Dagit, you can see exactly what assets are going to be materialized before any code runs.

Asset Materializations, on the other hand, are logged at run time. When you run an op, you find out which assets were materialized while the op is running. This allows for some flexibility, like if you wanted to determine which assets should be materialized based on the output of a previous op.