# Source code for dagster_spark.ops
from dagster import InputDefinition, Nothing, OutputDefinition, check, op, solid

from .configs import define_spark_config


# Thin wrapper that builds a Spark solid using the legacy @solid decorator.
def create_spark_solid(
    name, main_class, description=None, required_resource_keys=frozenset(["spark"])
):
    return core_create_spark(
        dagster_decorator=solid,
        name=name,
        main_class=main_class,
        description=description,
        required_resource_keys=required_resource_keys,
    )


# Thin wrapper that builds a Spark op using the @op decorator.
def create_spark_op(
    name, main_class, description=None, required_resource_keys=frozenset(["spark"])
):
    return core_create_spark(
        dagster_decorator=op,
        name=name,
        main_class=main_class,
        description=description,
        required_resource_keys=required_resource_keys,
    )
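
# A minimal usage sketch (hypothetical op/job/class names; assumes the
# spark_resource shipped with dagster_spark):
#
#     from dagster import job
#     from dagster_spark import create_spark_op, spark_resource
#
#     run_etl = create_spark_op(name="run_etl", main_class="com.example.Etl")
#
#     @job(resource_defs={"spark": spark_resource})
#     def spark_etl_job():
#         run_etl()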


def core_create_spark(
    dagster_decorator,
    name,
    main_class,
    description=None,
    required_resource_keys=frozenset(["spark"]),
):
    # Validate arguments eagerly so a bad definition fails at load time.
    check.str_param(name, "name")
    check.str_param(main_class, "main_class")
    # Assign the checked value so the default description actually applies.
    description = check.opt_str_param(description, "description", "A parameterized Spark job.")
    check.set_param(required_resource_keys, "required_resource_keys")

    @dagster_decorator(
        name=name,
        description=description,
        config_schema=define_spark_config(),
        # The Nothing-typed "start" input lets upstream ops gate this job
        # without passing any data.
        input_defs=[InputDefinition("start", Nothing)],
        output_defs=[OutputDefinition(Nothing)],
        tags={"kind": "spark", "main_class": main_class},
        required_resource_keys=required_resource_keys,
    )
    def spark_solid(context):
        # Hand off to the bound spark resource, which launches the Spark job
        # with the op's config and the given main class.
        context.resources.spark.run_spark_job(context.solid_config, main_class)

    return spark_solid
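
# Because the generated op exposes a Nothing-typed "start" input, it can be
# sequenced after other ops without exchanging data. A sketch (hypothetical
# names, same assumptions as above):
#
#     from dagster import job, op
#     from dagster_spark import create_spark_op, spark_resource
#
#     @op
#     def prepare_warehouse():
#         ...  # e.g., stage input files
#
#     ingest = create_spark_op(name="ingest", main_class="com.example.Ingest")
#
#     @job(resource_defs={"spark": spark_resource})
#     def ingest_job():
#         ingest(start=prepare_warehouse())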