Source code for dagster_msteams.sensors
from typing import Callable, Optional
from dagster_msteams.card import Card
from dagster_msteams.client import TeamsClient
from dagster import DefaultSensorStatus
from dagster.core.definitions.run_status_sensor_definition import (
PipelineFailureSensorContext,
pipeline_failure_sensor,
)
def _default_failure_message(context: PipelineFailureSensorContext) -> str:
return "\n".join(
[
f"Pipeline {context.pipeline_run.pipeline_name} failed!",
f"Run ID: {context.pipeline_run.run_id}",
f"Mode: {context.pipeline_run.mode}",
f"Error: {context.failure_event.message}",
]
)
[docs]def make_teams_on_pipeline_failure_sensor(
hook_url: str,
message_fn: Callable[[PipelineFailureSensorContext], str] = _default_failure_message,
http_proxy: Optional[str] = None,
https_proxy: Optional[str] = None,
timeout: Optional[float] = 60,
verify: Optional[bool] = None,
name: Optional[str] = None,
dagit_base_url: Optional[str] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
):
"""Create a sensor on pipeline failures that will message the given MS Teams webhook URL.
Args:
hook_url (str): MS Teams incoming webhook URL.
message_fn (Optional(Callable[[PipelineFailureSensorContext], str])): Function which
takes in the ``PipelineFailureSensorContext`` and outputs the message you want to send.
Defaults to a text message that contains error message, pipeline name, and run ID.
http_proxy : (Optional[str]): Proxy for requests using http protocol.
https_proxy : (Optional[str]): Proxy for requests using https protocol.
timeout: (Optional[float]): Connection timeout in seconds. Defaults to 60.
verify: (Optional[bool]): Whether to verify the servers TLS certificate.
name: (Optional[str]): The name of the sensor. Defaults to "teams_on_pipeline_failure".
dagit_base_url: (Optional[str]): The base url of your Dagit instance. Specify this to allow
messages to include deeplinks to the failed pipeline run.
default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
status can be overridden from Dagit or via the GraphQL API.
Examples:
.. code-block:: python
teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)
@repository
def my_repo():
return [my_pipeline + teams_on_pipeline_failure]
.. code-block:: python
def my_message_fn(context: PipelineFailureSensorContext) -> str:
return "Pipeline {pipeline_name} failed! Error: {error}".format(
pipeline_name=context.pipeline_run.pipeline_name,
error=context.failure_event.message,
)
teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
message_fn=my_message_fn,
dagit_base_url="http://localhost:3000",
)
"""
teams_client = TeamsClient(
hook_url=hook_url,
http_proxy=http_proxy,
https_proxy=https_proxy,
timeout=timeout,
verify=verify,
)
@pipeline_failure_sensor(name=name, default_status=default_status)
def teams_on_pipeline_failure(context: PipelineFailureSensorContext):
text = message_fn(context)
if dagit_base_url:
text += "<a href='{base_url}/instance/runs/{run_id}'>View in Dagit</a>".format(
base_url=dagit_base_url,
run_id=context.pipeline_run.run_id,
)
card = Card()
card.add_attachment(text_message=text)
teams_client.post_message(payload=card.payload)
return teams_on_pipeline_failure