Source code for dagster.core.storage.fs_asset_io_manager

import os

from dagster.config import Field
from dagster.config.source import StringSource
from dagster.core.storage.io_manager import io_manager

from .fs_io_manager import PickledObjectFilesystemIOManager


@io_manager(config_schema={"base_dir": Field(StringSource, is_required=False)})
def fs_asset_io_manager(init_context):
    """IO manager that stores values on the local filesystem, serializing them with pickle.

    Each asset is assigned to a single filesystem path, at "<base_dir>/<asset_key>". If the asset
    key has multiple components, the final component is used as the name of the file, and the
    preceding components as parent directories under the base_dir.

    Subsequent materializations of an asset will overwrite previous materializations of that asset.

    If not provided via configuration, the base dir is the local_artifact_storage in your
    dagster.yaml file. That will be a temporary directory if not explicitly set.

    So, with a base directory of "/my/base/path", an asset with key
    `AssetKey(["one", "two", "three"])` would be stored in a file called "three" in a directory
    with path "/my/base/path/one/two/".

    Example usage:

    1. Specify a collection-level IO manager using the reserved resource key ``"io_manager"``,
    which will set the given IO manager on all assets in the collection.

    .. code-block:: python

        from dagster import AssetGroup, asset, fs_asset_io_manager

        @asset
        def asset1():
            # create df ...
            return df

        @asset
        def asset2(asset1):
            return asset1[:5]

        asset_group = AssetGroup(
            [asset1, asset2],
            resource_defs={
                "io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
            },
        )

    2. Specify IO manager on the asset, which allows the user to set different IO managers on
    different assets.

    .. code-block:: python

        from dagster import AssetGroup, asset, fs_asset_io_manager

        @asset(io_manager_key="my_io_manager")
        def asset1():
            # create df ...
            return df

        @asset
        def asset2(asset1):
            return asset1[:5]

        asset_group = AssetGroup(
            [asset1, asset2],
            resource_defs={
                "my_io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
            },
        )

    Args:
        init_context: Resource initialization context. Its ``resource_config`` may carry the
            optional ``"base_dir"`` entry; its ``instance`` supplies the fallback directory.

    Returns:
        An ``AssetPickledObjectFilesystemIOManager`` rooted at the resolved base directory.
    """
    resource_config = init_context.resource_config
    if "base_dir" in resource_config:
        base_dir = resource_config["base_dir"]
    else:
        # Only consult the instance when no base_dir was configured, so an explicitly
        # configured base_dir never depends on the instance lookup.
        base_dir = init_context.instance.storage_directory()

    return AssetPickledObjectFilesystemIOManager(base_dir=base_dir)
class AssetPickledObjectFilesystemIOManager(PickledObjectFilesystemIOManager):
    """Variant of ``PickledObjectFilesystemIOManager`` that locates files by asset key."""

    def _get_path(self, context):
        """Build the filesystem path for the asset referenced by ``context``.

        The components of ``context.asset_key.path`` are appended, in order, to
        ``self.base_dir`` using ``os.path.join``.
        """
        key_components = context.asset_key.path
        return os.path.join(self.base_dir, *key_components)