This guide provides instructions for deploying Dagster on AWS. You can use EC2 or ECS to host Dagit and the Dagster Daemon, RDS to store runs and events, and S3 as an IO manager to store op inputs and outputs.
To host Dagster on a bare VM or in Docker on EC2, see Running Dagster as a service.
You can use a hosted RDS PostgreSQL database for your Dagster run/events data by configuring your dagster.yaml file.
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { database }
      port: { port }
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
In this case, provide the correct connection details (username, password, hostname, database name, and port) for your RDS instance, and make sure that the node or container hosting Dagit is able to connect to RDS.
Be sure that this file is present, and DAGSTER_HOME is set, on the node where Dagit is running.
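A quick preflight check can catch an unset variable or a missing file before Dagit starts. The sketch below is only an illustration: the function name find_instance_config is hypothetical, and it assumes the instance configuration lives at $DAGSTER_HOME/dagster.yaml.

```python
import os
from pathlib import Path


def find_instance_config(env=None):
    """Return the path to dagster.yaml under DAGSTER_HOME, or raise if missing."""
    # Accept a mapping for testing; fall back to the real environment.
    env = os.environ if env is None else env
    home = env.get("DAGSTER_HOME")
    if not home:
        raise RuntimeError("DAGSTER_HOME is not set")
    config_path = Path(home) / "dagster.yaml"
    if not config_path.is_file():
        raise FileNotFoundError(f"No dagster.yaml found at {config_path}")
    return config_path
```

Calling find_instance_config() with no arguments on the Dagit node reads the real environment and raises a descriptive error if either requirement is not met.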
Note that using RDS for run and event log storage does not require that Dagit be running in the cloud. If you are connecting a local Dagit instance to a remote RDS storage, double check that your local node is able to connect to RDS.
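One way to check reachability from a local node is to attempt a plain TCP connection to the database endpoint. This is a minimal sketch using only the Python standard library; the helper name can_reach and the endpoint in the comment are placeholders, not part of Dagster. A successful connection shows only that the network path (security groups, routing) is open. It does not validate credentials.

```python
import socket


def can_reach(hostname, port, timeout=3.0):
    """Return True if a TCP connection to hostname:port succeeds within timeout."""
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example call (hypothetical endpoint; substitute your own RDS hostname and port):
# can_reach("my-db.example.us-east-1.rds.amazonaws.com", 5432)
```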
The Deploying on ECS example on GitHub demonstrates how to configure the Docker Compose CLI integration with ECS to manage all of the required AWS resources that Dagster needs to run on ECS. The example includes a Dagit container for loading and launching jobs, a dagster-daemon container for managing a run queue and submitting runs from schedules and sensors, a Postgres container for persistent storage, and a container with user job code. The Dagster instance uses the EcsRunLauncher to launch each run in its own ECS task.
The EcsRunLauncher launches an ECS task per run. It assumes that the rest of your Dagster deployment is also running in ECS on a Fargate-compatible cluster.
By default, each run's task registers its own task definition. To simplify configuration, these task definitions inherit most of their configuration (networking, CPU, memory, environment, etc.) from the process that launches the run, but override the container definition with a new command to launch a Dagster run. When using the DefaultRunCoordinator, runs launched via Dagit or GraphQL inherit their task definitions from the Dagit task; runs launched from a sensor or schedule inherit their task definitions from the Daemon task.
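The inheritance described here can be pictured with plain dictionaries. This is a conceptual sketch only, not the EcsRunLauncher's actual code: the function derive_run_task_definition, the dictionary layout (loosely modeled on an ECS task definition), and the placeholder command are all assumptions made for illustration.

```python
import copy


def derive_run_task_definition(parent, container_name, run_command):
    """Copy the parent's task definition, replacing only one container's command."""
    derived = copy.deepcopy(parent)  # leave the parent's definition untouched
    for container in derived["containerDefinitions"]:
        if container["name"] == container_name:
            container["command"] = list(run_command)
            return derived
    raise ValueError(f"no container named {container_name!r}")


# Hypothetical task definition for the process that launches the run.
dagit_task = {
    "cpu": "256",
    "memory": "512",
    "networkMode": "awsvpc",
    "containerDefinitions": [
        {
            "name": "dagit",
            "command": ["dagit", "-h", "0.0.0.0", "-p", "3000"],
            "environment": [{"name": "DAGSTER_HOME", "value": "/opt/dagster"}],
        }
    ],
}

# "launch-run-placeholder" stands in for whatever command starts the run.
run_task = derive_run_task_definition(dagit_task, "dagit", ["launch-run-placeholder"])
```

Everything except the container command (CPU, memory, networking, environment) carries over unchanged, which is why a run launched from Dagit ends up configured like the Dagit task.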
Alternatively, you can define your own task definition in your dagster.yaml:
run_launcher:
  module: "dagster_aws.ecs"
  class: "EcsRunLauncher"
  config:
    task_definition: "arn:aws:ecs:us-east-1:1234567890:task-definition/my-task-definition:1"
    container_name: "my_container_name"
You can use job tags to customize the run's CPU and Memory:
from dagster import job, op


@op()
def my_op(context):
    context.log.info('running')


@job(
    tags={
        "ecs/cpu": "256",
        "ecs/memory": "512",
    }
)
def my_job():
    my_op()
Fargate tasks only support certain combinations of CPU and Memory.
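Because an unsupported pair is rejected by ECS, it can help to validate the tag values before launching a run. The table in this sketch is an assumption based on the task sizes AWS has published for Fargate (CPU in CPU units, memory in MiB); it omits any larger sizes AWS offers and should be confirmed against the current AWS documentation. The names FARGATE_SIZES and is_valid_fargate_size are hypothetical.

```python
# CPU units -> allowed memory values in MiB. Assumed from the AWS Fargate
# task size table; confirm against the current AWS documentation.
FARGATE_SIZES = {
    256: (512, 1024, 2048),
    512: tuple(range(1024, 4096 + 1, 1024)),
    1024: tuple(range(2048, 8192 + 1, 1024)),
    2048: tuple(range(4096, 16384 + 1, 1024)),
    4096: tuple(range(8192, 30720 + 1, 1024)),
}


def is_valid_fargate_size(cpu, memory):
    """Check a CPU/memory pair; accepts ints or the string form used in job tags."""
    return int(memory) in FARGATE_SIZES.get(int(cpu), ())
```

For example, the tag values "256" and "512" used above pass this check, while "256" paired with "4096" does not.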
ECS can bind AWS Secrets Manager secrets as environment variables when runs launch.
By default, Dagster will fetch any Secrets Manager secrets tagged with the key dagster and set them as environment variables.
Alternatively, you can set your own tag name in your dagster.yaml:
run_launcher:
  module: "dagster_aws.ecs"
  class: "EcsRunLauncher"
  config:
    secrets_tag: "my-tag-name"
Any secret tagged with my-tag-name will be included in the environment.
Additionally, you can pass specific secrets using the same structure as the ECS API:
run_launcher:
  module: "dagster_aws.ecs"
  class: "EcsRunLauncher"
  config:
    secrets:
      - name: "MY_API_TOKEN"
        valueFrom: "arn:aws:secretsmanager:us-east-1:123456789012:secret:FOO-AbCdEf:token::"
      - name: "MY_PASSWORD"
        valueFrom: "arn:aws:secretsmanager:us-east-1:123456789012:secret:FOO-AbCdEf:password::"
Any secret tagged with dagster will be included in the environment. MY_API_TOKEN and MY_PASSWORD will also be included in the environment.
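The valueFrom strings above follow the ECS convention of appending an optional JSON key, version stage, and version ID to the secret's ARN, each separated by a colon. The helper below is a small sketch of assembling such a reference; secret_value_from is a hypothetical name and not part of dagster_aws.

```python
def secret_value_from(secret_arn, json_key="", version_stage="", version_id=""):
    """Build an ECS valueFrom reference: <arn>:<json-key>:<version-stage>:<version-id>."""
    # Empty fields are kept so that the colons still mark their positions.
    return ":".join([secret_arn, json_key, version_stage, version_id])


arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret:FOO-AbCdEf"
token_ref = secret_value_from(arn, json_key="token")
password_ref = secret_value_from(arn, json_key="password")
```

Here token_ref and password_ref reproduce the two valueFrom strings in the configuration, each selecting one key from the JSON stored in the same secret.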
To enable parallel computation (e.g., with the multiprocessing or Dagster celery executors), you will need to configure persistent IO Managers -- for instance, using an S3 bucket to store data passed between ops.
You'll first need to use s3_pickle_io_manager as your IO Manager, or customize your own persistent IO managers (see example).
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.resources import s3_resource

from dagster import Int, Out, job, op


@op(out=Out(Int))
def my_op():
    return 1


@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
    }
)
def my_job():
    my_op()
Then, add the following YAML block in your job's config:
resources:
  io_manager:
    config:
      s3_bucket: my-cool-bucket
      s3_prefix: good/prefix-for-files-
The resource uses boto under the hood, so if you are accessing your private buckets, you will need to provide the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables or follow one of the other boto authentication methods.
With this in place, your job runs will store data passed between ops on S3 in the location s3://<bucket>/dagster/storage/<job run id>/<op name>.compute.