Cómo automatizar los pipelines de PySpark en AWS EMR con Airflow

Automatización de pipelines de PySpark en AWS EMR con Airflow.

Optimizando la orquestación de flujos de trabajo de big data

Foto por Tom Fisk en Pexels

Introducción

En el panorama dinámico de la ingeniería y análisis de datos, construir tuberías escalables y automatizadas es fundamental.

Los entusiastas de Spark que han estado trabajando con Airflow durante un tiempo podrían preguntarse:

¿Cómo ejecutar un trabajo de Spark en un clúster remoto utilizando Airflow?

¿Cómo automatizar las tuberías de Spark con AWS EMR y Airflow?

En este tutorial vamos a integrar estas dos tecnologías mostrando cómo:

  1. Configurar y obtener parámetros esenciales desde la interfaz de Airflow.
  2. Crear funciones auxiliares para generar automáticamente el comando preferido spark-submit.
  3. Utilizar el método EmrAddStepsOperator() de Airflow para construir una tarea que envíe y ejecute un trabajo de PySpark en EMR
  4. Utilizar el método EmrStepSensor() de Airflow para monitorear la ejecución del script.

El código utilizado en este tutorial está disponible en GitHub.

Requisitos previos

  • Una cuenta de AWS con un bucket de S3 y un clúster de EMR configurados en la misma región (en este caso eu-north-1). El clúster de EMR debe estar disponible y en estado WAITING. En nuestro caso, se ha llamado emr-cluster-tutorial:
Foto por el autor (Clúster EMR personal)
  • Algunos datos de balances simulados ya disponibles en el bucket de S3 bajo la carpeta src/balances. Los datos se pueden generar y escribir en la ubicación utilizando el script de productor de datos.
  • Los JARs necesarios ya deberían haberse descargado de Maven y estar disponibles en el bucket de S3.
  • Docker instalado y en ejecución en la máquina local con 4-6 GB de memoria asignada.

Arquitectura

El objetivo es escribir algunos datos simulados en formato parquet en un bucket de S3 y luego construir un DAG que:

  • Obtenga la configuración requerida desde la interfaz de Airflow;
  • Suba un script de pyspark al mismo bucket de S3;