Source code for dagster_gcp.dataproc.ops

from dagster import Bool, Field, op, solid
from dagster.seven import json

from .configs import define_dataproc_submit_job_config

DATAPROC_CONFIG_SCHEMA = {
    "job_config": define_dataproc_submit_job_config(),
    "job_scoped_cluster": Field(
        Bool,
        description="whether to create a cluster or use an existing cluster",
        is_required=False,
        default_value=True,
    ),
}


def _dataproc_compute(context):
    job_config = context.solid_config["job_config"]
    context.log.info("submitting job with config: %s" % str(json.dumps(job_config)))

    if context.solid_config["job_scoped_cluster"]:
        # Cluster context manager, creates and then deletes cluster
        with context.resources.dataproc.cluster_context_manager() as cluster:
            # Submit the job specified by this solid to the cluster defined by the associated resource
            result = cluster.submit_job(job_config)

            job_id = result["reference"]["jobId"]
            context.log.info("Submitted job ID {}".format(job_id))
            cluster.wait_for_job(job_id)
    else:
        # Submit to an existing cluster
        # Submit the job specified by this solid to the cluster defined by the associated resource
        result = context.resources.dataproc.submit_job(job_config)

        job_id = result["reference"]["jobId"]
        context.log.info("Submitted job ID {}".format(job_id))
        context.resources.dataproc.wait_for_job(job_id)


@solid(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
def dataproc_solid(context):
    return _dataproc_compute(context)


@op(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
def dataproc_op(context):
    return _dataproc_compute(context)