dagster.JobDefinition(mode_def, graph_def, name=None, description=None, preset_defs=None, tags=None, hook_defs=None, op_retry_policy=None, version_strategy=None, _op_selection_data=None)
execute_in_process(run_config=None, instance=None, partition_key=None, raise_on_error=True, op_selection=None)
Execute the Job in-process, gathering results in-memory.
The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory.
run_config (Optional[Dict[str, Any]]) – The configuration for the run.
instance (Optional[DagsterInstance]) – The instance to execute against. An ephemeral one will be used if none is provided.
partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for jobs with partitioned config.
raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.
op_selection (Optional[List[str]]) – A list of op selection queries (including single op names) to execute. For example:
* ['some_op']: selects some_op itself.
* ['*some_op']: selects some_op and all its ancestors (upstream dependencies).
* ['*some_op+++']: selects some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.
* ['*some_op', 'other_op_a', 'other_op_b+']: selects some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.
dagster.GraphDefinition(name, description=None, node_defs=None, dependencies=None, input_mappings=None, output_mappings=None, config=None, tags=None, **kwargs)
Defines a Dagster graph.
A graph is made up of:
* Nodes, which can either be an op (the functional unit of computation), or another graph.
* Dependencies, which determine how the values produced by nodes as outputs flow from one node to another. This tells Dagster how to arrange nodes into a directed, acyclic graph (DAG) of compute.
End users should prefer the @graph decorator. GraphDefinition is generally intended to be used by framework authors or for programmatically generated graphs.
name (str) – The name of the graph. Must be unique within any GraphDefinition or JobDefinition containing the graph.
description (Optional[str]) – A human-readable description of the pipeline.
node_defs (Optional[List[NodeDefinition]]) – The set of ops / graphs used in this graph.
dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each op’s inputs on the outputs of other ops in the graph. Keys of the top level dict are either the string names of ops in the graph or, in the case of aliased ops, NodeInvocations. Values of the top level dict are themselves dicts, which map input names belonging to the op or aliased op to DependencyDefinitions.
input_mappings (Optional[List[InputMapping]]) – Defines the inputs to the nested graph, and how they map to the inputs of its constituent ops.
output_mappings (Optional[List[OutputMapping]]) – Defines the outputs of the nested graph, and how they map from the outputs of its constituent ops.
config (Optional[ConfigMapping]) – Defines the config of the graph, and how its schema maps to the config of its constituent ops.
tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the graph. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.
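The JSON round-trip criterion for non-string tag values can be checked with the standard library alone. The helper below, is_valid_tag_value, is a hypothetical name used for illustration and is not part of Dagster; it is a sketch of the stated rule, not of how Dagster enforces it.

```python
import json


def is_valid_tag_value(value):
    """Hypothetical helper: apply the documented round-trip rule to a tag value."""
    if isinstance(value, str):
        # Strings are used as-is and need no encoding check.
        return True
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # The value cannot be JSON encoded at all (for example, a set).
        return False


print(is_valid_tag_value({"team": "data"}))  # dicts with string keys survive the round trip
print(is_valid_tag_value((1, 2)))            # tuples come back as lists, so they do not
```

Values such as tuples or dicts with non-string keys encode without error but decode to a different object, which is why the rule compares the decoded value to the original rather than only checking that encoding succeeds.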
Examples
from dagster import DependencyDefinition, GraphDefinition, op

@op
def return_one():
    return 1

@op
def add_one(num):
    return num + 1

graph_def = GraphDefinition(
    name='basic',
    node_defs=[return_one, add_one],
    dependencies={'add_one': {'num': DependencyDefinition('return_one')}},
)
execute_in_process(run_config=None, instance=None, resources=None, raise_on_error=True, op_selection=None)
Execute this graph in-process, collecting results in-memory.
run_config (Optional[Dict[str, Any]]) – Run config to provide to execution. The configuration for the underlying graph should exist under the “ops” key.
instance (Optional[DagsterInstance]) – The instance to execute against. An ephemeral one will be used if none is provided.
resources (Optional[Dict[str, Any]]) – The resources needed if any are required. Can provide resource instances directly, or resource definitions.
raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.
op_selection (Optional[List[str]]) – A list of op selection queries (including single op names) to execute. For example:
* ['some_op']: selects some_op itself.
* ['*some_op']: selects some_op and all its ancestors (upstream dependencies).
* ['*some_op+++']: selects some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.
* ['*some_op', 'other_op_a', 'other_op_b+']: selects some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.
dagster.ExecuteInProcessResult(node_def, all_events, dagster_run, output_capture)
all_events
All dagster events emitted during in-process execution.
List[DagsterEvent]
all_node_events
All dagster events from the in-process execution.
List[DagsterEvent]
dagster_run
The DagsterRun object for the completed execution.
DagsterRun
events_for_node(node_name)
Retrieves all dagster events for a specific node.
node_name (str) – The name of the node for which events should be retrieved.
A list of all dagster events associated with the provided node name.
List[DagsterEvent]
get_job_failure_event()
Returns a DagsterEvent with type DagsterEventType.PIPELINE_FAILURE if it occurred during execution.
get_job_success_event()
Returns a DagsterEvent with type DagsterEventType.PIPELINE_SUCCESS if it occurred during execution.
output_for_node(node_str, output_name='result')
Retrieves output value with a particular name from the in-process run of the job.
node_str (str) – Name of the op/graph whose output should be retrieved. If the intended graph/op is nested within another graph, the syntax is outer_graph.inner_node.
output_name (Optional[str]) – Name of the output on the op/graph to retrieve. Defaults to result, the default output name in dagster.
The value of the retrieved output.
Any
output_value(output_name='result')
Retrieves output of top-level job, if an output is returned.
If the top-level job has no output, calling this method will result in a DagsterInvariantViolationError.
output_name (Optional[str]) – The name of the output to retrieve. Defaults to result, the default output name in dagster.
The value of the retrieved output.
Any
dagster.DagsterEvent(event_type_value, pipeline_name, step_handle=None, solid_handle=None, step_kind_value=None, logging_tags=None, event_specific_data=None, message=None, pid=None, step_key=None)
Events yielded by solid and pipeline execution.
Users should not instantiate this class.
solid_handle
NodeHandle
event_specific_data
Type must correspond to event_type_value.
Any
event_type
The type of this event.
dagster.DagsterEventType(value)
The types of events that may be yielded by solid and pipeline execution.
ALERT_START = 'ALERT_START'
ALERT_SUCCESS = 'ALERT_SUCCESS'
ASSET_MATERIALIZATION = 'ASSET_MATERIALIZATION'
ASSET_OBSERVATION = 'ASSET_OBSERVATION'
ASSET_STORE_OPERATION = 'ASSET_STORE_OPERATION'
ENGINE_EVENT = 'ENGINE_EVENT'
HANDLED_OUTPUT = 'HANDLED_OUTPUT'
HOOK_COMPLETED = 'HOOK_COMPLETED'
HOOK_ERRORED = 'HOOK_ERRORED'
HOOK_SKIPPED = 'HOOK_SKIPPED'
LOADED_INPUT = 'LOADED_INPUT'
LOGS_CAPTURED = 'LOGS_CAPTURED'
OBJECT_STORE_OPERATION = 'OBJECT_STORE_OPERATION'
PIPELINE_CANCELED = 'PIPELINE_CANCELED'
PIPELINE_CANCELING = 'PIPELINE_CANCELING'
PIPELINE_DEQUEUED = 'PIPELINE_DEQUEUED'
PIPELINE_ENQUEUED = 'PIPELINE_ENQUEUED'
PIPELINE_FAILURE = 'PIPELINE_FAILURE'
PIPELINE_START = 'PIPELINE_START'
PIPELINE_STARTING = 'PIPELINE_STARTING'
PIPELINE_SUCCESS = 'PIPELINE_SUCCESS'
RUN_CANCELED = 'PIPELINE_CANCELED'
RUN_CANCELING = 'PIPELINE_CANCELING'
RUN_DEQUEUED = 'PIPELINE_DEQUEUED'
RUN_ENQUEUED = 'PIPELINE_ENQUEUED'
RUN_FAILURE = 'PIPELINE_FAILURE'
RUN_START = 'PIPELINE_START'
RUN_STARTING = 'PIPELINE_STARTING'
RUN_SUCCESS = 'PIPELINE_SUCCESS'
STEP_EXPECTATION_RESULT = 'STEP_EXPECTATION_RESULT'
STEP_FAILURE = 'STEP_FAILURE'
STEP_INPUT = 'STEP_INPUT'
STEP_OUTPUT = 'STEP_OUTPUT'
STEP_RESTARTED = 'STEP_RESTARTED'
STEP_SKIPPED = 'STEP_SKIPPED'
STEP_START = 'STEP_START'
STEP_SUCCESS = 'STEP_SUCCESS'
STEP_UP_FOR_RETRY = 'STEP_UP_FOR_RETRY'
dagster.reconstructable(target)
Create a ReconstructablePipeline from a function that returns a PipelineDefinition/JobDefinition, or a function decorated with @pipeline/@job.
When your pipeline/job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline/job on the other side of the process boundary.
Passing a job created with GraphDefinition.to_job() to reconstructable() requires you to wrap that job’s definition in a module-scoped function, and pass that function instead:
from dagster import graph, reconstructable

@graph
def my_graph():
    ...

def define_my_job():
    return my_graph.to_job()

reconstructable(define_my_job)
This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of pipelines or jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.
If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_job() instead, which allows you to specify your own reconstruction strategy.
Examples:
from dagster import graph, job, reconstructable

@job
def foo_job():
    ...

reconstructable_foo_job = reconstructable(foo_job)

@graph
def foo():
    ...

def make_bar_job():
    return foo.to_job()

reconstructable_bar_job = reconstructable(make_bar_job)
dagster.in_process_executor
ExecutorDefinition
The in-process executor executes all steps in a single process.
For legacy pipelines, this will be the default executor. To select it explicitly, include the following top-level fragment in config:
execution:
  in_process:
Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
dagster.multiprocess_executor
ExecutorDefinition
The multiprocess executor executes each step in an individual process.
Any job that does not specify custom executors will use the multiprocess_executor by default. For jobs or legacy pipelines, to configure the multiprocess executor, include a fragment such as the following in your config:
execution:
  multiprocess:
    config:
      max_concurrent: 4
The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to be 0, this is the return value of multiprocessing.cpu_count().
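The default described above can be sketched with the standard library. The helper effective_max_concurrent is hypothetical and only mirrors the documented rule; it is not Dagster's implementation. The run_config dict is a Python rendering of the config fragment shown earlier in this entry.

```python
import multiprocessing


def effective_max_concurrent(max_concurrent=None):
    """Hypothetical helper: resolve max_concurrent as the text describes."""
    # Unset (None) and 0 both fall back to the machine's CPU count.
    if not max_concurrent:
        return multiprocessing.cpu_count()
    return max_concurrent


# Python equivalent of the config fragment for the multiprocess executor.
run_config = {
    "execution": {
        "multiprocess": {
            "config": {"max_concurrent": 4},
        }
    }
}

configured = run_config["execution"]["multiprocess"]["config"]["max_concurrent"]
print(effective_max_concurrent(configured))
print(effective_max_concurrent(0) == multiprocessing.cpu_count())
```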
Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
dagster.OpExecutionContext(step_execution_context)
add_output_metadata(metadata, output_name=None, mapping_key=None)
Add metadata to one of the outputs of an op.
This can only be used once per output in the body of an op. Using this method with the same output_name more than once within an op will result in an error.
Examples:
from dagster import Out, op
from typing import Tuple

@op
def add_metadata(context):
    # Metadata must be a dict mapping keys to values.
    context.add_output_metadata({"foo": "bar"})
    return 5  # Since the default output is called "result", metadata will be attached to the output "result".

@op(out={"a": Out(), "b": Out()})
def add_metadata_two_outputs(context) -> Tuple[str, int]:
    context.add_output_metadata({"foo": "bar"}, output_name="b")
    context.add_output_metadata({"baz": "bat"}, output_name="a")
    return ("dog", 5)
consume_events()
Pops and yields all user-generated events that have been recorded from this context.
If consume_events has not yet been called, this will yield all logged events since the beginning of the op’s computation. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method.
get_mapping_key()
Which mapping_key this execution is for if downstream of a DynamicOutput, otherwise None.
get_tag(key)
Get a logging tag.
key (str) – The tag to get.
The value of the tag, if present.
Optional[str]
has_partition_key
Whether the current run is a partitioned run.
has_tag(key)
Check if a logging tag is set.
instance
The current Dagster instance.
log
The log manager available in the execution context.
log_event(event)
Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op.
Events logged with this method will appear in the list of DagsterEvents, as well as the event log.
event (Union[AssetMaterialization, Materialization, AssetObservation, ExpectationResult]) – The event to log.
Examples:
from dagster import op, AssetMaterialization
@op
def log_materialization(context):
    context.log_event(AssetMaterialization("foo"))
mode_def
The mode of the current execution.
output_asset_partition_key(output_name='result')
Returns the asset partition key for the given output. Defaults to “result”, which is the name of the default output.
output_asset_partitions_time_window(output_name='result')
The time window for the partitions of the output asset.
Raises an error if either of the following is true:
* The output asset has no partitioning.
* The output asset is not partitioned with a TimeWindowPartitionsDefinition.
partition_key
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
pdb
Gives access to pdb debugging from within the op.
Example:
from dagster import op

@op
def debug(context):
    context.pdb.set_trace()
pipeline_def
The currently executing pipeline.
pipeline_run
The current pipeline run.
resources
The currently available resources.
Resources
retry_number
Which retry attempt is currently executing, i.e. 0 for initial attempt, 1 for first retry, etc.
solid_config
The parsed config specific to this solid.
solid_def
The current solid definition.
step_launcher
The current step launcher, if any.
Optional[StepLauncher]
dagster.build_op_context(resources=None, op_config=None, resources_config=None, instance=None, config=None, partition_key=None)
Builds op execution context from provided parameters.
op is currently built on top of solid, and thus this function creates a SolidExecutionContext.
build_op_context can be used as either a function or context manager. If there is a provided resource that is a context manager, then build_op_context must be used as a context manager. This function can be used to provide the context argument when directly invoking an op.
resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be either values or resource definitions.
config (Optional[Any]) – The op config to provide to the context.
instance (Optional[DagsterInstance]) – The dagster instance configured for the context. Defaults to DagsterInstance.ephemeral().
Examples
context = build_op_context()
op_to_invoke(context)

with build_op_context(resources={"foo": context_manager_resource}) as context:
    op_to_invoke(context)
dagster.TypeCheckContext(run_id, log_manager, scoped_resources_builder, dagster_type)
The context object available to a type check function on a DagsterType.
log
Centralized log dispatch from user code.
resources
An object whose attributes contain the resources available to this op.
Any
dagster.validate_run_config(job_def=None, run_config=None, mode=None, pipeline_def=None)
Function to validate a provided run config blob against a given job. For legacy APIs, a pipeline/mode can also be passed in.
If validation is successful, this function will return a dictionary representation of the validated config actually used during execution.
job_def (Union[PipelineDefinition, JobDefinition]) – The job definition to validate run config against
run_config (Optional[Dict[str, Any]]) – The run config to validate
mode (str) – The mode of the pipeline to validate against (different modes may require different config)
pipeline_def (PipelineDefinition) – The pipeline definition to validate run config against.
A dictionary representation of the validated config.
Dict[str, Any]
The run_config used for jobs has the following schema:
{
  # configuration for execution, required if executors require config
  execution: {
    # the name of one, and only one available executor, typically 'in_process' or 'multiprocess'
    __executor_name__: {
      # executor-specific config, if required or permitted
      config: {
        ...
      }
    }
  },
  # configuration for loggers, required if loggers require config
  loggers: {
    # the name of an available logger
    __logger_name__: {
      # logger-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },
  # configuration for resources, required if resources require config
  resources: {
    # the name of a resource
    __resource_name__: {
      # resource-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },
  # configuration for underlying ops, required if ops require config
  ops: {
    # these keys align with the names of the ops, or their alias in this job
    __op_name__: {
      # pass any data that was defined via config_field
      config: ...,
      # configurably specify input values, keyed by input name
      inputs: {
        __input_name__: {
          # if a dagster_type_loader is specified, that schema must be satisfied here;
          # scalar, built-in types will generally allow their values to be specified directly:
          value: ...
        }
      },
    }
  },
}