Source code for dagster_ge.factory

import datetime

import great_expectations as ge
from dagster_pandas import DataFrame
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView

from dagster import (
    ExpectationResult,
    InputDefinition,
    MetadataEntry,
    Noneable,
    Output,
    OutputDefinition,
    StringSource,
    check,
    op,
    resource,
    solid,
)

try:
    # ge < v0.13.0
    from great_expectations.core import convert_to_json_serializable
except ImportError:
    # ge >= v0.13.0
    from great_expectations.core.util import convert_to_json_serializable


@resource(config_schema={"ge_root_dir": Noneable(StringSource)})
def ge_data_context(context):
    if context.resource_config["ge_root_dir"] is None:
        yield ge.data_context.DataContext()
    else:
        yield ge.data_context.DataContext(context_root_dir=context.resource_config["ge_root_dir"])


def core_ge_validation_factory(
    dagster_decorator,
    decorator_name,
    name,
    datasource_name,
    suite_name,
    validation_operator_name=None,
    input_dagster_type=DataFrame,
    batch_kwargs=None,
):
    check.str_param(datasource_name, "datasource_name")
    check.str_param(suite_name, "suite_name")
    check.opt_str_param(validation_operator_name, "validation_operator_name")
    batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")

    @dagster_decorator(
        name=name,
        input_defs=[InputDefinition("dataset", input_dagster_type)],
        output_defs=[
            OutputDefinition(
                dagster_type=dict,
                description=f"""
        This {decorator_name} yields an expectationResult with a structured dict of metadata from
        the GE suite, as well as the full result in case a user wants to process it differently.
        The structured dict contains both summary stats from the suite as well as expectation by
        expectation results/details.
        """,
            )
        ],
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def _ge_validation_fn(context, dataset):
        data_context = context.resources.ge_data_context
        if validation_operator_name is not None:
            validation_operator = validation_operator_name
        else:
            data_context.add_validation_operator(
                "ephemeral_validation",
                {"class_name": "ActionListValidationOperator", "action_list": []},
            )
            validation_operator = "ephemeral_validation"
        suite = data_context.get_expectation_suite(suite_name)
        final_batch_kwargs = batch_kwargs or {"dataset": dataset}
        if "datasource" in batch_kwargs:
            context.log.warning(
                "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
                f"parameter of the {decorator_name} factory instead."
            )
        final_batch_kwargs["datasource"] = datasource_name
        batch = data_context.get_batch(final_batch_kwargs, suite)
        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.utcnow(),
        }
        results = data_context.run_validation_operator(
            validation_operator, assets_to_validate=[batch], run_id=run_id
        )
        res = convert_to_json_serializable(results.list_validation_results())[0]
        validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
        rendered_document_content_list = (
            validation_results_page_renderer.render_validation_operator_result(results)
        )
        md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))

        meta_stats = MetadataEntry.md(md_str=md_str, label="Expectation Results")
        yield ExpectationResult(
            success=res["success"],
            metadata_entries=[
                meta_stats,
            ],
        )
        yield Output(res)

    return _ge_validation_fn


[docs]def ge_validation_solid_factory( name, datasource_name, suite_name, validation_operator_name=None, input_dagster_type=DataFrame, batch_kwargs=None, ): """Generates solids for interacting with GE. Args: name (str): the name of the solid datasource_name (str): the name of your DataSource, see your great_expectations.yml suite_name (str): the name of your expectation suite, see your great_expectations.yml validation_operator_name (Optional[str]): what validation operator to run -- defaults to None, which generates an ephemeral validator. If you want to save data docs, use 'action_list_operator'. See https://docs.greatexpectations.io/en/latest/reference/core_concepts/validation_operators_and_actions.html input_dagster_type (DagsterType): the Dagster type used to type check the input to the solid. Defaults to `dagster_pandas.DataFrame`. batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where `dataset` is the input to the generated solid. Returns: A solid that takes in a set of data and yields both an expectation with relevant metadata and an output with all the metadata (for user processing) """ return core_ge_validation_factory( solid, "solid", name, datasource_name, suite_name, validation_operator_name, input_dagster_type, batch_kwargs, )
[docs]def ge_validation_op_factory( name, datasource_name, suite_name, validation_operator_name=None, input_dagster_type=DataFrame, batch_kwargs=None, ): """Generates ops for interacting with GE. Args: name (str): the name of the op datasource_name (str): the name of your DataSource, see your great_expectations.yml suite_name (str): the name of your expectation suite, see your great_expectations.yml validation_operator_name (Optional[str]): what validation operator to run -- defaults to None, which generates an ephemeral validator. If you want to save data docs, use 'action_list_operator'. See https://docs.greatexpectations.io/en/latest/reference/core_concepts/validation_operators_and_actions.html input_dagster_type (DagsterType): the Dagster type used to type check the input to the op. Defaults to `dagster_pandas.DataFrame`. batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where `dataset` is the input to the generated op. Returns: A solid that takes in a set of data and yields both an expectation with relevant metadata and an output with all the metadata (for user processing) """ return core_ge_validation_factory( solid, "solid", name, datasource_name, suite_name, validation_operator_name, input_dagster_type, batch_kwargs, )
def core_ge_validation_factory_v3( dagster_decorator, decorator_name, name, datasource_name, data_connector_name, data_asset_name, suite_name, batch_identifiers: dict, input_dagster_type=DataFrame, runtime_method_type="batch_data", extra_kwargs=None, ): check.str_param(datasource_name, "datasource_name") check.str_param(data_connector_name, "data_connector_name") check.str_param(suite_name, "suite_name") extra_kwargs = check.opt_dict_param(extra_kwargs, "extra_kwargs") @dagster_decorator( name=name, input_defs=[InputDefinition("dataset", input_dagster_type)], output_defs=[ OutputDefinition( dagster_type=dict, description=f""" This {decorator_name} yields an ExpectationResult with a structured dict of metadata from the GE suite, as well as the full result in case a user wants to process it differently. The structured dict contains both summary stats from the suite as well as expectation by expectation results/details. """, ) ], required_resource_keys={"ge_data_context"}, tags={"kind": "ge"}, ) def _ge_validation_fn(context, dataset): data_context = context.resources.ge_data_context validator_kwargs = { "datasource_name": datasource_name, "data_connector_name": data_connector_name, "data_asset_name": datasource_name or data_asset_name, "runtime_parameters": {runtime_method_type: dataset}, "batch_identifiers": batch_identifiers, "expectation_suite_name": suite_name, **extra_kwargs, } validator = data_context.get_validator(**validator_kwargs) run_id = { "run_name": datasource_name + " run", "run_time": datetime.datetime.utcnow(), } results = validator.validate(run_id=run_id) validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True) rendered_document_content_list = validation_results_page_renderer.render( validation_results=results ) md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list)) meta_stats = MetadataEntry.md(md_str=md_str, label="Expectation Results") yield ExpectationResult( success=bool(results["success"]), metadata_entries=[meta_stats], ) yield Output(results.to_json_dict()) return _ge_validation_fn def ge_validation_solid_factory_v3( name, datasource_name, data_connector_name, data_asset_name, suite_name, batch_identifiers: dict, input_dagster_type=DataFrame, runtime_method_type="batch_data", extra_kwargs=None, ): """Generates solids for interacting with GE (v3 API) Args: name (str): the name of the solid datasource_name (str): the name of your DataSource, see your great_expectations.yml data_connector_name (str): the name of the data connector for this datasource. This should point to a RuntimeDataConnector. For information on how to set this up, see: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_create_a_batch_of_data_from_an_in_memory_spark_or_pandas_dataframe data_asset_name (str): the name of the data asset that this solid will be validating. suite_name (str): the name of your expectation suite, see your great_expectations.yml batch_identifier_fn (dict): A dicitonary of batch identifiers to uniquely identify this batch of data. To learn more about batch identifiers, see: https://docs.greatexpectations.io/docs/reference/datasources#batches. input_dagster_type (DagsterType): the Dagster type used to type check the input to the solid. Defaults to `dagster_pandas.DataFrame`. runtime_method_type (str): how GE should interperet the solid input. One of ("batch_data", "path", "query"). Defaults to "batch_data", which will interperet the input as an in-memory object. extra_kwargs (Optional[dict]): adds extra kwargs to the invocation of `ge_data_context`'s `get_validator` method. If not set, input will be: { "datasource_name": datasource_name, "data_connector_name": data_connector_name, "data_asset_name": data_asset_name, "runtime_parameters": { "<runtime_method_type>": <solid input> }, "batch_identifiers": batch_identifiers, "expectation_suite_name": suite_name, } Returns: A solid that takes in a set of data and yields both an expectation with relevant metadata and an output with all the metadata (for user processing) """ return core_ge_validation_factory_v3( solid, "solid", name, datasource_name, data_connector_name, data_asset_name, suite_name, batch_identifiers, input_dagster_type, runtime_method_type, extra_kwargs, ) def ge_validation_op_factory_v3( name, datasource_name, data_connector_name, data_asset_name, suite_name, batch_identifiers: dict, input_dagster_type=DataFrame, runtime_method_type="batch_data", extra_kwargs=None, ): """Generates ops for interacting with GE (v3 API) Args: name (str): the name of the op datasource_name (str): the name of your DataSource, see your great_expectations.yml data_connector_name (str): the name of the data connector for this datasource. This should point to a RuntimeDataConnector. For information on how to set this up, see: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_create_a_batch_of_data_from_an_in_memory_spark_or_pandas_dataframe data_asset_name (str): the name of the data asset that this op will be validating. suite_name (str): the name of your expectation suite, see your great_expectations.yml batch_identifier_fn (dict): A dicitonary of batch identifiers to uniquely identify this batch of data. To learn more about batch identifiers, see: https://docs.greatexpectations.io/docs/reference/datasources#batches. input_dagster_type (DagsterType): the Dagster type used to type check the input to the op. Defaults to `dagster_pandas.DataFrame`. runtime_method_type (str): how GE should interperet the solid input. One of ("batch_data", "path", "query"). Defaults to "batch_data", which will interperet the input as an in-memory object. extra_kwargs (Optional[dict]): adds extra kwargs to the invocation of `ge_data_context`'s `get_validator` method. If not set, input will be: { "datasource_name": datasource_name, "data_connector_name": data_connector_name, "data_asset_name": data_asset_name, "runtime_parameters": { "<runtime_method_type>": <op input> }, "batch_identifiers": batch_identifiers, "expectation_suite_name": suite_name, } Returns: An op that takes in a set of data and yields both an expectation with relevant metadata and an output with all the metadata (for user processing) """ return core_ge_validation_factory_v3( op, "op", name, datasource_name, data_connector_name, data_asset_name, suite_name, batch_identifiers, input_dagster_type, runtime_method_type, extra_kwargs, )