Athrun Data Intelligence


Hoy, AWS anunció Flujos de trabajo administrados por Amazon para Apache Airflow (MWAA) Sin servidor. Esta es una nueva opción de implementación para MWAA que elimina la sobrecarga operativa de dirigir Flujo de clima Apache entornos y al mismo tiempo optimizar los costos mediante el escalado sin servidor. Esta nueva proposición aborda los desafíos esencia que enfrentan los ingenieros de datos y los equipos de DevOps al organizar flujos de trabajo: escalabilidad operativa, optimización de costos y gobierno de comunicación.

Con MWAA sin servidor puede concentrarse en la dialéctica de su flujo de trabajo en emplazamiento de monitorear la capacidad aprovisionada. Ahora puede expedir sus flujos de trabajo de Airflow para su ejecución según un cronograma o bajo demanda, pagando solo por el tiempo de procesamiento existente utilizado durante la ejecución de cada tarea. El servicio maneja automáticamente todo el escalamiento de la infraestructura para que sus flujos de trabajo se ejecuten de forma efectivo independientemente de la carga.

Más allá de las operaciones simplificadas, MWAA Serverless presenta un maniquí de seguridad actualizado para control granular mediante Gobierno de comunicación e identidad de AWS (SOY). Cada flujo de trabajo ahora puede tener sus propios permisos de IAM y ejecutarse en una VPC de su referéndum para que pueda implementar controles de seguridad precisos sin crear entornos de Airflow separados. Este enfoque reduce significativamente los gastos generales de gobierno de seguridad y al mismo tiempo fortalece su postura de seguridad.

En esta publicación, demostramos cómo utilizar MWAA Serverless para crear e implementar soluciones escalables de automatización del flujo de trabajo. Analizamos ejemplos prácticos de creación e implementación de flujos de trabajo, configuración de la observabilidad a través de Amazon CloudWatchy convertir los DAG (gráficos acíclicos dirigidos) de Apache Airflow existentes al formato sin servidor. Además exploramos las mejores prácticas para dirigir flujos de trabajo sin servidor y le mostramos cómo implementar el monitoreo y el registro.

¿Cómo funciona MWAA sin servidor?

MWAA Serverless procesa las definiciones de su flujo de trabajo y las ejecuta de forma efectivo en entornos Airflow administrados por servicios, escalando automáticamente los capital según las demandas del flujo de trabajo. MWAA Serverless utiliza el Servicio de contenedor elástico de Amazon (Amazon ECS) para ejecutar cada tarea individual en su propio contenedor ECS Fargate, ya sea en su VPC o en una VPC administrada por servicio. Luego, esos contenedores se comunican con su clúster de Airflow asignado utilizando la API de tareas de Airflow 3.



Figura 1: Bloque de Amazon MWAA

MWAA Serverless utiliza archivos de configuración YAML declarativos basados ​​en el popular código rajado Industria DAG formato para mejorar la seguridad a través del aislamiento de tareas. Tiene dos opciones para crear estas definiciones de flujo de trabajo:

Este enfoque declarativo proporciona dos beneficios esencia. Primero, cubo que MWAA Serverless lee las definiciones de flujo de trabajo de YAML, puede determinar la programación de tareas sin ejecutar ningún código de flujo de trabajo. En segundo emplazamiento, esto permite a MWAA Serverless otorgar permisos de ejecución solo cuando se ejecutan tareas, en emplazamiento de requerir permisos amplios a nivel de flujo de trabajo. El resultado es un entorno más seguro donde los permisos de las tareas tienen un radio preciso y un tiempo constreñido.

Consideraciones de servicio para MWAA Serverless

MWAA Serverless tiene las siguientes limitaciones que debe considerar al arriesgarse entre implementaciones de MWAA aprovisionadas y sin servidor:

  • Soporte del cirujano
    • MWAA Serverless solo admite operadores del paquete de proveedores de Amazon.
    • Para ejecutar código o scripts personalizados, deberá utilizar los servicios de AWS, como:
  • Interfaz de sucesor
    • MWAA Serverless funciona sin utilizar la interfaz web de Airflow.
    • Para el seguimiento y la gobierno del flujo de trabajo, ofrecemos integración con Amazon CloudWatch y AWS CloudTrail.

Trabajar con MWAA sin servidor

Complete los siguientes requisitos previos y pasos para usar MWAA Serverless.

Requisitos previos

Ayer de comenzar, verifique que cumpla con los siguientes requisitos:

  • Entrada y permisos
    • Un cuenta de AWS
    • Interfaz de carrera de comandos de AWS (AWS CLI) interpretación 2.31.38 o posterior instalada y configurada
    • Los permisos adecuados para crear y modificar roles y políticas de IAM, incluidos los siguientes permisos de IAM requeridos:
      • airflow-serverless:CreateWorkflow
      • airflow-serverless:DeleteWorkflow
      • airflow-serverless:GetTaskInstance
      • airflow-serverless:GetWorkflowRun
      • airflow-serverless:ListTaskInstances
      • airflow-serverless:ListWorkflowRuns
      • airflow-serverless:ListWorkflows
      • airflow-serverless:StartWorkflowRun
      • airflow-serverless:UpdateWorkflow
      • iam:CreateRole
      • iam:DeleteRole
      • iam:DeleteRolePolicy
      • iam:GetRole
      • iam:PutRolePolicy
      • iam:UpdateAssumeRolePolicy
      • logs:CreateLogGroup
      • logs:CreateLogStream
      • logs:PutLogEvents
      • airflow:GetEnvironment
      • airflow:ListEnvironments
      • s3:DeleteObject
      • s3:GetObject
      • s3:ListBucket
      • s3:PutObject
      • s3:Sync
    • Entrada a un Cúmulo privada aparente de Amazon (VPC) con conectividad a internet
  • Servicios de AWS requeridos: por otra parte de MWAA Serverless, necesitará comunicación a los siguientes servicios de AWS:
    • Amazon MWAA para consentir a sus entornos de Airflow existentes
    • Amazon CloudWatch para ver registros
    • Amazon S3 para gobierno de archivos DAG y YAML
    • AWS IAM para controlar permisos
  • Entorno de ampliación
  • Requisitos adicionales
    • Confianza básica con los conceptos de Apache Airflow.
    • Comprensión de la sintaxis YAML
    • Conocimiento de los comandos de AWS CLI

Nota: A lo desprendido de esta publicación, utilizamos títulos de ejemplo que deberás reemplazar con los tuyos propios:

  • Reemplazar amzn-s3-demo-bucket con el nombre de su depósito S3
  • Reemplazar 111122223333 con su número de cuenta de AWS
  • Reemplazar us-east-2 con su región de AWS. MWAA Serverless está acondicionado en varias regiones de AWS. Compruebe el Directorio de servicios de AWS disponibles por región para disponibilidad coetáneo.

Creando su primer flujo de trabajo sin servidor

Comencemos definiendo un flujo de trabajo simple que obtenga una letanía de objetos S3 y escriba esa letanía en un archivo en el mismo depósito. Crea un nuevo archivo llamado simple_s3_test.yaml con el futuro contenido:

simples3test:
  dag_id: simples3test
  schedule: 0 0 * * *
  tasks:
    list_objects:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
      bucket: 'amzn-s3-demo-bucket'
      prefix: ''
      retries: 0
    create_object_list:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      data: '{{ ti.xcom_pull(task_ids="list_objects", key="return_value") }}'
      s3_bucket: 'amzn-s3-demo-bucket'
      s3_key: 'filelist.txt'
      dependencies: (list_objects)

Para que se ejecute este flujo de trabajo, debe crear una función de ejecución que tenga permisos para enumerar y escribir en el depósito previo. El rol además debe poder asumirse desde MWAA Serverless. Los siguientes comandos CLI crean este rol y su política asociada:

aws iam create-role 
--role-name mwaa-serverless-access-role 
--assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": (
      {
        "Effect": "Allow",
        "Principal": {
          "Service": (
            "airflow-serverless.amazonaws.com"
          )
        },
        "Action": "sts:AssumeRole"
      },
      {
        "Sid": "AllowAirflowServerlessAssumeRole",
        "Effect": "Allow",
        "Principal": {
          "Service": "airflow-serverless.amazonaws.com"
        },
        "Action": "sts:AssumeRole",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "${aws:PrincipalAccount}"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*"
          }
        }
      }
    )
  }'

aws iam put-role-policy 
  --role-name mwaa-serverless-access-role 
  --policy-name mwaa-serverless-policy   
  --policy-document '{
	"Version": "2012-10-17",
	"Statement": (
		{
			"Sid": "CloudWatchLogsAccess",
			"Effect": "Allow",
			"Action": (
				"logs:CreateLogGroup",
				"logs:CreateLogStream",
				"logs:PutLogEvents"
			),
			"Resource": "*"
		},
		{
			"Sid": "S3DataAccess",
			"Effect": "Allow",
			"Action": (
				"s3:ListBucket",
				"s3:GetObject",
				"s3:PutObject"
			),
			"Resource": (
				"arn:aws:s3:::amzn-s3-demo-bucket",
				"arn:aws:s3:::amzn-s3-demo-bucket/*"
			)
		}
	)
}'

Luego copia su YAML DAG en el mismo depósito S3 y crea su flujo de trabajo basado en la respuesta Arn de la función previo.

aws s3 cp "simple_s3_test.yaml" 
s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml

aws mwaa-serverless create-workflow 
--name simple_s3_test 
--definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' 
--role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role 
--region us-east-2

La salida del extremo comando devuelve un WorkflowARN valía, que luego utilizará para ejecutar el flujo de trabajo:

aws mwaa-serverless start-workflow-run 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def 
--region us-east-2

La salida devuelve un RunId valía, que luego usará para compulsar el estado de la ejecución del flujo de trabajo que acaba de ejecutar.

aws mwaa-serverless get-workflow-run 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def 
--run-id ABC123456789def 
--region us-east-2

Si necesita realizar un cambio en su YAML, puede copiarlo nuevamente a S3 y ejecutar el update-workflow dominio.

aws s3 cp "simple_s3_test.yaml" 
s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml

aws mwaa-serverless update-workflow 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def 
--definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' 
--role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role 
--region us-east-2

Conversión de DAG de Python al formato YAML

AWS ha publicado un aparejo de conversión que utiliza el procesador Airflow DAG de código rajado para serializar los DAG de Python en el formato de taller YAML DAG. Para instalar, ejecuta lo futuro:

pip3 install python-to-yaml-dag-converter-mwaa-serverless
dag-converter convert source_dag.py --output output_yaml_folder

Por ejemplo, cree el futuro DAG y asígnele el nombre create_s3_objects.py:

from datetime import datetime
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 0,
}

dag = DAG(
    'create_s3_objects',
    default_args=default_args,
    description='Create multiple S3 objects in a loop',
    schedule=None
)

# Set number of files to create
LOOP_COUNT = 3
s3_bucket="md-workflows-mwaa-bucket"
s3_prefix = 'test-files'

# Create multiple S3 objects using loop
last_task=None
for i in range(1, LOOP_COUNT + 1):  
    create_object = S3CreateObjectOperator(
        task_id=f'create_object_{i}',
        s3_bucket=s3_bucket,
        s3_key=f'{s3_prefix}/{i}.txt',
        data="{{ ds_nodash }}-{ lower }",
        replace=True,
        dag=dag
    )
    if last_task:
        last_task >> create_object
    last_task = create_object

Una vez que haya instalado python-to-yaml-dag-converter-mwaa-serverlessejecutas:

dag-converter convert "/path_to/create_s3_objects.py" --output "/path_to/yaml/"

Donde terminará la salida con:

YAML validation successful, no errors found

YAML written to /path_to/yaml/create_s3_objects.yaml

Y el YAML resultante se verá así:

create_s3_objects:
  dag_id: create_s3_objects
  params: {}
  default_args:
    start_date: '2024-01-01'
    retries: 0
  schedule: None
  tasks:
    create_object_1:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      data: '{{ ds_nodash }}-{ lower }'
      encrypt: false
      outlets: ()
      params: {}
      priority_weight: 1
      replace: true
      retries: 0
      retry_delay: 300.0
      retry_exponential_backoff: false
      s3_bucket: md-workflows-mwaa-bucket
      s3_key: test-files/1.txt
      task_id: create_object_1
      trigger_rule: all_success
      wait_for_downstream: false
      dependencies: ()
    create_object_2:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      data: '{{ ds_nodash }}-{ lower }'
      encrypt: false
      outlets: ()
      params: {}
      priority_weight: 1
      replace: true
      retries: 0
      retry_delay: 300.0
      retry_exponential_backoff: false
      s3_bucket: md-workflows-mwaa-bucket
      s3_key: test-files/2.txt
      task_id: create_object_2
      trigger_rule: all_success
      wait_for_downstream: false
      dependencies: (create_object_1)
    create_object_3:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      data: '{{ ds_nodash }}-{ lower }'
      encrypt: false
      outlets: ()
      params: {}
      priority_weight: 1
      replace: true
      retries: 0
      retry_delay: 300.0
      retry_exponential_backoff: false
      s3_bucket: md-workflows-mwaa-bucket
      s3_key: test-files/3.txt
      task_id: create_object_3
      trigger_rule: all_success
      wait_for_downstream: false
      dependencies: (create_object_2)
  catchup: false
  description: Create multiple S3 objects in a loop
  max_active_runs: 16
  max_active_tasks: 16
  max_consecutive_failed_dag_runs: 0

Tenga en cuenta que, oportuno a que la conversión YAML se realiza a posteriori del examen DAG, el onda que crea las tareas se ejecuta primero y la letanía estática de tareas resultante se escribe en el documento YAML con sus dependencias.

Migración de los DAG de un entorno MWAA a MWAA Serverless

Puede disfrutar un entorno MWAA aprovisionado para desarrollar y probar sus flujos de trabajo y luego pasarlos a una interpretación sin servidor para ejecutarlos de forma efectivo a escalera. Adicionalmente, si su entorno MWAA utiliza operadores MWAA Serverless compatibles, puede convertir todos los DAG del entorno a la vez. El primer paso es permitir que MWAA Serverless asuma la función de ejecución de MWAA a través de una relación de confianza. Esta es una operación única para cada rol de ejecución de MWAA y se puede realizar manualmente en la consola de IAM o mediante un comando de AWS CLI de la futuro forma:

MWAA_ENVIRONMENT_NAME="MyAirflowEnvironment"
MWAA_REGION=us-east-2

MWAA_EXECUTION_ROLE_ARN=$(aws mwaa get-environment --region $MWAA_REGION --name $MWAA_ENVIRONMENT_NAME --query 'Environment.ExecutionRoleArn' --output text )
MWAA_EXECUTION_ROLE_NAME=$(echo $MWAA_EXECUTION_ROLE_ARN | xargs basename) 
MWAA_EXECUTION_ROLE_POLICY=$(aws iam get-role --role-name $MWAA_EXECUTION_ROLE_NAME --query 'Role.AssumeRolePolicyDocument' --output json | jq '.Statement(0).Principal.Service += ("airflow-serverless.amazonaws.com") | .Statement(0).Principal.Service |= unique | .Statement += ({"Sid": "AllowAirflowServerlessAssumeRole", "Effect": "Allow", "Principal": {"Service": "airflow-serverless.amazonaws.com"}, "Action": "sts:AssumeRole", "Condition": {"StringEquals": {"aws:SourceAccount": "${aws:PrincipalAccount}"}, "ArnLike": {"aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*"}}})')

aws iam update-assume-role-policy --role-name $MWAA_EXECUTION_ROLE_NAME --policy-document "$MWAA_EXECUTION_ROLE_POLICY"

Ahora podemos recorrer cada DAG convertido con éxito y crear flujos de trabajo sin servidor para cada uno.

S3_BUCKET=$(aws mwaa get-environment --name $MWAA_ENVIRONMENT_NAME --query 'Environment.SourceBucketArn' --output text --region us-east-2 | cut -d':' -f6)

for file in /tmp/yaml/*.yaml; do MWAA_WORKFLOW_NAME=$(basename "$file" .yaml); 
      aws s3 cp "$file" s3://$S3_BUCKET/yaml/$MWAA_WORKFLOW_NAME.yaml --region us-east-2; 
      aws mwaa-serverless create-workflow --name $MWAA_WORKFLOW_NAME 
      --definition-s3-location "{"Bucket": "$S3_BUCKET", "ObjectKey": "yaml/$MWAA_WORKFLOW_NAME.yaml"}" --role-arn $MWAA_EXECUTION_ROLE_ARN  
      --region us-east-2  
      done

Para ver una letanía de sus flujos de trabajo creados, ejecute:

aws mwaa-serverless list-workflows --region us-east-2

Monitoreo y observabilidad

El estado de ejecución del flujo de trabajo sin servidor MWAA se devuelve a través del GetWorkflowRun función. Los resultados de eso devolverán detalles para esa ejecución en particular. Si hay errores en la definición del flujo de trabajo, se devuelven en RunDetail en el ErrorMessage campo como en el futuro ejemplo:

{
  "WorkflowVersion": "7bcd36ce4d42f5cf23bfee67a0f816c6",
  "RunId": "d58cxqdClpTVjeN",
  "RunType": "SCHEDULE",
  "RunDetail": {
    "ModifiedAt": "2025-11-03T08:02:47.625851+00:00",
    "ErrorMessage": "expected token ',', got 'create_test_table'",
    "TaskInstances": (),
    "RunState": "FAILED"
  }
}

Los flujos de trabajo que estén correctamente definidos, pero cuyas tareas fallen, volverán "ErrorMessage": "Workflow execution failed":

{
  "WorkflowVersion": "0ad517eb5e33deca45a2514c0569079d",
  "RunId": "ABC123456789def",
  "RunType": "SCHEDULE",
  "RunDetail": {
    "StartedOn": "2025-11-03T13:12:09.904466+00:00",
    "CompletedOn": "2025-11-03T13:13:57.620605+00:00",
    "ModifiedAt": "2025-11-03T13:16:08.888182+00:00",
    "Duration": 107,
    "ErrorMessage": "Workflow execution failed",
    "TaskInstances": (
      "ex_5496697b-900d-4008-8d6f-5e43767d6e36_create_bucket_1"
    ),
    "RunState": "FAILED"
  },
}

Los registros de tareas sin servidor de MWAA se almacenan en el colección de registros de CloudWatch /aws/mwaa-serverless// (dónde / es la misma esclavitud que la identificación única del flujo de trabajo en el ARN del flujo de trabajo). Para secuencias de registro de tareas específicas, deberá enumerar las tareas para la ejecución del flujo de trabajo y luego obtener la información de cada tarea. Puede combinar estas operaciones en un solo comando CLI.

aws mwaa-serverless list-task-instances 
  --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def 
  --run-id ABC123456789def 
  --region us-east-2 
  --query 'TaskInstances().TaskInstanceId' 
  --output text | xargs -n 1 -I {} aws mwaa-serverless get-task-instance 
  --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def 
  --run-id ABC123456789def 
  --task-instance-id {} 
  --region us-east-2 
  --query '{Status: Status, StartedAt: StartedAt, LogStream: LogStream}'

Lo que daría como resultado lo futuro:

{
    "Status": "SUCCESS",
    "StartedAt": "2025-10-28T21:21:31.753447+00:00",
    "LogStream": "//aws/mwaa-serverless/simple_s3_test_3-abc1234def//workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=list_objects/attempt=1.log"
}
{
    "Status": "FAILED",
    "StartedAt": "2025-10-28T21:23:13.446256+00:00",
    "LogStream": "//aws/mwaa-serverless/simple_s3_test_3-abc1234def//workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=create_object_list/attempt=1.log"
}

En ese momento, utilizaría CloudWatch. LogStream salida para depurar su flujo de trabajo.

Puede ver y dirigir sus flujos de trabajo en el Consola sin servidor Amazon MWAA:

Para ver un ejemplo que crea métricas detalladas y un panel de monitoreo utilizando AWS Lambda, Amazon CloudWatch, AmazonDynamoDBy Puente de eventos de Amazonrevise el ejemplo en este repositorio de GitHub.

Expurgar capital

Para evitar incurrir en cargos continuos, siga estos pasos para desterrar todos los capital creados durante este tutorial:

  1. Eliminar flujos de trabajo sin servidor MWAA: ejecute este comando de AWS CLI para eliminar todos los flujos de trabajo:
    aws mwaa-serverless list-workflows --query 'Workflows(*).WorkflowArn' --output text | while read -r workflow; do aws mwaa-serverless delete-workflow --workflow-arn $workflow done

  2. Elimine las funciones y políticas de IAM creadas para este tutorial:
    aws iam delete-role-policy --role-name mwaa-serverless-access-role --policy-name mwaa-serverless-policy

  3. Elimine las definiciones de flujo de trabajo YAML de su depósito S3:
    aws s3 rm s3://amzn-s3-demo-bucket/yaml/ --recursive

A posteriori de completar estos pasos, verifique en la Consola de distribución de AWS que todos los capital se hayan eliminado correctamente. Recuerde que los registros de CloudWatch se conservan de forma predeterminada y es posible que deba eliminarlos por separado si desea eliminar todos los rastros de las ejecuciones de su flujo de trabajo.

Si encuentra algún error durante la pureza, verifique que tenga los permisos necesarios y que existan capital ayer de intentar eliminarlos. Algunos capital pueden tener dependencias que requieran su aniquilación en un orden específico.

Conclusión

En esta publicación, exploramos Amazon MWAA Serverless, una nueva opción de implementación que simplifica la distribución del flujo de trabajo de Apache Airflow. Demostramos cómo crear flujos de trabajo utilizando definiciones YAML, convertir DAG de Python existentes al formato sin servidor y monitorear sus flujos de trabajo.

MWAA Serverless ofrece varias ventajas esencia:

  • Sin gastos generales de aprovisionamiento
  • Maniquí de precios de cuota por uso
  • Escalado forzoso basado en las demandas del flujo de trabajo
  • Seguridad mejorada a través de permisos granulares de IAM
  • Definiciones de flujo de trabajo simplificadas usando YAML

Para obtener más información sobre MWAA Serverless, revise la documentación.


Sobre los autores

Juan Jackson

Juan Jackson

John Tiene más de 25 primaveras de experiencia en software como desarrollador, arquitecto de sistemas y apoderado de productos tanto en nuevas empresas como en grandes corporaciones y es el apoderado principal de productos de AWS responsable de Amazon MWAA.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *