Airflow (dagster-airflow)

dagster_airflow.make_airflow_dag(module_name, job_name, run_config=None, mode=None, instance=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct an Airflow DAG corresponding to a given Dagster job/pipeline.

Tasks in the resulting DAG will execute the Dagster logic they encapsulate as a Python callable, run by an underlying PythonOperator. As a consequence, dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must all be available in the Python environment in which your Airflow tasks execute. If you cannot install requirements into this environment, or you want a containerized solution that provides better isolation, see make_airflow_dag_containerized() instead.

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters
  • module_name (str) – The name of the importable module in which the pipeline/job definition can be found.

  • job_name (str) – The name of the job definition.

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline/job.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of PythonOperator).

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns

The generated Airflow DAG, and a list of its constituent tasks.

Return type

(airflow.models.DAG, List[airflow.models.BaseOperator])
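
As a rough illustration of the call described above, the following sketch shows how the arguments for a DAG definition file might be assembled. The module name my_package.jobs, the job name my_job, the DAG id, and the scheduling values are hypothetical placeholders, not values from this documentation. The dagster_airflow import is deferred into build_dag() so that the argument-building part can be read and run without Airflow installed.

```python
from datetime import datetime, timedelta


def airflow_dag_arguments():
    """Build keyword arguments for make_airflow_dag (all values are placeholders)."""
    return {
        "module_name": "my_package.jobs",  # hypothetical importable module
        "job_name": "my_job",  # hypothetical job defined in that module
        "dag_id": "my_job_dag",
        "dag_description": "Dagster job my_job compiled to an Airflow DAG",
        # Passed through to the Airflow DAG constructor, including default_args.
        "dag_kwargs": {
            "default_args": {
                "owner": "airflow",
                "start_date": datetime(2023, 1, 1),
                "retries": 1,
                "retry_delay": timedelta(minutes=5),
            },
            "schedule_interval": "@daily",
        },
    }


def build_dag():
    """Compile the job to an Airflow DAG; requires dagster-airflow to be installed."""
    from dagster_airflow import make_airflow_dag

    dag, tasks = make_airflow_dag(**airflow_dag_arguments())
    return dag, tasks
```

In an actual DAG definition file you would call build_dag() at module level and bind the result to module-level names (for example, dag, tasks = build_dag()), since Airflow discovers DAG objects in the module's global namespace.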

dagster_airflow.make_airflow_dag_for_operator(recon_repo, job_name, operator, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct an Airflow DAG corresponding to a given Dagster job/pipeline and custom operator.

Custom operator template

Tasks in the resulting DAG will execute the Dagster logic they encapsulate using the given operator, which must be a subclass of BaseOperator. If you are looking for a containerized solution that provides better isolation, see make_airflow_dag_containerized() instead.

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters
  • recon_repo (dagster.ReconstructableRepository) – reference to a Dagster RepositoryDefinition that can be reconstructed in another process

  • job_name (str) – The name of the job definition.

  • operator (type) – The operator to use. Must be a class that inherits from BaseOperator

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator.

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns

The generated Airflow DAG, and a list of its constituent tasks.

Return type

(airflow.models.DAG, List[airflow.models.BaseOperator])

dagster_airflow.make_airflow_dag_containerized(module_name, job_name, image, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct a containerized Airflow DAG corresponding to a given Dagster job/pipeline.

Tasks in the resulting DAG will execute the Dagster logic they encapsulate using a subclass of DockerOperator. As a consequence, dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must all be available in the container spun up by this operator. Typically you’ll want to install these requirements onto the image you’re using.

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters
  • module_name (str) – The name of the importable module in which the pipeline/job definition can be found.

  • job_name (str) – The name of the job definition.

  • image (str) – The name of the Docker image to use for execution (passed through to DockerOperator).

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of DockerOperator).

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns

The generated Airflow DAG, and a list of its constituent tasks.

Return type

(airflow.models.DAG, List[airflow.models.BaseOperator])
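
The containerized variant takes the same kind of arguments plus an image. The sketch below is only an illustration under stated assumptions: the module name, job name, and image name are hypothetical, and network_mode is used as an example of a DockerOperator argument that could be forwarded through op_kwargs. The dagster_airflow import is deferred so the argument builder can be run on its own.

```python
def containerized_dag_arguments(image):
    """Build keyword arguments for make_airflow_dag_containerized (placeholder values)."""
    if not image:
        raise ValueError("an image name is required")
    return {
        "module_name": "my_package.jobs",  # hypothetical importable module
        "job_name": "my_job",  # hypothetical job defined in that module
        "image": image,  # passed through to DockerOperator
        # Assumption: extra DockerOperator arguments are forwarded via op_kwargs.
        "op_kwargs": {"network_mode": "bridge"},
    }


def build_containerized_dag(image):
    """Compile the job to a containerized DAG; requires dagster-airflow and Docker support."""
    from dagster_airflow import make_airflow_dag_containerized

    dag, tasks = make_airflow_dag_containerized(**containerized_dag_arguments(image))
    return dag, tasks
```

The image passed in must already contain dagster, your Python dependencies, and the module named in module_name, as noted in the description above.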

dagster_airflow.make_dagster_job_from_airflow_dag(dag, tags=None, use_airflow_template_context=False, unique_id=None)[source]

Construct a Dagster job corresponding to a given Airflow DAG.

Tasks in the resulting job will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment in which your Dagster ops execute.

To set Airflow’s execution_date for use with Airflow Operator’s execute() methods, either:

  1. (Best for ad hoc runs) Execute the job directly. This will set execution_date to the time (in UTC) of the run.

  2. Add {'airflow_execution_date': utc_date_string} to the job tags. This will override behavior from (1).

    my_dagster_job = make_dagster_job_from_airflow_dag(
        dag=dag,
        tags={'airflow_execution_date': utc_execution_date_str}
    )
    my_dagster_job.execute_in_process()

  3. (Recommended) Add {'airflow_execution_date': utc_date_string} to the run tags, such as in the Dagit UI. This will override behavior from (1) and (2).
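
The precedence among the three options above can be sketched in plain Python. This is not dagster-airflow's internal code: the helper names are hypothetical, and the use of an ISO 8601 string for utc_date_string is an assumption about an acceptable tag format.

```python
from datetime import datetime, timezone

TAG = "airflow_execution_date"


def utc_date_string(moment):
    """Format a timezone-aware datetime as an ISO 8601 string in UTC."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).isoformat()


def resolve_execution_date(run_time, job_tags=None, run_tags=None):
    """Mirror the documented precedence: run tags, then job tags, then run time."""
    for tags in (run_tags or {}, job_tags or {}):
        if TAG in tags:
            return tags[TAG]
    # Neither tag is set, so fall back to the time of the run (option 1).
    return utc_date_string(run_time)
```

For example, a value set on the run tags wins over one set on the job tags, and the run time is used only when neither tag is present.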

We apply normalized_name() to the dag id and task ids when generating job name and op names to ensure that names conform to Dagster’s naming conventions.
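
To give a feel for what this normalization involves, here is an illustrative stand-in. It is not the library's normalized_name() and the real function may differ (for example, by adding a prefix). It assumes only that Dagster names are restricted to letters, digits, and underscores, and that unique_id, when given, is appended as a suffix.

```python
import re


def sketch_normalized_name(name, unique_id=None):
    """Hypothetical stand-in: replace characters outside [A-Za-z0-9_] with '_'."""
    base = re.sub(r"[^A-Za-z0-9_]", "_", name)
    # Append the unique id, if any, to keep generated names distinct.
    return base if unique_id is None else f"{base}_{unique_id}"
```

Under these assumptions, an Airflow id such as my-dag.v2 would map to my_dag_v2.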

Parameters
  • dag (DAG) – The Airflow DAG to compile into a Dagster job

  • tags (Dict[str, Field]) – Job tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify execution_date used within execution of Airflow Operators.

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)

  • unique_id (int) – If not None, this id will be appended to generated op names. Used by framework authors to enforce unique op names within a repo.

Returns

The generated Dagster job

Return type

JobDefinition

dagster_airflow.make_dagster_repo_from_airflow_dags_path(dag_path, repo_name, safe_mode=True, store_serialized_dags=False, use_airflow_template_context=False)[source]

Construct a Dagster repository corresponding to Airflow DAGs in dag_path.

This function depends on DagBag.get_dag(), which requires the Airflow DB to be initialized.

Usage:

Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dags_path

def make_repo_from_dir():
    return make_dagster_repo_from_airflow_dags_path(
        '/path/to/dags/', 'my_repo_name'
    )

Use RepositoryDefinition as usual, for example: dagit -f path/to/make_dagster_repo.py -n make_repo_from_dir

Parameters
  • dag_path (str) – Path to directory or file that contains Airflow DAGs

  • repo_name (str) – Name for generated RepositoryDefinition

  • include_examples (bool) – True to include Airflow’s example DAGs. (default: False)

  • safe_mode (bool) – True to use Airflow’s default heuristic to find files that contain DAGs (i.e., find files that contain both b'DAG' and b'airflow') (default: True)

  • store_serialized_dags (bool) – True to read Airflow DAGs from the Airflow DB. False to read DAGs from Python files. (default: False)

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)

Returns

RepositoryDefinition
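
The safe_mode heuristic described in the parameter list can be pictured with a small sketch. The function names below are hypothetical and this is not Airflow's own code; it only illustrates the "contains both b'DAG' and b'airflow'" check as stated above.

```python
from pathlib import Path


def looks_like_dag_file(content: bytes) -> bool:
    """Return True if the raw file content mentions both b'DAG' and b'airflow'."""
    return b"DAG" in content and b"airflow" in content


def candidate_dag_files(dag_path):
    """Yield .py files under dag_path that pass the heuristic above."""
    for path in sorted(Path(dag_path).rglob("*.py")):
        if looks_like_dag_file(path.read_bytes()):
            yield path
```

A file that imports airflow but never mentions DAG, or the other way around, would be skipped under this check, which is why safe_mode=False may be needed for DAG files that build their DAGs indirectly.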

dagster_airflow.make_dagster_repo_from_airflow_dag_bag(dag_bag, repo_name, refresh_from_airflow_db=False, use_airflow_template_context=False)[source]

Construct a Dagster repository corresponding to Airflow DAGs in DagBag.

Usage:

Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dag_bag
from airflow_home import my_dag_bag

def make_repo_from_dag_bag():
    return make_dagster_repo_from_airflow_dag_bag(my_dag_bag, 'my_repo_name')

Use RepositoryDefinition as usual, for example:

dagit -f path/to/make_dagster_repo.py -n make_repo_from_dag_bag

Parameters
  • dag_bag (DagBag) – The Airflow DagBag that contains the Airflow DAGs

  • repo_name (str) – Name for generated RepositoryDefinition

  • refresh_from_airflow_db (bool) – If True, will refresh DAG if expired via DagBag.get_dag(), which requires access to initialized Airflow DB. If False (recommended), gets dag from DagBag’s dags dict without depending on Airflow DB. (default: False)

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)

Returns

RepositoryDefinition

dagster_airflow.make_dagster_repo_from_airflow_example_dags(repo_name='airflow_example_dags_repo')[source]

Construct a Dagster repository for Airflow’s example DAGs.

Execution of the following Airflow example DAGs is not currently supported:

'example_external_task_marker_child', 'example_pig_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_xcom', 'test_utils'

Usage:

Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_example_dags

def make_airflow_example_dags():
    return make_dagster_repo_from_airflow_example_dags()

Use RepositoryDefinition as usual, for example:

dagit -f path/to/make_dagster_repo.py -n make_airflow_example_dags

Parameters
  • repo_name (str) – Name for generated RepositoryDefinition

Returns

RepositoryDefinition

dagster_airflow.make_dagster_pipeline_from_airflow_dag(dag, tags=None, use_airflow_template_context=False, unique_id=None)[source]

Construct a Dagster pipeline corresponding to a given Airflow DAG.

Tasks in the resulting pipeline will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment within which your Dagster solids execute.

To set Airflow’s execution_date for use with Airflow Operator’s execute() methods, either:

  1. (Best for ad hoc runs) Run the pipeline with the 'default' preset, which sets execution_date to the time (in UTC) of pipeline invocation:

    execute_pipeline(
        pipeline=make_dagster_pipeline_from_airflow_dag(dag=dag),
        preset='default')

  2. Add {'airflow_execution_date': utc_date_string} to the PipelineDefinition tags. This will override behavior from (1).

    execute_pipeline(
        make_dagster_pipeline_from_airflow_dag(
            dag=dag,
            tags={'airflow_execution_date': utc_execution_date_str}
        )
    )

  3. (Recommended) Add {'airflow_execution_date': utc_date_string} to the PipelineRun tags, such as in the Dagit UI. This will override behavior from (1) and (2).

We apply normalized_name() to the dag id and task ids when generating pipeline name and solid names to ensure that names conform to Dagster’s naming conventions.

Parameters
  • dag (DAG) – The Airflow DAG to compile into a Dagster pipeline

  • tags (Dict[str, Field]) – Pipeline tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify execution_date used within execution of Airflow Operators.

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)

  • unique_id (int) – If not None, this id will be appended to generated solid names. Used by framework authors to enforce unique solid names within a repo.

Returns

The generated Dagster pipeline

Return type

PipelineDefinition