This library provides an integration with Slack, to support posting messages in your company’s Slack workspace.
Presently, it provides a thin wrapper on the Slack client API chat.postMessage.
To use this integration, you’ll first need to create a Slack App for it.
Create App: Go to https://api.slack.com/apps and click “Create New App”.
Install App: After creating an app, on the left-hand side of the app configuration, click “Bot Users”, and then create a bot user. Then, click “Install App” on the left-hand side, and finally “Install App to Workspace”.
Bot Token: Once finished, this will create a new bot token for your bot/workspace.
Copy this bot token and put it somewhere safe; see Safely Storing Credentials for more on this topic.
dagster_slack.slack_resource ResourceDefinition
This resource is for connecting to Slack.
The resource object is a slack_sdk.WebClient.
By configuring this Slack resource, you can post messages to Slack from any Dagster op:
Examples:
import os

from dagster import job, op
from dagster_slack import slack_resource


@op(required_resource_keys={'slack'})
def slack_op(context):
    context.resources.slack.chat_postMessage(channel='#noise', text=':wave: hey there!')


@job(resource_defs={'slack': slack_resource})
def slack_job():
    slack_op()


slack_job.execute_in_process(
    run_config={'resources': {'slack': {'config': {'token': os.getenv('SLACK_TOKEN')}}}}
)
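Because os.getenv returns None when the variable is unset, the run config above can end up carrying an empty token. A minimal sketch of a helper that builds the same run config but fails early is shown here; build_slack_run_config and its env_var parameter are hypothetical names chosen for illustration, not part of dagster_slack.

```python
import os


def build_slack_run_config(env_var: str = "SLACK_TOKEN") -> dict:
    """Build run config for the 'slack' resource from an environment variable.

    Hypothetical helper, not part of dagster_slack: it raises instead of
    passing a missing or empty token through to the resource.
    """
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    # Same nested structure as the run_config in the example above.
    return {"resources": {"slack": {"config": {"token": token}}}}
```

Under these assumptions, the job could then be run with slack_job.execute_in_process(run_config=build_slack_run_config()).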
dagster_slack.slack_on_failure HookDefinition
Create a hook on step failure events that will message the given Slack channel.
channel (str) – The channel to send the message to (e.g. “#my_channel”)
message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.
dagit_base_url (Optional[str]) – The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.
Examples
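from dagster import HookContext, job, op
from dagster_slack import slack_on_failure


# Attach the hook to every op in the job. The hook posts through the
# 'slack' resource, so the job must define that resource.
@slack_on_failure("#foo", dagit_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass


# Alternatively, attach the hook to a single op with a custom message.
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} failed!"


@op
def an_op(context):
    pass


@job(...)
def my_job():
    # with_hooks returns a copy of the op; invoke it to add it to the job.
    an_op.with_hooks(hook_defs={slack_on_failure("#foo", my_message_fn)})()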
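A custom message_fn can draw on more than the op itself. The sketch below assumes the HookContext exposes job_name, step_key, and run_id (check this against your Dagster version); failure_message is a hypothetical name. The function is duck-typed, so it can be tried against a stand-in object without running a job.

```python
from types import SimpleNamespace


def failure_message(context) -> str:
    """Build a Slack message from a HookContext-like object.

    Assumes the context provides job_name, step_key, and run_id.
    """
    return (
        f":x: Step `{context.step_key}` in job `{context.job_name}` failed "
        f"(run `{context.run_id}`)."
    )


# Stand-in for a real HookContext, for local experimentation only.
fake_context = SimpleNamespace(job_name="my_job", step_key="an_op", run_id="abc123")
print(failure_message(fake_context))
```

Such a function would be passed in the same way as my_message_fn above, for example slack_on_failure("#foo", failure_message).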
dagster_slack.slack_on_success HookDefinition
Create a hook on step success events that will message the given Slack channel.
channel (str) – The channel to send the message to (e.g. “#my_channel”)
message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.
dagit_base_url (Optional[str]) – The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.
Examples
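from dagster import HookContext, job, op
from dagster_slack import slack_on_success


# Attach the hook to every op in the job. The hook posts through the
# 'slack' resource, so the job must define that resource.
@slack_on_success("#foo", dagit_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass


# Alternatively, attach the hook to a single op with a custom message.
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} worked!"


@op
def an_op(context):
    pass


@job(...)
def my_job():
    # with_hooks returns a copy of the op; invoke it to add it to the job.
    an_op.with_hooks(hook_defs={slack_on_success("#foo", my_message_fn)})()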
dagster_slack.make_slack_on_run_failure_sensor(channel, slack_token, text_fn=<function _default_failure_message_text_fn>, blocks_fn=None, name=None, dagit_base_url=None, job_selection=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)
Create a sensor on job failures that will message the given Slack channel.
channel (str) – The channel to send the message to (e.g. “#my_channel”)
slack_token (str) – The slack token. Tokens are typically either user tokens or bot tokens. More in the Slack API documentation here: https://api.slack.com/docs/token-types
text_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function which takes in the RunFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains error message, job name, and run ID. The usage of the text_fn changes depending on whether you’re using blocks_fn. If you are using blocks_fn, this is used as a fallback string to display in notifications. If you aren’t, this is the main body text of the message. It can be formatted as plain text, or with mrkdwn. See more details in https://api.slack.com/methods/chat.postMessage#text_usage
blocks_fn (Callable[[RunFailureSensorContext], List[Dict]]) – Function which takes in the RunFailureSensorContext and outputs the message blocks you want to send. See information about Blocks in https://api.slack.com/reference/block-kit/blocks
name (Optional[str]) – The name of the sensor. Defaults to “slack_on_run_failure”.
dagit_base_url (Optional[str]) – The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed job run.
job_selection (Optional[List[Union[PipelineDefinition, GraphDefinition]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Examples
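import os

from dagster import RunFailureSensorContext, repository
from dagster_slack import make_slack_on_run_failure_sensor

slack_on_run_failure = make_slack_on_run_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN")
)


@repository
def my_repo():
    # The repository returns a list containing the job and the sensor.
    return [my_job, slack_on_run_failure]


def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.pipeline_run.pipeline_name} failed! "
        f"Error: {context.failure_event.message}"
    )


# The custom message function is passed as text_fn, matching the signature above.
slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    text_fn=my_message_fn,
    dagit_base_url="http://mycoolsite.com",
)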
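The examples above only cover text_fn. As a rough sketch of what a blocks_fn might return, the function below builds two Block Kit section blocks from the same context fields used above, plus pipeline_run.run_id, which is assumed to be available. The name failure_blocks is hypothetical, and the stand-in context exists only so the function can be tried without a running sensor.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def failure_blocks(context) -> List[Dict[str, Any]]:
    """Return Block Kit blocks for a RunFailureSensorContext-like object."""
    run = context.pipeline_run
    return [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Job {run.pipeline_name} failed* (run `{run.run_id}`)",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Error:* {context.failure_event.message}",
            },
        },
    ]


# Stand-in for a real RunFailureSensorContext, for local experimentation only.
fake_context = SimpleNamespace(
    pipeline_run=SimpleNamespace(pipeline_name="my_job", run_id="abc123"),
    failure_event=SimpleNamespace(message="boom"),
)
print(failure_blocks(fake_context))
```

A function like this would be passed as blocks_fn=failure_blocks. As described above, text_fn then serves only as the fallback string shown in notifications.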
dagster_slack.make_slack_on_pipeline_failure_sensor(channel, slack_token, text_fn=<function _default_failure_message_text_fn>, blocks_fn=None, pipeline_selection=None, name=None, dagit_base_url=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)
Create a sensor on pipeline failures that will message the given Slack channel.
channel (str) – The channel to send the message to (e.g. “#my_channel”)
slack_token (str) – The slack token. Tokens are typically either user tokens or bot tokens. More in the Slack API documentation here: https://api.slack.com/docs/token-types
text_fn (Optional(Callable[[PipelineFailureSensorContext], str])) – Function which takes in the PipelineFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains error message, pipeline name, and run ID. The usage of the text_fn changes depending on whether you’re using blocks_fn. If you are using blocks_fn, this is used as a fallback string to display in notifications. If you aren’t, this is the main body text of the message. It can be formatted as plain text, or with mrkdwn. See more details in https://api.slack.com/methods/chat.postMessage#text_usage
blocks_fn (Callable[[PipelineFailureSensorContext], List[Dict]]) – Function which takes in the PipelineFailureSensorContext and outputs the message blocks you want to send. See information about Blocks in https://api.slack.com/reference/block-kit/blocks
pipeline_selection (Optional[List[str]]) – Names of the pipelines that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.
name (Optional[str]) – The name of the sensor. Defaults to “slack_on_pipeline_failure”.
dagit_base_url (Optional[str]) – The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed pipeline run.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Examples
import os

from dagster import PipelineFailureSensorContext, repository
from dagster_slack import make_slack_on_pipeline_failure_sensor

slack_on_pipeline_failure = make_slack_on_pipeline_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN")
)


@repository
def my_repo():
    # The repository returns a list containing the pipeline and the sensor.
    return [my_pipeline, slack_on_pipeline_failure]


def my_message_fn(context: PipelineFailureSensorContext) -> str:
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )


# The custom message function is passed as text_fn, matching the signature above.
slack_on_pipeline_failure = make_slack_on_pipeline_failure_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    text_fn=my_message_fn,
    dagit_base_url="http://mycoolsite.com",
)
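Failure messages can contain long error text, which makes for unwieldy Slack posts. One possible approach, sketched below, is a text_fn that truncates its output. The name truncated_failure_text and the max_chars parameter are hypothetical, and the default of 3000 characters is an arbitrary choice for illustration, not a limit documented by this library.

```python
from types import SimpleNamespace


def truncated_failure_text(context, max_chars: int = 3000) -> str:
    """Build a failure message and cut it to at most max_chars characters.

    Works on any object with pipeline_run.pipeline_name and
    failure_event.message, the fields used in the example above.
    """
    text = (
        f"Pipeline {context.pipeline_run.pipeline_name} failed! "
        f"Error: {context.failure_event.message}"
    )
    if len(text) <= max_chars:
        return text
    # Reserve three characters for the ellipsis marking the cut.
    return text[: max_chars - 3] + "..."


# Stand-in for a real PipelineFailureSensorContext, for local experimentation only.
fake_context = SimpleNamespace(
    pipeline_run=SimpleNamespace(pipeline_name="my_pipeline"),
    failure_event=SimpleNamespace(message="x" * 100),
)
print(truncated_failure_text(fake_context, max_chars=40))
```

Since the sensor calls text_fn with the context alone, the default max_chars applies when the function is passed as text_fn=truncated_failure_text. A different limit could be bound with functools.partial.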