Source code for dagstermill.io_managers
import os
from pathlib import Path
from typing import Any, List, Optional
from dagster import check
from dagster.config.field import Field
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
from dagster.core.storage.io_manager import IOManager, io_manager
from dagster.utils import mkdir_p
class OutputNotebookIOManager(IOManager):
def __init__(self, asset_key_prefix: Optional[List[str]] = None):
self.asset_key_prefix = asset_key_prefix if asset_key_prefix else []
def get_output_asset_key(self, context: OutputContext):
return AssetKey([*self.asset_key_prefix, f"{context.step_key}_output_notebook"])
def handle_output(self, context: OutputContext, obj: bytes):
raise NotImplementedError
def load_input(self, context: InputContext) -> Any:
raise NotImplementedError
class LocalOutputNotebookIOManager(OutputNotebookIOManager):
"""Built-in IO Manager for handling output notebook."""
def __init__(self, base_dir: str, asset_key_prefix: Optional[List[str]] = None):
super(LocalOutputNotebookIOManager, self).__init__(asset_key_prefix=asset_key_prefix)
self.base_dir = base_dir
self.write_mode = "wb"
self.read_mode = "rb"
def _get_path(self, context: OutputContext) -> str:
"""Automatically construct filepath."""
keys = context.get_run_scoped_output_identifier()
return str(Path(self.base_dir, *keys).with_suffix(".ipynb"))
def handle_output(self, context: OutputContext, obj: bytes):
"""obj: bytes"""
check.inst_param(context, "context", OutputContext)
# the output notebook itself is stored at output_file_path
output_notebook_path = self._get_path(context)
mkdir_p(os.path.dirname(output_notebook_path))
with open(output_notebook_path, self.write_mode) as dest_file_obj:
dest_file_obj.write(obj)
yield MetadataEntry.fspath(path=output_notebook_path, label="path")
def load_input(self, context) -> bytes:
check.inst_param(context, "context", InputContext)
# pass output notebook to downstream solids as File Object
with open(self._get_path(context.upstream_output), self.read_mode) as file_obj:
return file_obj.read()
[docs]@io_manager(
config_schema={
"asset_key_prefix": Field(str, is_required=False),
"base_dir": Field(str, is_required=False),
},
)
def local_output_notebook_io_manager(init_context):
"""Built-in IO Manager that handles output notebooks."""
return LocalOutputNotebookIOManager(
base_dir=init_context.resource_config.get(
"base_dir", init_context.instance.storage_directory()
),
asset_key_prefix=init_context.resource_config.get("asset_key_prefix", []),
)