Source code for dagster_k8s.launcher

import sys

import kubernetes

from dagster import Field, MetadataEntry, StringSource, check
from dagster.cli.api import ExecuteRunArgs
from dagster.core.events import EngineEventData
from dagster.core.launcher import LaunchRunContext, ResumeRunContext, RunLauncher
from dagster.core.launcher.base import CheckRunHealthResult, WorkerStatus
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster.core.storage.tags import DOCKER_IMAGE_TAG
from dagster.grpc.types import ResumeRunArgs
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from dagster.utils import frozentags, merge_dicts
from dagster.utils.error import serializable_error_info_from_exc_info

from .job import (
    DagsterK8sJobConfig,
    construct_dagster_k8s_job,
    get_job_name_from_run_id,
    get_user_defined_k8s_config,
)
from .utils import delete_job


[docs]class K8sRunLauncher(RunLauncher, ConfigurableClass): """RunLauncher that starts a Kubernetes Job for each Dagster job run. Encapsulates each run in a separate, isolated invocation of ``dagster-graphql``. You can configure a Dagster instance to use this RunLauncher by adding a section to your ``dagster.yaml`` like the following: .. code-block:: yaml run_launcher: module: dagster_k8s.launcher class: K8sRunLauncher config: service_account_name: your_service_account job_image: my_project/dagster_image:latest instance_config_map: dagster-instance postgres_password_secret: dagster-postgresql-secret """ def __init__( self, service_account_name, instance_config_map, postgres_password_secret=None, dagster_home=None, job_image=None, image_pull_policy=None, image_pull_secrets=None, load_incluster_config=True, kubeconfig_file=None, inst_data=None, job_namespace="default", env_config_maps=None, env_secrets=None, env_vars=None, k8s_client_batch_api=None, volume_mounts=None, volumes=None, labels=None, fail_pod_on_run_failure=None, ): self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) self.job_namespace = check.str_param(job_namespace, "job_namespace") self.load_incluster_config = load_incluster_config self.kubeconfig_file = kubeconfig_file if load_incluster_config: check.invariant( kubeconfig_file is None, "`kubeconfig_file` is set but `load_incluster_config` is True.", ) kubernetes.config.load_incluster_config() else: check.opt_str_param(kubeconfig_file, "kubeconfig_file") kubernetes.config.load_kube_config(kubeconfig_file) self._fixed_batch_api = k8s_client_batch_api self._job_config = None self._job_image = check.opt_str_param(job_image, "job_image") self.dagster_home = check.str_param(dagster_home, "dagster_home") self._image_pull_policy = check.opt_str_param( image_pull_policy, "image_pull_policy", "IfNotPresent" ) self._image_pull_secrets = check.opt_list_param( image_pull_secrets, "image_pull_secrets", of_type=dict ) self._service_account_name = check.str_param(service_account_name, "service_account_name") self.instance_config_map = check.str_param(instance_config_map, "instance_config_map") self.postgres_password_secret = check.opt_str_param( postgres_password_secret, "postgres_password_secret" ) self._env_config_maps = check.opt_list_param( env_config_maps, "env_config_maps", of_type=str ) self._env_secrets = check.opt_list_param(env_secrets, "env_secrets", of_type=str) self._env_vars = check.opt_list_param(env_vars, "env_vars", of_type=str) self._volume_mounts = check.opt_list_param(volume_mounts, "volume_mounts") self._volumes = check.opt_list_param(volumes, "volumes") self._labels = check.opt_dict_param(labels, "labels", key_type=str, value_type=str) self._fail_pod_on_run_failure = check.opt_bool_param( fail_pod_on_run_failure, "fail_pod_on_run_failure" ) super().__init__() @property def image_pull_policy(self): return self._image_pull_policy @property def image_pull_secrets(self): return self._image_pull_secrets @property def service_account_name(self): return self._service_account_name @property def env_config_maps(self): return self._env_config_maps @property def env_secrets(self): return self._env_secrets @property def volume_mounts(self): return self._volume_mounts @property def volumes(self): return self._volumes @property def env_vars(self): return self._env_vars @property def labels(self): return self._labels @property def fail_pod_on_run_failure(self): return self._fail_pod_on_run_failure @property def _batch_api(self): return self._fixed_batch_api if self._fixed_batch_api else kubernetes.client.BatchV1Api() @classmethod def config_type(cls): """Include all arguments required for DagsterK8sJobConfig along with additional arguments needed for the RunLauncher itself. """ job_cfg = DagsterK8sJobConfig.config_type_run_launcher() run_launcher_extra_cfg = { "job_namespace": Field(StringSource, is_required=False, default_value="default"), } return merge_dicts(job_cfg, run_launcher_extra_cfg) @classmethod def from_config_value(cls, inst_data, config_value): return cls(inst_data=inst_data, **config_value) @property def inst_data(self): return self._inst_data def get_static_job_config(self): if self._job_config: return self._job_config else: self._job_config = DagsterK8sJobConfig( job_image=check.str_param(self._job_image, "job_image"), dagster_home=check.str_param(self.dagster_home, "dagster_home"), image_pull_policy=check.str_param(self._image_pull_policy, "image_pull_policy"), image_pull_secrets=check.opt_list_param( self._image_pull_secrets, "image_pull_secrets", of_type=dict ), service_account_name=check.str_param( self._service_account_name, "service_account_name" ), instance_config_map=check.str_param( self.instance_config_map, "instance_config_map" ), postgres_password_secret=check.opt_str_param( self.postgres_password_secret, "postgres_password_secret" ), env_config_maps=check.opt_list_param( self._env_config_maps, "env_config_maps", of_type=str ), env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), env_vars=check.opt_list_param(self._env_vars, "env_vars", of_type=str), volume_mounts=self._volume_mounts, volumes=self._volumes, labels=self._labels, ) return self._job_config def _get_grpc_job_config(self, job_image): return DagsterK8sJobConfig( job_image=check.str_param(job_image, "job_image"), dagster_home=check.str_param(self.dagster_home, "dagster_home"), image_pull_policy=check.str_param(self._image_pull_policy, "image_pull_policy"), image_pull_secrets=check.opt_list_param( self._image_pull_secrets, "image_pull_secrets", of_type=dict ), service_account_name=check.str_param( self._service_account_name, "service_account_name" ), instance_config_map=check.str_param(self.instance_config_map, "instance_config_map"), postgres_password_secret=check.opt_str_param( self.postgres_password_secret, "postgres_password_secret" ), env_config_maps=check.opt_list_param( self._env_config_maps, "env_config_maps", of_type=str ), env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), env_vars=check.opt_list_param(self._env_vars, "env_vars", of_type=str), volume_mounts=self._volume_mounts, volumes=self._volumes, labels=self._labels, ) def _launch_k8s_job_with_args(self, job_name, args, run, pipeline_origin): pod_name = job_name user_defined_k8s_config = get_user_defined_k8s_config(frozentags(run.tags)) repository_origin = pipeline_origin.repository_origin job_config = ( self._get_grpc_job_config(repository_origin.container_image) if repository_origin.container_image else self.get_static_job_config() ) self._instance.add_run_tags( run.run_id, {DOCKER_IMAGE_TAG: job_config.job_image}, ) job = construct_dagster_k8s_job( job_config=job_config, args=args, job_name=job_name, pod_name=pod_name, component="run_worker", user_defined_k8s_config=user_defined_k8s_config, labels={ "dagster/job": pipeline_origin.pipeline_name, }, ) self._instance.report_engine_event( "Creating Kubernetes run worker job", run, EngineEventData( [ MetadataEntry.text(job_name, "Kubernetes Job name"), MetadataEntry.text(self.job_namespace, "Kubernetes Namespace"), MetadataEntry.text(run.run_id, "Run ID"), ] ), cls=self.__class__, ) self._batch_api.create_namespaced_job(body=job, namespace=self.job_namespace) self._instance.report_engine_event( "Kubernetes run worker job created", run, EngineEventData( [ MetadataEntry.text(job_name, "Kubernetes Job name"), MetadataEntry.text(self.job_namespace, "Kubernetes Namespace"), MetadataEntry.text(run.run_id, "Run ID"), ] ), cls=self.__class__, ) def launch_run(self, context: LaunchRunContext) -> None: run = context.pipeline_run job_name = get_job_name_from_run_id(run.run_id) pipeline_origin = context.pipeline_code_origin args = ExecuteRunArgs( pipeline_origin=pipeline_origin, pipeline_run_id=run.run_id, instance_ref=self._instance.get_ref(), set_exit_code_on_failure=self._fail_pod_on_run_failure, ).get_command_args() self._launch_k8s_job_with_args(job_name, args, run, pipeline_origin) @property def supports_resume_run(self): return True def resume_run(self, context: ResumeRunContext) -> None: run = context.pipeline_run job_name = get_job_name_from_run_id( run.run_id, resume_attempt_number=context.resume_attempt_number ) pipeline_origin = context.pipeline_code_origin args = ResumeRunArgs( pipeline_origin=pipeline_origin, pipeline_run_id=run.run_id, instance_ref=self._instance.get_ref(), set_exit_code_on_failure=self._fail_pod_on_run_failure, ).get_command_args() self._launch_k8s_job_with_args(job_name, args, run, pipeline_origin) # https://github.com/dagster-io/dagster/issues/2741 def can_terminate(self, run_id): check.str_param(run_id, "run_id") pipeline_run = self._instance.get_run_by_id(run_id) if not pipeline_run: return False if pipeline_run.status != PipelineRunStatus.STARTED: return False return True def terminate(self, run_id): check.str_param(run_id, "run_id") run = self._instance.get_run_by_id(run_id) if not run: return False can_terminate = self.can_terminate(run_id) if not can_terminate: self._instance.report_engine_event( message="Unable to terminate run; can_terminate returned {}".format(can_terminate), pipeline_run=run, cls=self.__class__, ) return False self._instance.report_run_canceling(run) job_name = get_job_name_from_run_id( run_id, resume_attempt_number=self._instance.count_resume_run_attempts(run.run_id) ) try: termination_result = delete_job(job_name=job_name, namespace=self.job_namespace) if termination_result: self._instance.report_engine_event( message="Run was terminated successfully.", pipeline_run=run, cls=self.__class__, ) else: self._instance.report_engine_event( message="Run was not terminated successfully; delete_job returned {}".format( termination_result ), pipeline_run=run, cls=self.__class__, ) return termination_result except Exception: self._instance.report_engine_event( message="Run was not terminated successfully; encountered error in delete_job", pipeline_run=run, engine_event_data=EngineEventData.engine_error( serializable_error_info_from_exc_info(sys.exc_info()) ), cls=self.__class__, ) @property def supports_check_run_worker_health(self): return True def check_run_worker_health(self, run: PipelineRun): job_name = get_job_name_from_run_id( run.run_id, resume_attempt_number=self._instance.count_resume_run_attempts(run.run_id) ) try: job = self._batch_api.read_namespaced_job(namespace=self.job_namespace, name=job_name) except Exception: return CheckRunHealthResult( WorkerStatus.UNKNOWN, str(serializable_error_info_from_exc_info(sys.exc_info())) ) if job.status.failed: return CheckRunHealthResult(WorkerStatus.FAILED, "K8s job failed") if job.status.succeeded: return CheckRunHealthResult(WorkerStatus.SUCCESS) return CheckRunHealthResult(WorkerStatus.RUNNING)