Source code for dagster_gcp.bigquery.ops

import hashlib

from dagster_pandas import DataFrame
from google.cloud.bigquery.job import LoadJobConfig, QueryJobConfig
from google.cloud.bigquery.table import EncryptionConfiguration, TimePartitioning

from dagster import InputDefinition, List, Nothing, OutputDefinition, check, op, solid

from .configs import (
    define_bigquery_create_dataset_config,
    define_bigquery_delete_dataset_config,
    define_bigquery_load_config,
    define_bigquery_query_config,
)
from .types import BigQueryLoadSource

_START = "start"


def _preprocess_config(cfg):
    destination_encryption_configuration = cfg.get("destination_encryption_configuration")
    time_partitioning = cfg.get("time_partitioning")

    if destination_encryption_configuration is not None:
        cfg["destination_encryption_configuration"] = EncryptionConfiguration(
            kms_key_name=destination_encryption_configuration
        )

    if time_partitioning is not None:
        cfg["time_partitioning"] = TimePartitioning(**time_partitioning)

    return cfg


def _bq_core_command(dagster_decorator, decorator_name, sql_queries):
    sql_queries = check.list_param(sql_queries, "sql queries", of_type=str)
    m = hashlib.sha1()
    for query in sql_queries:
        m.update(query.encode("utf-8"))
    hash_str = m.hexdigest()[:10]
    name = f"bq_{decorator_name}_{hash_str}"

    @dagster_decorator(
        name=name,
        input_defs=[InputDefinition(_START, Nothing)],
        output_defs=[OutputDefinition(List[DataFrame])],
        config_schema=define_bigquery_query_config(),
        required_resource_keys={"bigquery"},
        tags={"kind": "sql", "sql": "\n".join(sql_queries)},
    )
    def _bq_fn(context):  # pylint: disable=unused-argument
        query_job_config = _preprocess_config(context.op_config.get("query_job_config", {}))

        # Retrieve results as pandas DataFrames
        results = []
        for sql_query in sql_queries:
            # We need to construct a new QueryJobConfig for each query.
            # See: https://bit.ly/2VjD6sl
            cfg = QueryJobConfig(**query_job_config) if query_job_config else None
            context.log.info(
                "executing query %s with config: %s"
                % (sql_query, cfg.to_api_repr() if cfg else "(no config provided)")
            )
            results.append(
                context.resources.bigquery.query(sql_query, job_config=cfg).to_dataframe()
            )

        return results

    return _bq_fn


[docs]def bq_solid_for_queries(sql_queries): """ Executes BigQuery SQL queries. Expects a BQ client to be provisioned in resources as context.resources.bigquery. """ return _bq_core_command(solid, "solid", sql_queries)
[docs]def bq_op_for_queries(sql_queries): """ Executes BigQuery SQL queries. Expects a BQ client to be provisioned in resources as context.resources.bigquery. """ return _bq_core_command(op, "op", sql_queries)
BIGQUERY_LOAD_CONFIG = define_bigquery_load_config()
[docs]@op( input_defs=[InputDefinition("paths", List[str])], output_defs=[OutputDefinition(Nothing)], config_schema=BIGQUERY_LOAD_CONFIG, required_resource_keys={"bigquery"}, ) def import_gcs_paths_to_bq(context, paths): return _execute_load_in_source(context, paths, BigQueryLoadSource.GCS)
[docs]@op( input_defs=[InputDefinition("df", DataFrame)], output_defs=[OutputDefinition(Nothing)], config_schema=BIGQUERY_LOAD_CONFIG, required_resource_keys={"bigquery"}, ) def import_df_to_bq(context, df): return _execute_load_in_source(context, df, BigQueryLoadSource.DataFrame)
[docs]@op( input_defs=[InputDefinition("path", str)], output_defs=[OutputDefinition(Nothing)], config_schema=BIGQUERY_LOAD_CONFIG, required_resource_keys={"bigquery"}, ) def import_file_to_bq(context, path): return _execute_load_in_source(context, path, BigQueryLoadSource.File)
def _execute_load_in_source(context, source, source_name): destination = context.op_config.get("destination") load_job_config = _preprocess_config(context.op_config.get("load_job_config", {})) cfg = LoadJobConfig(**load_job_config) if load_job_config else None context.log.info( "executing BQ load with config: %s for source %s" % (cfg.to_api_repr() if cfg else "(no config provided)", source) ) if source_name == BigQueryLoadSource.DataFrame: context.resources.bigquery.load_table_from_dataframe( source, destination, job_config=cfg ).result() # Load from file. See: https://cloud.google.com/bigquery/docs/loading-data-local elif source_name == BigQueryLoadSource.File: with open(source, "rb") as file_obj: context.resources.bigquery.load_table_from_file( file_obj, destination, job_config=cfg ).result() # Load from GCS. See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage elif source_name == BigQueryLoadSource.GCS: context.resources.bigquery.load_table_from_uri(source, destination, job_config=cfg).result()
[docs]@op( input_defs=[InputDefinition(_START, Nothing)], config_schema=define_bigquery_create_dataset_config(), required_resource_keys={"bigquery"}, ) def bq_create_dataset(context): """BigQuery Create Dataset. This op encapsulates creating a BigQuery dataset. Expects a BQ client to be provisioned in resources as context.resources.bigquery. """ (dataset, exists_ok) = [context.op_config.get(k) for k in ("dataset", "exists_ok")] context.log.info("executing BQ create_dataset for dataset %s" % (dataset)) context.resources.bigquery.create_dataset(dataset, exists_ok)
[docs]@op( input_defs=[InputDefinition(_START, Nothing)], config_schema=define_bigquery_delete_dataset_config(), required_resource_keys={"bigquery"}, ) def bq_delete_dataset(context): """BigQuery Delete Dataset. This op encapsulates deleting a BigQuery dataset. Expects a BQ client to be provisioned in resources as context.resources.bigquery. """ (dataset, delete_contents, not_found_ok) = [ context.op_config.get(k) for k in ("dataset", "delete_contents", "not_found_ok") ] context.log.info("executing BQ delete_dataset for dataset %s" % dataset) context.resources.bigquery.delete_dataset( dataset, delete_contents=delete_contents, not_found_ok=not_found_ok )