# Source code for dagster_docker.docker_executor

import os
from contextlib import closing
from typing import List

import docker
from dagster_docker.utils import DOCKER_CONFIG_SCHEMA, validate_docker_config, validate_docker_image

from dagster import check, executor
from dagster.core.definitions.executor_definition import multiple_process_executor_requirements
from dagster.core.events import DagsterEvent, DagsterEventType, EngineEventData, MetadataEntry
from dagster.core.execution.plan.objects import StepFailureData
from dagster.core.execution.retries import RetryMode, get_retries_config
from dagster.core.executor.base import Executor
from dagster.core.executor.init import InitExecutorContext
from dagster.core.executor.step_delegating import StepDelegatingExecutor
from dagster.core.executor.step_delegating.step_handler.base import StepHandler, StepHandlerContext
from dagster.serdes.utils import hash_str
from dagster.utils import merge_dicts
from dagster.utils.backcompat import experimental


@executor(
    name="docker",
    config_schema=merge_dicts(
        DOCKER_CONFIG_SCHEMA,
        {
            "retries": get_retries_config(),
        },
    ),
    requirements=multiple_process_executor_requirements(),
)
@experimental
def docker_executor(init_context: InitExecutorContext) -> Executor:
    """
    Executor which launches steps as Docker containers.

    To use the `docker_executor`, set it as the `executor_def` when defining a job:

    .. literalinclude:: ../../../../../../python_modules/libraries/dagster-docker/dagster_docker_tests/test_example_executor.py
       :start-after: start_marker
       :end-before: end_marker
       :language: python

    Then you can configure the executor with run config as follows:

    .. code-block:: YAML

        execution:
          config:
            image: ...
            registry: ...
            env_vars: ...
            network: ...
            networks: ...
            container_kwargs: ...

    If you're using the DockerRunLauncher, configuration set on the containers created by the run
    launcher will also be set on the containers that are created for each step: the launcher's
    ``image`` and ``registry`` are used when the executor does not set them, while ``env_vars``,
    networks (``network``/``networks``) and ``container_kwargs`` are combined, with executor
    values added after the launcher's.
    """
    from . import DockerRunLauncher

    config = init_context.executor_config
    image = config.get("image")
    registry = config.get("registry")
    env_vars = config.get("env_vars")
    network = config.get("network")
    networks = config.get("networks")
    container_kwargs = config.get("container_kwargs")

    # Validate the executor's own settings before merging, so that setting both `network` and
    # `networks` in the executor config is still rejected.
    validate_docker_config(network, networks, container_kwargs)

    # Fold the singular `network` into list form. Otherwise an executor-level `network` would be
    # passed alongside the launcher's `networks`, where it either conflicts with them during
    # validation or (since DockerStepHandler gives `network` precedence) silently drops them.
    networks = [network] if network else (networks or [])

    run_launcher = init_context.instance.run_launcher
    if isinstance(run_launcher, DockerRunLauncher):
        image = image or run_launcher.image
        registry = registry or run_launcher.registry
        env_vars = run_launcher.env_vars + (env_vars or [])
        networks = run_launcher.networks + networks
        container_kwargs = merge_dicts(run_launcher.container_kwargs, container_kwargs or {})
        # Re-check the merged values that will actually be applied to step containers.
        validate_docker_config(None, networks, container_kwargs)

    return StepDelegatingExecutor(
        DockerStepHandler(
            image=image,
            registry=registry,
            env_vars=env_vars,
            network=None,
            networks=networks,
            container_kwargs=container_kwargs,
        ),
        retries=RetryMode.from_config(config["retries"]),
    )
class DockerStepHandler(StepHandler):
    """Step handler that runs each step in its own Docker container.

    Containers are given a deterministic name derived from the run id and step key, so that
    ``check_step_health`` and ``terminate_step`` can look them up again by name.
    """

    def __init__(
        self,
        image=None,
        registry=None,
        env_vars=None,
        network=None,
        networks=None,
        container_kwargs=None,
    ):
        """
        Args:
            image: Fallback image, used when the step's repository origin has no container image.
            registry: Optional dict with ``url``, ``username`` and ``password`` keys, used to log
                in every time a Docker client is created.
            env_vars: Names of environment variables whose values are copied from the current
                process into each step container.
            network: A single network name; when set it takes precedence over ``networks``.
            networks: Network names. The first is used when the container is created and the
                rest are connected afterwards.
            container_kwargs: Extra keyword arguments forwarded to ``containers.create``.
        """
        super().__init__()

        self._image = image
        self._registry = registry
        self._env_vars = env_vars

        if network:
            self._networks = [network]
        elif networks:
            self._networks = networks
        else:
            self._networks = []

        self._container_kwargs = check.opt_dict_param(
            container_kwargs, "container_kwargs", key_type=str
        )

    @property
    def name(self) -> str:
        return "DockerStepHandler"

    def _get_client(self):
        """Create a Docker client from the environment, logging in to the registry if configured.

        The caller owns the returned client and should close it when finished.
        """
        client = docker.client.from_env()
        if self._registry:
            client.login(
                registry=self._registry["url"],
                username=self._registry["username"],
                password=self._registry["password"],
            )
        return client

    def _get_container_name(self, run_id, step_key):
        """Return the deterministic container name for ``step_key`` within run ``run_id``."""
        return f"dagster-step-{hash_str(run_id + step_key)}"

    def _create_step_container(self, client, step_image, execute_step_args):
        """Create (but do not start) the container for the first step in ``execute_step_args``."""
        return client.containers.create(
            step_image,
            name=self._get_container_name(
                execute_step_args.pipeline_run_id, execute_step_args.step_keys_to_execute[0]
            ),
            detach=True,
            # Only one network can be given at creation time; launch_step connects the rest.
            network=self._networks[0] if self._networks else None,
            command=execute_step_args.get_command_args(),
            environment=(
                {env_name: os.getenv(env_name) for env_name in self._env_vars}
                if self._env_vars
                else {}
            ),
            **self._container_kwargs,
        )

    def _step_failure_event(self, step_handler_context, step_key, message):
        """Build a STEP_FAILURE event for ``step_key`` that carries ``message`` and no error data."""
        return DagsterEvent(
            event_type_value=DagsterEventType.STEP_FAILURE.value,
            pipeline_name=step_handler_context.execute_step_args.pipeline_origin.pipeline_name,
            step_key=step_key,
            message=message,
            event_specific_data=StepFailureData(
                error=None,
                user_failure_data=None,
            ),
        )

    def launch_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
        """Create, attach networks to, and start the container for a single step.

        The image pulled on ``ImageNotFound`` is the step's repository-origin image when set,
        otherwise the handler's configured image.

        Returns:
            A single ENGINE_EVENT recording the step key and the container id.

        Raises:
            AssertionError: If more than one step is requested.
            Exception: If no image is available from either the repository or the executor config.
        """
        execute_step_args = step_handler_context.execute_step_args

        # Check this before touching Docker, so an unsupported request cannot leave an orphaned,
        # created-but-never-started container behind.
        assert (
            len(execute_step_args.step_keys_to_execute) == 1
        ), "Launching multiple steps is not currently supported"
        step_key = execute_step_args.step_keys_to_execute[0]

        step_image = execute_step_args.pipeline_origin.repository_origin.container_image
        if not step_image:
            step_image = self._image
        if not step_image:
            raise Exception("No docker image specified by the executor config or repository")
        validate_docker_image(step_image)

        with closing(self._get_client()) as client:
            try:
                step_container = self._create_step_container(client, step_image, execute_step_args)
            except docker.errors.ImageNotFound:
                client.images.pull(step_image)
                step_container = self._create_step_container(client, step_image, execute_step_args)

            for network_name in self._networks[1:]:
                client.networks.get(network_name).connect(step_container)

            events = [
                DagsterEvent(
                    event_type_value=DagsterEventType.ENGINE_EVENT.value,
                    pipeline_name=execute_step_args.pipeline_origin.pipeline_name,
                    step_key=step_key,
                    message="Launching step in Docker container",
                    event_specific_data=EngineEventData(
                        [
                            MetadataEntry.text(step_key, "Step key"),
                            MetadataEntry.text(step_container.id, "Docker container id"),
                        ],
                    ),
                )
            ]

            # Start while the client (and its session) is still open.
            step_container.start()

        return events

    def check_step_health(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
        """Report a STEP_FAILURE if the step's container is missing or exited unsuccessfully.

        Returns an empty list while the container is running, or if it finished with return
        code 0. Otherwise returns a single STEP_FAILURE event.
        """
        execute_step_args = step_handler_context.execute_step_args
        step_key = execute_step_args.step_keys_to_execute[0]
        container_name = self._get_container_name(execute_step_args.pipeline_run_id, step_key)

        with closing(self._get_client()) as client:
            try:
                container = client.containers.get(container_name)
            except Exception as e:
                return [
                    self._step_failure_event(
                        step_handler_context,
                        step_key,
                        f"Error when checking on step container health: {e}",
                    )
                ]

            if container.status == "running":
                return []

            # The container is not running, so only a short wait is used to read its exit code.
            try:
                container_info = container.wait(timeout=0.1)
            except Exception as e:
                return [
                    self._step_failure_event(
                        step_handler_context,
                        step_key,
                        f"Container status is {container.status}. Hit exception attempting to get its return code: {e}",
                    )
                ]

        ret_code = container_info.get("StatusCode")
        if ret_code == 0:
            return []

        return [
            self._step_failure_event(
                step_handler_context,
                step_key,
                f"Container status is {container.status}. Return code is {str(ret_code)}.",
            )
        ]

    def terminate_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
        """Stop the step's container, best-effort.

        Errors from looking up or stopping the container are reported as an extra ENGINE_EVENT
        instead of being raised.
        """
        execute_step_args = step_handler_context.execute_step_args
        assert (
            len(execute_step_args.step_keys_to_execute) == 1
        ), "Terminating multiple steps is not currently supported"
        step_key = execute_step_args.step_keys_to_execute[0]
        pipeline_name = execute_step_args.pipeline_origin.pipeline_name

        events = [
            DagsterEvent(
                event_type_value=DagsterEventType.ENGINE_EVENT.value,
                pipeline_name=pipeline_name,
                step_key=step_key,
                message="Stopping Docker container for step",
                event_specific_data=EngineEventData(),
            )
        ]

        with closing(self._get_client()) as client:
            try:
                container = client.containers.get(
                    self._get_container_name(execute_step_args.pipeline_run_id, step_key)
                )
                container.stop()
            except Exception as e:
                events.append(
                    DagsterEvent(
                        event_type_value=DagsterEventType.ENGINE_EVENT.value,
                        pipeline_name=pipeline_name,
                        step_key=step_key,
                        message=f"Hit error while terminating Docker container:\n{e}",
                        event_specific_data=EngineEventData(),
                    )
                )

        return events