Steps in a job often produce persistent artifacts, for instance, graphs or tables describing the result of some computation. Typically these artifacts are saved to disk (or to cloud storage) with a name that has something to do with their origin. But it can be hard to organize and cross-reference artifacts produced by many different runs of a job, or to identify all of the files that might have been created by some job's logic.
Dagster ops can describe their persistent artifacts to the system by logging or yielding AssetMaterialization events. Like TypeCheck and ExpectationResult, asset materializations are side-channels for metadata: they don't get passed to downstream ops, and they aren't used to define the data dependencies that structure a job's DAG.
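For instance, here is a minimal sketch (not part of the tutorial job) of the yielding style: an op that yields an AssetMaterialization alongside its regular Output. The op name and file path are hypothetical.

from dagster import AssetMaterialization, Output, op


@op
def write_report(context):
    path = "/tmp/report.txt"  # hypothetical artifact location
    with open(path, "w") as f:
        f.write("hello")
    # Side-channel metadata: recorded by Dagster, not passed to downstream ops.
    yield AssetMaterialization(asset_key="report", description="A text report")
    # An op that yields events must also yield its Output explicitly.
    yield Output(path)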
Suppose that we rewrite our sort_by_calories op so that it saves the newly sorted data frame to disk.
import csv
import os

from dagster import get_dagster_logger, op


@op
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["calories"]))
    least_caloric = sorted_cereals[0]["name"]
    most_caloric = sorted_cereals[-1]["name"]
    logger = get_dagster_logger()
    logger.info(f"Least caloric cereal: {least_caloric}")
    logger.info(f"Most caloric cereal: {most_caloric}")
    fieldnames = list(sorted_cereals[0].keys())
    sorted_cereals_csv_path = os.path.abspath(
        f"output/calories_sorted_{context.run_id}.csv"
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    with open(sorted_cereals_csv_path, "w") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
We've taken the basic precaution of ensuring that the saved CSV file has a different filename for each run of the job, by including context.run_id in its path. But there's no way for Dagit to know about this persistent artifact. So we'll add the following lines:
import csv
import os

from dagster import (
    AssetMaterialization,
    MetadataValue,
    get_dagster_logger,
    op,
)


@op
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["calories"]))
    least_caloric = sorted_cereals[0]["name"]
    most_caloric = sorted_cereals[-1]["name"]
    logger = get_dagster_logger()
    logger.info(f"Least caloric cereal: {least_caloric}")
    logger.info(f"Most caloric cereal: {most_caloric}")
    fieldnames = list(sorted_cereals[0].keys())
    sorted_cereals_csv_path = os.path.abspath(
        f"output/calories_sorted_{context.run_id}.csv"
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    with open(sorted_cereals_csv_path, "w") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    # Describe the persistent artifact to Dagster so it shows up in Dagit.
    context.log_event(
        AssetMaterialization(
            asset_key="sorted_cereals_csv",
            description="Cereals data frame sorted by caloric content",
            metadata={
                "sorted_cereals_csv_path": MetadataValue.path(sorted_cereals_csv_path)
            },
        )
    )
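To see the event end to end, the op needs to be wired into a job. Here is a minimal sketch under stated assumptions: the upstream download_cereals op is a hypothetical stand-in for the tutorial's data-loading step, and the job name is our own choice.

import csv

from dagster import job, op


@op
def download_cereals():
    # Hypothetical stand-in for the tutorial's upstream op: read cereal rows
    # from a local CSV file into a list of dicts.
    with open("cereal.csv") as f:
        return list(csv.DictReader(f))


@job
def materialization_job():
    sort_by_calories(download_cereals())


if __name__ == "__main__":
    # Run the job in-process; the AssetMaterialization event is recorded in
    # the run's event log.
    materialization_job.execute_in_process()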
Now, if we run this job in Dagit, the AssetMaterialization event appears in the run's structured event logs alongside the op's other output.