Apache Airflow migration journey from self-hosted to AWS Managed Airflow
Introduction
Due to security and compatibility issues with migrating our self-hosted Airflow environment, we decided to migrate to AWS Managed Workflows for Apache Airflow (MWAA). The old EKS cluster used Istio as an ingress gateway controller; on the new cluster we dropped this and opted for a more managed approach, using the AWS Load Balancer Controller for the majority of ingress and the NGINX ingress controller for any services which required more complex ingress rules. During the course of the migration we encountered issues with Apache Airflow around correct traffic routing and authentication for the web GUI. We spent several days attempting to resolve these problems before deciding that it was better to simply switch to MWAA. This article goes through the specific issues we faced and the journey of migrating to MWAA.
Self-hosted Implementation
The original configuration
Our original implementation of Apache Airflow was deployed onto EKS using the community-maintained Helm chart. Authentication for the web GUI was handled via Google OAuth over HTTPS, a fairly standard setup. OAuth values were set in the Helm chart and the callback URLs were configured on the Google GCP side for the application. You can see we retrieve the Google credentials from a Kubernetes secret which is populated via ExternalSecrets.
Note: the AIRFLOW__GOOGLE__CLIENT_ID and AIRFLOW__GOOGLE__DOMAIN values below have been replaced with dummy data.
extraEnv:
  - name: AIRFLOW__WEBSERVER__AUTHENTICATE
    value: "True"
  - name: AIRFLOW__WEBSERVER__AUTH_BACKEND
    value: airflow.contrib.auth.backends.google_auth
  - name: AIRFLOW__GOOGLE__CLIENT_ID
    value: 123456789-1245tghu87654esdxcvbhu8765rdcvg.apps.googleusercontent.com
  - name: AIRFLOW__GOOGLE__CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: airflow-google-client-secret
        key: client_secret
        optional: false
  - name: AIRFLOW__GOOGLE__OAUTH_CALLBACK_ROUTE
    value: "/oauth2callback"
  - name: AIRFLOW__GOOGLE__DOMAIN
    value: company.com
  - name: AIRFLOW__GOOGLE__PROMPT
    value: "select_account consent"
Our web base URL is configured with the HTTPS protocol:
web:
  baseUrl: https://airflow.prod.company.io
And we have an Istio VirtualService for ingress, using our configured Istio external ingress gateway.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: airflow
  namespace: airflow
spec:
  hosts:
    - airflow.prod.company.io
  gateways:
    - istio-system/company-gateway-external
  http:
    - route:
        - destination:
            host: airflow-web
All works as expected: when navigating in a web browser to https://airflow.prod.company.io you are routed to the Airflow web GUI and prompted to log in via the Google SSO identity provider.
The Attempted Migration Configuration
The configuration for the migration was very similar, however we opted to use the official Helm chart for the deployment rather than the community chart, which meant re-factoring many of the parameters and values to conform to the official chart's spec. In my opinion this is one of the downsides of using Helm to deploy applications in a smaller environment, however discussing the pros and cons of deploying to Kubernetes using Helm is not within the scope of this post.
The majority of the Helm release config is identical, although re-factored slightly to conform to the chart-specific templates. However, as mentioned previously, we are no longer using Istio in the new EKS cluster, so instead we configure Service and Ingress resources and use AWS Load Balancer Controller annotations on the Ingress resource to provision an internal load balancer (because this service should not be accessible outside of our organisation) within AWS for our application.
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow
spec:
  ports:
    - port: 80
      targetPort: 8080
      protocol: TCP
  type: NodePort
  selector:
    app: airflow
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow
  namespace: airflow
  annotations:
    external-dns.alpha.kubernetes.io/ttl: "300"
    external-dns.alpha.kubernetes.io/hostname: airflow.staging.company.io
    kubernetes.io/ingress.class: alb
    alb.ingress.kubernetes.io/scheme: internal
    alb.ingress.kubernetes.io/target-type: ip
    alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:eu-west-1:1234567890:certificate/a01d4f1d-1009-42f4-9f04-c0f98bccffbf
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS": 443}]'
    alb.ingress.kubernetes.io/ssl-redirect: '443'
    alb.ingress.kubernetes.io/healthcheck-path: '/api/v1/ping/'
spec:
  rules:
    - host: airflow.staging.company.io
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: airflow
                port:
                  number: 80
As you can see, this configuration specifies both HTTP and HTTPS listeners on the load balancer and attaches an SSL certificate to the HTTPS listener. This certificate contains the DNS names relevant to the service endpoint (*.internal.company.io).
Upon testing this configuration we were unable to access the Airflow web GUI; we received a redirect_uri_mismatch error when attempting to log in via SSO. After some troubleshooting we realised this is because the Airflow application is forwarding our authentication request to Google over HTTP, which is not permitted on Google's side, even though we have specified HTTPS within our URL and configured the base URL of our Airflow application with HTTPS.
This is the error we were seeing. (Note: this is just an example and is not our original URI error.)
To confirm this, we disabled Google OAuth on the deployment and simply used the built-in Airflow RBAC authentication (username and password). This worked, and we were able to log in to the admin console, so we knew that our service was running correctly and that our issue was entirely with the Google authentication. We then began a very lengthy troubleshooting process, taking a couple of weeks, which involved:
- Testing with the latest version of Airflow on both the community and official Helm charts.
- Testing the latest Airflow 1 release (1.10.15) and Airflow 2 release (2.2.3), in case Airflow 2 had a better security implementation. Sadly this was not the case.
- Using internal and external load balancers
- Configuring non-default HTTPS ports on the load balancers and the Kubernetes Ingress/Service
- Changing the target types of the AWS load balancers, along with the Service type within Kubernetes
- Adding and removing HTTP and HTTPS versions of the authorised callback URLs in the Google OAuth section of GCP.
- Testing loopback URLs
Throughout all of this troubleshooting we tested various combinations of the configurations above, and we consistently received errors of one kind or another. Sometimes we would simply see a blank page without any error, and when inspecting the requests in the browser developer tools we would find a 400 error. Other times we would find a 302 redirect, but to an incorrect URL. Sometimes we would see the built-in "circles" error page from Airflow (page not found, 404).
Along the way, we were of course consulting the documentation for both the community chart and the official chart.
Community chart: https://github.com/airflow-helm/charts/tree/main/charts/airflow
Official documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html
The official documentation for Apache Airflow is not great, in my opinion. The navigation is difficult, and it is often more effective to simply Google what you are looking for and follow a link into the documentation than to try to find it within the documentation itself. I would also say that I have never read documentation with so many sections and such an organised navigation structure that actually says so little! It is very difficult to find meaningful explanations for certain aspects of Airflow, so we had to resort to other online resources a lot during the troubleshooting process. We discovered many other people encountering the same problems, not just with Google OAuth but with any kind of third-party SSO identity provider. It seems that Airflow is simply not built to handle third-party SSO well, and is geared towards encouraging people to use the standard built-in password authentication method.
It also seems that Airflow does not properly support HTTPS/SSL termination, as the documentation describes simply setting an HTTP endpoint with :443 as the method for "enabling" HTTPS: https://airflow.apache.org/docs/apache-airflow/stable/security/webserver.html#ssl
How did it work before?
This is a question we continuously asked ourselves throughout this journey. In the middle of the whole painful process, it is easy to forget that we already had Airflow deployed and working in production, using Google OAuth. So how is this possible, when it seems that the application does not properly support HTTPS, a requirement of Google OAuth?
The only explanation we came up with was that in our original EKS cluster, which used Istio for ingress, we were simply lucky. We believe that since Istio runs as a sidecar that proxies all requests in and out of the pod, re-routing them to localhost along the way and adding additional headers, it somehow managed to force an HTTPS connection upon egress from the pod, allowing Google OAuth to work. If we had not been using Istio in the original EKS setup, we believe the Airflow/Google OAuth deployment would have been much more difficult.
Implementation of AWS Managed Airflow
So we now come to the interesting part of this post. Because of all of the above headaches, we decided it would be much simpler to use a managed solution provided by the cloud hosting provider we are already using.
We use Terraform to create the underlying infrastructure required for the Airflow environment. Since this is managed within AWS, we simply need to give it access to our new EKS cluster via an IAM policy and the aws-auth ConfigMap, which allows the Airflow service IAM role to spin up pods in the cluster to run the jobs, identically to how Airflow was working in the original EKS cluster.
Locals
We have configured a name_prefix local for the module, which takes our Service and Environment tags and joins them together, making it easier to apply a quick, consistent naming convention to all resources in the module:
locals {
  name_prefix = "${var.tags["Service"]}-${var.tags["Environment"]}"
}
First, we need a VPC for the Airflow environment to be deployed into; for this we can use the official AWS VPC module for Terraform: https://registry.terraform.io/modules/terraform-aws-modules/vpc/aws/latest. Our subnets are defined by variables in the per-environment .tfvars files.
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "3.11.0"
name = "${local.name_prefix}-vpc"
cidr = var.vpc_cidr
azs = var.azs
private_subnets = var.private_subnets
public_subnets = var.public_subnets
enable_nat_gateway = true
single_nat_gateway = true
reuse_nat_ips = true # <= Skip creation of EIPs for the NAT Gateways
external_nat_ip_ids = [aws_eip.nat.id]
tags = merge(var.tags, { Name = "${local.name_prefix}-vpc" }, )
}
We also have a single NAT gateway for the deployment, with its Elastic IP created outside the VPC module:
resource "aws_eip" "nat" {
vpc = true
tags = merge(var.tags, { Name = "${local.name_prefix}-private_nat-gateway" }, )
}
A security group with a default egress rule and a self ingress rule is sufficient:
resource "aws_security_group" "airflow_security_group" {
name_prefix = "${local.name_prefix}-sg-"
description = "Security group used for airflow envionment"
vpc_id = module.vpc.vpc_id
lifecycle {
create_before_destroy = true
}
tags = merge(var.tags, { Name = "${local.name_prefix}-sg" }, )
}
resource "aws_security_group_rule" "allow_self" {
type = "ingress"
to_port = 0
protocol = "-1"
from_port = 0
security_group_id = aws_security_group.airflow_security_group.id
self = true
}
resource "aws_security_group_rule" "allow_egress" {
type = "egress"
to_port = 0
protocol = "-1"
from_port = 0
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
security_group_id = aws_security_group.airflow_security_group.id
}
S3
We create an S3 bucket where the Airflow DAGs will be stored, as well as any requirements.txt and plugins.zip objects. With MWAA, it is not sufficient to simply have a plugins folder in the S3 bucket, as is usually the case with self-hosted Airflow; the plugins must be provided as a .zip file to the Airflow environment config.
resource "aws_s3_bucket" "airflow_dags_bucket" {
bucket = "company-${local.name_prefix}"
acl = "private"
versioning {
enabled = true
}
tags = var.tags
}
resource "aws_s3_bucket_public_access_block" "airflow_dags_s3_public_access_block" {
bucket = aws_s3_bucket.airflow_dags_bucket.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
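Since MWAA reads requirements.txt and plugins.zip from the root of this bucket, the objects themselves can also be managed with Terraform rather than uploaded by hand. A minimal sketch, assuming the files live in a local files/ directory next to the module (the paths are illustrative, not something MWAA requires):

resource "aws_s3_bucket_object" "airflow_requirements" {
  bucket = aws_s3_bucket.airflow_dags_bucket.id
  key    = "requirements.txt"
  source = "${path.module}/files/requirements.txt"
  etag   = filemd5("${path.module}/files/requirements.txt") # re-upload when the file changes
}

resource "aws_s3_bucket_object" "airflow_plugins" {
  bucket = aws_s3_bucket.airflow_dags_bucket.id
  key    = "plugins.zip"
  source = "${path.module}/files/plugins.zip"
  etag   = filemd5("${path.module}/files/plugins.zip")
}

Because the bucket has versioning enabled, the aws_mwaa_environment resource can optionally pin a specific object version of each file via plugins_s3_object_version and requirements_s3_object_version.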
IAM
IAM implementation for Airflow is fairly straightforward; AWS provides examples for the IAM service role, KMS key policy, etc.: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html#mwaa-create-role-how-create-role
First we can define a couple of housekeeping data sources to make our life easier: the AWS account ID and the region. These are populated automatically because our AWS Terraform provider is already authenticated with AWS via our deployment pipeline.
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
Next we can define the IAM trust policy for our service role; this is also provided by AWS in the docs linked above.
data "aws_iam_policy_document" "airflow_trust_policy" {
statement {
sid = "AssumeRole"
effect = "Allow"
actions = [
"sts:AssumeRole"
]
principals {
type = "Service"
identifiers = ["airflow.amazonaws.com", "airflow-env.amazonaws.com"]
}
}
}
We can then define our Airflow IAM policy, where we make use of the above-mentioned data sources for the account ID and region:
data "aws_iam_policy_document" "airflow_policy" {
statement {
effect = "Allow"
actions = [
"airflow:PublishMetrics"
]
resources = [
"arn:aws:airflow:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:environment/${local.name_prefix}"
]
}
statement {
effect = "Deny"
actions = [
"s3:ListAllMyBuckets"
]
resources = [
aws_s3_bucket.airflow_dags_bucket.arn
]
}
statement {
effect = "Allow"
actions = [
"s3:GetBucket*",
"s3:ListBucket",
"s3:GetBucketPublicAccessBlock"
]
resources = [
aws_s3_bucket.airflow_dags_bucket.arn
]
}
...
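The policy document above is truncated here; the remaining statements follow the AWS example linked earlier. Since the environment resource further down references aws_iam_role.airflow, here is a hedged sketch of how the trust policy and policy document might be tied into the role (the resource names and name format are illustrative; they only need to match what your aws_mwaa_environment references):

resource "aws_iam_role" "airflow" {
  name               = "${local.name_prefix}-role"
  assume_role_policy = data.aws_iam_policy_document.airflow_trust_policy.json
  tags               = var.tags
}

resource "aws_iam_role_policy" "airflow" {
  name   = "${local.name_prefix}-policy"
  role   = aws_iam_role.airflow.id
  policy = data.aws_iam_policy_document.airflow_policy.json
}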
We also have a key policy for the KMS key used by Airflow. Note: when creating KMS key policies, ensure that you give a role or user full permissions to administer the key, otherwise you can end up with a KMS key that is unmanageable: https://aws.amazon.com/premiumsupport/knowledge-center/update-key-policy-future/.
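We won't reproduce our full key policy here, but the shape of the key resource looks roughly like the sketch below, based on the AWS guidance above. The administrator principal and the CloudWatch Logs statement are placeholders to adapt, not a copy of our production policy:

resource "aws_kms_key" "airflow" {
  description         = "KMS key for the ${local.name_prefix} Airflow environment"
  enable_key_rotation = true

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Always keep an administrator principal on the key so it stays manageable.
        Sid       = "KeyAdministration"
        Effect    = "Allow"
        Principal = { AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root" }
        Action    = "kms:*"
        Resource  = "*"
      },
      {
        # Allow CloudWatch Logs in this region to use the key for encrypting Airflow logs.
        Sid       = "AllowCloudWatchLogs"
        Effect    = "Allow"
        Principal = { Service = "logs.${data.aws_region.current.name}.amazonaws.com" }
        Action = [
          "kms:Encrypt*",
          "kms:Decrypt*",
          "kms:ReEncrypt*",
          "kms:GenerateDataKey*",
          "kms:Describe*"
        ]
        Resource = "*"
      }
    ]
  })

  tags = merge(var.tags, { Name = "${local.name_prefix}-kms" })
}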
Airflow Environment
For the Airflow environment itself we can use the aws_mwaa_environment Terraform resource. Below is the bulk of the MWAA config. You can see that we are providing the requirements.txt and plugins.zip objects contained in the DAGs S3 bucket (you can choose to create these manually or with Terraform, as sketched earlier), adding the execution_role_arn of the IAM role we created, and configuring the worker parameters via variables (per environment). Finally, the network configuration uses the VPC subnets and security group we defined.
resource "aws_mwaa_environment" "airflow_environment" {
name = local.name_prefix
airflow_configuration_options = {
"core.default_task_retries" = 16
"core.parallelism" = 1
}
source_bucket_arn = aws_s3_bucket.airflow_dags_bucket.arn
dag_s3_path = "dags/"
plugins_s3_path = "plugins.zip"
requirements_s3_path = "requirements.txt"
execution_role_arn = aws_iam_role.airflow.arn
kms_key = aws_kms_key.airflow.arn
webserver_access_mode = "PUBLIC_ONLY"
environment_class = var.environment_class
min_workers = var.min_workers
max_workers = var.max_workers
network_configuration {
security_group_ids = [aws_security_group.airflow_security_group.id]
subnet_ids = slice(module.vpc.private_subnets, 0, 2)
}
}
You can optionally configure logging for the environment:
dynamic "logging_configuration" {
for_each = anytrue([var.dag_processing_logs_enabled, var.scheduler_logs_enabled, var.task_logs_enabled, var.webserver_logs_enabled, var.worker_logs_enabled]) ? [1] : []
content {
dynamic "dag_processing_logs" {
for_each = var.dag_processing_logs_enabled ? [1] : []
content {
enabled = var.dag_processing_logs_enabled
log_level = var.dag_processing_logs_log_level
}
}
dynamic "scheduler_logs" {
for_each = var.scheduler_logs_enabled ? [1] : []
content {
enabled = var.scheduler_logs_enabled
log_level = var.scheduler_logs_log_level
}
}
dynamic "task_logs" {
for_each = var.task_logs_enabled ? [1] : []
content {
enabled = var.task_logs_enabled
log_level = var.task_logs_log_level
}
}
}
...
Outputs
We have some useful outputs configured that we can use for reference if we need to expand our module in the future, or integrate other services with Airflow:
output "private_subnets" {
value = module.vpc.private_subnets
}
output "private_subnet_cidr_blocks" {
value = module.vpc.private_subnets_cidr_blocks
}
output "public_subnets" {
value = module.vpc.public_subnets
}
output "public_subnet_cidr_blocks" {
value = module.vpc.public_subnets_cidr_blocks
}
output "security_group_default" {
value = module.vpc.default_security_group_id
}
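It can also be useful to export a couple of attributes from the MWAA environment itself, for example the web UI URL and the environment ARN (the output names here are just a suggestion):

output "airflow_webserver_url" {
  value = aws_mwaa_environment.airflow_environment.webserver_url
}

output "airflow_environment_arn" {
  value = aws_mwaa_environment.airflow_environment.arn
}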
Using Managed Airflow
Importing DAGs
Now that we have our MWAA environment set up, we can begin importing our DAG files. We can do this simply by uploading them to the S3 bucket created by the Terraform module. The DAG files need to go into a folder in the bucket named "dags", and can be nested within sub-folders if preferred.
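If you would rather keep the upload in Terraform too, a minimal sketch that syncs a local dags/ directory (an assumed path) into the bucket could look like this; an aws s3 sync step in a deployment pipeline achieves the same result:

resource "aws_s3_bucket_object" "dags" {
  # One object per local DAG file, preserving any sub-folder structure under dags/
  for_each = fileset("${path.module}/dags", "**/*.py")

  bucket = aws_s3_bucket.airflow_dags_bucket.id
  key    = "dags/${each.value}"
  source = "${path.module}/dags/${each.value}"
  etag   = filemd5("${path.module}/dags/${each.value}")
}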
Dependencies and Plugins
Dependencies and plugins are controlled by two files: a requirements.txt file and a plugins.zip file, both of which need to sit in the root of the S3 bucket for our DAGs.
The Airflow UI
The Airflow UI is identical to what you will see if you are deploying your own Airflow service. You can access it by navigating to the managed Airflow service inside the AWS console and selecting the UI link on the environment which has been created.
Note: your user/role must have access to MWAA in order to be able to log in to the UI. The permissions required are:
{
  "Sid": "AirflowAccess",
  "Effect": "Allow",
  "Action": [
    "airflow:CreateWebLoginToken",
    "airflow:GetEnvironment",
    "airflow:ListEnvironments"
  ],
  "Resource": ["<your airflow environment ARN>"]
},
{
  "Sid": "AirflowListEnvs",
  "Effect": "Allow",
  "Action": "airflow:ListEnvironments",
  "Resource": "*"
}
And for the S3 DAGs bucket created:
{
  "Sid": "DagsBucket",
  "Effect": "Allow",
  "Action": [
    "s3:ListAllMyBuckets",
    "s3:GetBucketPolicy",
    "s3:ListBucket",
    "s3:GetObject",
    "s3:GetObjectRetention",
    "s3:GetObjectVersion",
    "s3:PutObject",
    "s3:RestoreObject"
  ],
  "Resource": ["<your dags bucket ARN>"]
}
Using MWAA with EKS
When we were running our own hosted Airflow service within our EKS cluster, we didn't need to worry about certain things which are required for using MWAA with EKS. Our use case (and most others) is that when a DAG task runs, rather than running on the worker node in the MWAA environment, the task simply spins up a pod in our EKS cluster, with a container entrypoint of the command which actually executes the task at hand. In order to accomplish this using MWAA, there are some additional steps required, which are documented here: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html
AWS Auth Configmap
If you are using EKS, you are probably familiar with the aws-auth ConfigMap. This is the built-in ConfigMap which allows you to map AWS IAM roles to Kubernetes users/groups. We need to add a mapping for the Airflow IAM role used by the MWAA environment. We use Terraform to manage this ConfigMap in our EKS clusters:
{
  rolearn  = "arn:aws:iam::1234567890:role/airflow-staging-role",
  username = "mwaa-service",
  groups = [
    "company:mwaa"
  ]
}
This translates to a YAML format of:
- "groups":
- "company:mwaa"
"rolearn": "arn:aws:iam::1234567890:role/airflow-staging-role"
"username": "mwaa-service"
Next we need to create a kubeconfig for the mwaa-service user, which we can do with the aws eks update-kubeconfig command:
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
We need to provide this kubeconfig file to Airflow by adding it to the root of the DAGs bucket and then specifying the location of the file within the DAG parameters:
config_file=<s3 bucket path relative to root>
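The upload of the kubeconfig can again be done manually or with Terraform, using the same pattern as the other bucket objects (the local path is an assumption):

resource "aws_s3_bucket_object" "kube_config" {
  bucket = aws_s3_bucket.airflow_dags_bucket.id
  key    = "kube_config.yaml" # at the bucket root, as described above
  source = "${path.module}/files/kube_config.yaml"
  etag   = filemd5("${path.module}/files/kube_config.yaml")
}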
The Final Hurdle
We actually spent days troubleshooting this last error, including having a support case open with AWS, in which they were also struggling to understand the cause. The error we saw when our test DAGs tried to run was a CloudWatch error:
*** Reading remote log from Cloudwatch log_group: airflow-airflow-staging-Task log_stream: era5_pipeline/copy_air_temp_2m_task/2022-02-24T10_53_11.815092+00_00/3.log.
Could not read remote logs from log_group: airflow-airflow-staging-Task log_stream: era5_pipeline/copy_air_temp_2m_task/2022-02-24T10_53_11.815092+00_00/3.log.
This error makes it seem like there is a permissions issue with the IAM role used by Airflow to read from CloudWatch; however, upon checking CloudWatch, none of the log streams had ever been created in the first place. AWS actually has a knowledge page for this error, as it is a known bug, and they provide a few solutions which should resolve it. You can read about this here: https://docs.aws.amazon.com/mwaa/latest/userguide/t-cloudwatch-cloudtrail-logs.html#t-task-fail-permission
None of the provided solutions worked for us, and after days of trying different things, both with AWS support and on our own, we managed to narrow the problem down to a specific custom dependency function being used in these DAGs:
default_args = build_default_args(**task_args)
The build_default_args function was causing this issue. We managed to find this out by systematically copying small segments of the DAG into a new file and running them one iteration at a time, until we came across the problem line. We then tested without using this custom function and the DAG was able to run correctly.
Conclusion
After many, many hours of troubleshooting the CloudWatch error above, it seemed like AWS MWAA was going to be even more trouble than trying to migrate our own in-house hosted Airflow service, especially with the added wild-goose chase caused by the fact that the error we were seeing is a known issue within AWS, so even their own support engineers believed it to be a problem on their side. The final realisation that it was actually a problem with some of our own custom code was disheartening; however, it was certainly a learning experience, and one that has allowed me to share that information with you, so that you might not get stuck on a similar issue.
Other than the above-mentioned pitfall, the actual deployment of MWAA, the uploading and parsing of DAG files, and access to the UI were fairly straightforward. Even the Kubernetes authentication config is fairly trivial with the aws eks update-kubeconfig command.
The pricing for MWAA is fairly good, in my opinion, for our use case, and when weighed up against the time and cost of engineers supporting and maintaining in-house hosted Airflow, it made sense for us. This was one of the deciding factors in our decision to migrate to the managed Airflow service; your mileage may vary, however. I am hoping to see more utilisation of MWAA in the future, and hopefully it will prove to be a good decision to migrate.