Source code for dagster.core.storage.fs_io_manager

import os
import pickle

from dagster import check
from dagster.config import Field
from dagster.config.source import StringSource
from dagster.core.definitions.events import AssetKey, AssetMaterialization
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
from dagster.core.storage.io_manager import IOManager, io_manager
from dagster.core.storage.memoizable_io_manager import MemoizableIOManager
from dagster.utils import PICKLE_PROTOCOL, mkdir_p
from dagster.utils.backcompat import experimental


@io_manager(config_schema={"base_dir": Field(StringSource, is_required=False)})
def fs_io_manager(init_context):
    """Built-in filesystem IO manager that stores and retrieves values using pickling.

    Allows users to specify a base directory where all the step outputs will be stored. By
    default, step outputs will be stored in the directory specified by local_artifact_storage in
    your dagster.yaml file (which will be a temporary directory if not explicitly set).

    Serializes and deserializes output values using pickling and automatically constructs
    the filepaths for the assets.

    Example usage:

    1. Specify a job-level IO manager using the reserved resource key ``"io_manager"``,
       which will set the given IO manager on all ops in a job.

    .. code-block:: python

        from dagster import fs_io_manager, job, op

        @op
        def op_a():
            # create df ...
            return df

        @op
        def op_b(df):
            return df[:5]

        @job(
            resource_defs={
                "io_manager": fs_io_manager.configured({"base_dir": "/my/base/path"})
            }
        )
        def my_job():
            op_b(op_a())

    2. Specify IO manager on :py:class:`Out`, which allows the user to set different IO
       managers on different step outputs.

    .. code-block:: python

        from dagster import fs_io_manager, job, op, Out

        @op(out=Out(io_manager_key="my_io_manager"))
        def op_a():
            # create df ...
            return df

        @op
        def op_b(df):
            return df[:5]

        @job(resource_defs={"my_io_manager": fs_io_manager})
        def my_job():
            op_b(op_a())

    """
    base_dir = init_context.resource_config.get(
        "base_dir", init_context.instance.storage_directory()
    )

    return PickledObjectFilesystemIOManager(base_dir=base_dir)
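
# Because ``base_dir`` is declared as a ``StringSource``, it can also be supplied
# from an environment variable rather than a literal string. A minimal sketch,
# assuming an env var named ``DAGSTER_STORAGE_DIR`` (the name is illustrative, not
# part of the library):
#
#     from dagster import fs_io_manager, job, op
#
#     @op
#     def emit_numbers():
#         return list(range(10))
#
#     @job(
#         resource_defs={
#             "io_manager": fs_io_manager.configured(
#                 {"base_dir": {"env": "DAGSTER_STORAGE_DIR"}}
#             )
#         }
#     )
#     def numbers_job():
#         emit_numbers()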
class PickledObjectFilesystemIOManager(MemoizableIOManager):
    """Built-in filesystem IO manager that stores and retrieves values using pickling.

    Args:
        base_dir (Optional[str]): Base directory where all the step outputs which use this
            object manager will be stored in.
    """

    def __init__(self, base_dir=None):
        self.base_dir = check.opt_str_param(base_dir, "base_dir")
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, context):
        """Automatically construct filepath."""
        keys = context.get_output_identifier()

        return os.path.join(self.base_dir, *keys)

    def has_output(self, context):
        filepath = self._get_path(context)

        return os.path.exists(filepath)

    def handle_output(self, context, obj):
        """Pickle the data and store the object to a file.

        This method omits the AssetMaterialization event, so assets generated by it won't be
        tracked by the Asset Catalog.
        """
        check.inst_param(context, "context", OutputContext)

        filepath = self._get_path(context)
        context.log.debug(f"Writing file at: {filepath}")

        # Ensure path exists
        mkdir_p(os.path.dirname(filepath))

        with open(filepath, self.write_mode) as write_obj:
            try:
                pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
            except (AttributeError, RecursionError, ImportError, pickle.PicklingError) as e:
                executor = context.step_context.pipeline_def.mode_definitions[0].executor_defs[0]

                if isinstance(e, RecursionError):
                    # if obj can't be pickled because of RecursionError then __str__() will also
                    # throw a RecursionError
                    obj_repr = f"{obj.__class__} exceeds recursion limit and"
                else:
                    obj_repr = obj.__str__()

                raise DagsterInvariantViolationError(
                    f"Object {obj_repr} is not picklable. You are currently using the "
                    f"fs_io_manager and the {executor.name}. You will need to use a different "
                    "IO manager to continue using this output. For example, you can use the "
                    "mem_io_manager with the in_process_executor.\n"
                    "For more information on IO managers, visit "
                    "https://docs.dagster.io/concepts/io-management/io-managers \n"
                    "For more information on executors, visit "
                    "https://docs.dagster.io/deployment/executors#overview"
                )

    def load_input(self, context):
        """Unpickle the file and load it into a data object."""
        check.inst_param(context, "context", InputContext)

        filepath = self._get_path(context.upstream_output)
        context.log.debug(f"Loading file from: {filepath}")

        with open(filepath, self.read_mode) as read_obj:
            return pickle.load(read_obj)


class CustomPathPickledObjectFilesystemIOManager(IOManager):
    """Built-in filesystem IO manager that stores and retrieves values using pickling and
    allows users to specify a file path for outputs.

    Args:
        base_dir (Optional[str]): Base directory where all the step outputs which use this
            object manager will be stored in.
    """

    def __init__(self, base_dir=None):
        self.base_dir = check.opt_str_param(base_dir, "base_dir")
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, path):
        return os.path.join(self.base_dir, path)

    def handle_output(self, context, obj):
        """Pickle the data and store the object to a custom file path.

        This method emits an AssetMaterialization event so the assets will be tracked by the
        Asset Catalog.
        """
        check.inst_param(context, "context", OutputContext)
        metadata = context.metadata
        path = check.str_param(metadata.get("path"), "metadata.path")

        filepath = self._get_path(path)

        # Ensure path exists
        mkdir_p(os.path.dirname(filepath))
        context.log.debug(f"Writing file at: {filepath}")

        with open(filepath, self.write_mode) as write_obj:
            pickle.dump(obj, write_obj, PICKLE_PROTOCOL)

        return AssetMaterialization(
            asset_key=AssetKey([context.pipeline_name, context.step_key, context.name]),
            metadata_entries=[MetadataEntry.fspath(os.path.abspath(filepath))],
        )

    def load_input(self, context):
        """Unpickle the file from a given file path and load it into a data object."""
        check.inst_param(context, "context", InputContext)
        metadata = context.upstream_output.metadata
        path = check.str_param(metadata.get("path"), "metadata.path")
        filepath = self._get_path(path)
        context.log.debug(f"Loading file from: {filepath}")

        with open(filepath, self.read_mode) as read_obj:
            return pickle.load(read_obj)
@io_manager(config_schema={"base_dir": Field(StringSource, is_required=True)})
@experimental
def custom_path_fs_io_manager(init_context):
    """Built-in IO manager that allows users to customize the output file path per output
    definition.

    It requires users to specify a base directory where all the step outputs will be stored.
    It serializes and deserializes output values (assets) using pickling and stores the
    pickled object in the user-provided file paths.

    Example usage:

    .. code-block:: python

        from dagster import custom_path_fs_io_manager, job, op, Out

        @op(out=Out(metadata={"path": "path/to/sample_output"}))
        def sample_data():
            # create df ...
            return df

        my_custom_path_fs_io_manager = custom_path_fs_io_manager.configured(
            {"base_dir": "path/to/basedir"}
        )

        @job(resource_defs={"io_manager": my_custom_path_fs_io_manager})
        def my_job():
            sample_data()

    """
    return CustomPathPickledObjectFilesystemIOManager(
        base_dir=init_context.resource_config.get("base_dir")
    )
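
# Since CustomPathPickledObjectFilesystemIOManager writes a plain pickle to
# ``os.path.join(base_dir, metadata["path"])``, an output can be read back outside
# of Dagster with the standard library alone. The paths below are the illustrative
# ones from the docstring above:
#
#     import os
#     import pickle
#
#     with open(os.path.join("path/to/basedir", "path/to/sample_output"), "rb") as f:
#         sample_output = pickle.load(f)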