Aprovechando el aprendizaje automático en Big Data con PySpark en AWS
Aprendizaje automático en Big Data con PySpark en AWS
Nota del editor: Suman Debnath es un ponente en ODSC APAC este 22-23 de agosto. ¡Asegúrate de ver su charla, “Construye modelos de clasificación y regresión con Spark en AWS”, allí!
En el arena incesantemente dinámico de la ciencia de datos, discernir y aplicar los instrumentos adecuados puede dar forma significativa a los resultados de tus iniciativas de aprendizaje automático. ¡Un cordial saludo a todos los entusiastas de la ciencia de datos! Me considero afortunado de tener la oportunidad de hablar en la próxima conferencia ODSC APAC programada para el 22 de agosto de 2023. Mi presentación se centrará en el desarrollo de modelos de clasificación y regresión utilizando PySpark en AWS.
Comprendiendo la sesión
En esta sesión interesante e interactiva, profundizaremos en PySpark MLlib, un recurso invaluable en el campo del aprendizaje automático, y exploraremos cómo se pueden implementar varios algoritmos de clasificación utilizando AWS Glue/EMR como plataforma.
Nuestro enfoque será práctico, con énfasis en la aplicación práctica y la comprensión de conceptos esenciales de aprendizaje automático. Los asistentes serán introducidos a una variedad de algoritmos de aprendizaje automático, poniendo un enfoque en la regresión logística, una técnica potente de aprendizaje supervisado para resolver problemas de clasificación binaria.
- La Revolución de la IA Explorando Aplicaciones y Casos de Uso en la...
- Cómo no ser un científico basura
- Aprendizaje auto-supervisado y Transformadores explicación del artí...
Pero esta sesión va más allá de solo conceptos y algoritmos. También navegaremos a través de técnicas críticas de preprocesamiento de datos, esenciales para crear modelos de aprendizaje automático efectivos. Al final de la sesión, los participantes adquirirán habilidades para manejar valores faltantes, modificar tipos de datos de columnas y dividir sus datos en conjuntos de entrenamiento y prueba. Esta experiencia práctica se llevará a cabo dentro del versátil entorno de AWS Glue/EMR.
¿Qué ganarás?
Esta sesión está diseñada para ayudar a los participantes a obtener una comprensión profunda de:
- PySpark MLlib
- Técnicas de aprendizaje no supervisado
- Varios tipos de algoritmos de clasificación
- Implementación de clasificadores de regresión logística
- Preprocesamiento de datos utilizando PySpark en AWS utilizando AWS Glue y Amazon EMR
- Construcción de modelos con PySpark en AWS
Si eres un ingeniero de datos, científico de datos o entusiasta del aprendizaje automático que busca comenzar con Machine Learning con Apache Spark en AWS, esta sesión es perfecta para ti.
Ahora, permítenos darte un adelanto de lo que te espera (el repositorio de código en GitHub se puede encontrar aquí).
Seleccionamos un conjunto de datos que consta de 20,057 nombres de platos, cada uno detallado con 680 columnas que caracterizan la lista de ingredientes, el contenido nutricional y la categoría del plato. Nuestro objetivo colectivo aquí es predecir si un plato es un postre. Esta es una pregunta sencilla y en su mayoría clara: la mayoría de nosotros probablemente podemos clasificar un plato como postre o no simplemente leyendo su nombre, lo que lo convierte en un excelente candidato para un modelo de aprendizaje automático simple.
Paso 1: Importando las bibliotecas necesarias
El primer paso implica importar las bibliotecas necesarias, incluidas las funciones y tipos de PySpark SQL.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler
Paso 2: Preprocesamiento de datos y EDA (Análisis exploratorio de datos)
Cargamos el conjunto de datos CSV de recetas de alimentos utilizando la función read.csv de Spark. El parámetro inferSchema se establece en True para inferir los tipos de datos de las columnas, y header se establece en True para usar la primera fila como encabezados.
# Cargando los datos
dataset = 's3://fcc-spark-example/dataset/2023/recipes_dataset/epi_r.csv'
food = (
spark
.read
.csv(dataset, inferSchema=True, header=True)
)
# Sanitizando los nombres de las columnas
def sanitize_column_name(name):
answer = name
for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
answer = answer.replace(i, j)
return "".join(
[
char
for char in answer
if char.isalpha() or char.isdigit() or char == "_"
]
)
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])
Esta parte del script sanea los nombres de las columnas reemplazando espacios, guiones, barras y ampersands con guiones bajos. También elimina caracteres no alfanuméricos.
# Filtrando los datos
food = food.where(
(
F.col("cakeweek").isin([0.0, 1.0])
| F.col("cakeweek").isNull()
)
& (
F.col("wasteless").isin([0.0, 1.0])
| F.col("wasteless").isNull()
)
)
Aquí filtramos los datos para mantener solo las filas donde las columnas cakeweek y wasteless tienen valores de 0.0 o 1.0, o son nulos.
# Definiendo las columnas identificadoras, continuas, objetivo y binarias
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
"rating",
"calories",
"protein",
"fat",
"sodium",
]
TARGET_COLUMN = ["dessert"]
BINARY_COLUMNS = [
x
for x in food.columns
if x not in CONTINUOUS_COLUMNS
and x not in TARGET_COLUMN
and x not in IDENTIFIERS
]
En esta sección, definimos qué columnas son identificadoras, variables continuas, variables objetivo y variables binarias.
# Manejando valores faltantes
food = food.dropna(
how="all",
subset=[x for x in food.columns if x not in IDENTIFIERS],
)
food = food.dropna(subset=TARGET_COLUMN)
food = food.fillna(0.0, subset=BINARY_COLUMNS)
Manejamos los valores faltantes eliminando filas que tienen todos nulos (excluyendo columnas identificadoras), eliminando filas con nulos en la columna objetivo y completando nulos en columnas binarias con 0.0.
# Convirtiendo números en cadena a tipo float y limitando variables continuas
from typing import Optional
@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
if not value:
return True
try:
_ = float(value)
except ValueError:
return False
return True
for column in ["rating", "calories"]:
food = food.where(is_a_number(F.col(column)))
food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
maximum = {
"calories": 3203.0,
"protein": 173.0,
"fat": 207.0,
"sodium": 5661.0,
}
for k, v in maximum.items():
food = food.withColumn(
k,
F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
F.least(F.col(k), F.lit(v))
),
)
En esta parte, creamos una función definida por el usuario llamada is_a_number para verificar si una cadena se puede convertir a tipo float. Usamos esta función para filtrar los valores no numéricos en las columnas “rating” y “calories” y luego las convertimos a tipo double.
Luego limitamos los valores de las variables continuas “calories”, “protein”, “fat” y “sodium” a máximos especificados para manejar posibles valores atípicos.
# Calculando la suma de cada columna binaria
inst_sum_of_binary_columns = [
F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]
# Seleccionando las sumas de las columnas binarias y convirtiendo el resultado en un diccionario
sum_of_binary_columns = (
food.select(*inst_sum_of_binary_columns).head().asDict()
)
# Contando el número total de filas
num_rows = food.count()
# Identificando las características raras
too_rare_features = [
k
for k, v in sum_of_binary_columns.items()
if v < 10 or v > (num_rows - 10)
]
# Excluyendo las características raras de las columnas binarias
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))
A continuación, calculamos la suma de cada columna binaria y convertimos el resultado en un diccionario. Luego, identificamos las características “raras”: aquellas que son verdaderas menos de 10 veces o verdaderas en todas menos de 10 instancias, y las eliminamos de nuestras columnas binarias.
# Creando nuevas características
food = food.withColumn(
"protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
"fat_ratio", F.col("fat") * 9 / F.col("calories")
)
# Manejando los valores faltantes en las nuevas características
food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])
# Agregando las nuevas características a las columnas continuas
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]
Aquí creamos nuevas características “protein_ratio” y “fat_ratio” que representan la proporción de proteínas y grasas en relación a las calorías, respectivamente. Rellenamos los valores faltantes en estas nuevas características con 0.0 y las agregamos a nuestras columnas continuas.
# Rellenando valores faltantes en las columnas continuas
OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]
imputer = Imputer(
strategy="mean",
inputCols=OLD_COLS,
outputCols=NEW_COLS,
)
imputer_model = imputer.fit(food)
# Actualizando las columnas continuas
CONTINUOUS_COLUMNS = (
list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS
)
# Aplicando el modelo de imputación a los datos
food = imputer_model.transform(food)
En esta sección, imputamos los valores faltantes en las columnas “calories”, “protein”, “fat” y “sodium” con sus valores promedio utilizando el Imputer de Spark. Luego actualizamos nuestra lista de columnas continuas para incluir las imputadas.
# Ensamblado de características continuas en un solo vector
CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]
continuous_assembler = VectorAssembler(
inputCols=CONTINUOUS_NB, outputCol="continuous"
)
food_features = continuous_assembler.transform(food)
A continuación, utilizamos el VectorAssembler para ensamblar nuestras características continuas en una sola columna de vector llamada “continuous”.
# Escalando las características continuas
continuous_scaler = MinMaxScaler(
inputCol="continuous",
outputCol="continuous_scaled",
)
food_features = continuous_scaler.fit(food_features).transform(
food_features
)
Finalmente, escalamos las características continuas al rango [0, 1] utilizando el MinMaxScaler, ajustándolo a nuestros datos y transformando nuestros datos. ¡En este punto, nuestro conjunto de datos está listo para tareas de aprendizaje automático!
Ahora estamos listos para realizar el trabajo de entrenamiento de aprendizaje automático.
Paso 3: Entrenar, Probar y Evaluar el Modelo
Una vez que los datos se procesan y transforman, podemos dividirlos en un conjunto de entrenamiento y un conjunto de pruebas. Después de entrenar el modelo, podemos evaluar su rendimiento utilizando varias métricas. En esta sección, construimos un pipeline de aprendizaje automático con los estimadores que utilizamos para nuestro programa de preparación de características de predicción de postres y agregamos el paso de modelado a la mezcla.
from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
imputer = MF.Imputer(
strategy="mean",
inputCols=["calories", "protein", "fat", "sodium"],
outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
)
continuous_assembler = MF.VectorAssembler(
inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"],
outputCol="continuous",
)
continuous_scaler = MF.MinMaxScaler(
inputCol="continuous",
outputCol="continuous_scaled",
)
food_pipeline = Pipeline(
stages=[imputer, continuous_assembler, continuous_scaler]
)
Podemos ensamblar el conjunto de datos final con el tipo de columna vector.
preml_assembler = MF.VectorAssembler(
inputCols=BINARY_COLUMNS
+ ["continuous_scaled"]
+ ["protein_ratio", "fat_ratio"],
outputCol="features",
)
food_pipeline.setStages(
[imputer, continuous_assembler, continuous_scaler, preml_assembler]
)
food_pipeline_model = food_pipeline.fit(food)
food_features = food_pipeline_model.transform(food)
¡Nuestro marco de datos está listo para el aprendizaje automático! Tenemos varios registros, cada uno con:
- Una columna objetivo (o etiqueta), dessert, que contiene una entrada binaria (1.0 si la receta es un postre, 0.0 de lo contrario)
- Un vector de características, llamado features, que contiene toda la información con la que queremos entrenar nuestro modelo de aprendizaje automático
Podemos mostrar los resultados predichos:
food_features.select("title", "dessert", "features").show(30, truncate=30)
Ahora vamos a entrenar un modelo de aprendizaje automático utilizando un clasificador LogisticRegression:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
featuresCol="features", labelCol="dessert", predictionCol="prediction"
)
food_pipeline.setStages(
[
imputer,
continuous_assembler,
continuous_scaler,
preml_assembler,
lr,
]
)
# Dividir nuestro marco de datos en entrenamiento y prueba
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)
Ahora evaluemos el modelo y veamos la matriz de confusión
results.select("prediction", "rawPrediction", "probability").show(3, False)
# Crear una matriz de confusión para nuestro modelo utilizando pivot()
results.groupby("dessert").pivot("prediction").count().show()
Finalmente, podemos calcular la precisión y la recuperación de nuestro modelo:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"Precisión del modelo: {metrics.precisionByLabel[1]}")
print(f"Recuperación del modelo: {metrics.recallByLabel[1]}")
Tenga en cuenta que el script completo se ha simplificado con fines de este tutorial. Para comprender de manera completa las aplicaciones prácticas, incluido un recorrido detallado del código desde la preparación de datos hasta la implementación del modelo, únase a nosotros en la conferencia ODSC APAC 2023.
Este breve tutorial le ha dado una idea de lo que se tratará en la sesión de ODSC. Al asistir a la sesión, podrá explorar estos temas de manera más profunda y comprender las complejidades de PySpark MLlib. El objetivo principal es capacitar a los entusiastas y profesionales de la ciencia de datos para aprovechar todo el potencial de Spark MLlib en sus proyectos de aprendizaje automático.
Recuerde que la clave para dominar cualquier habilidad radica en el aprendizaje constante y la implementación práctica. Así que prepárese y sumérjase en el fascinante mundo del aprendizaje automático con Spark en AWS en la conferencia ODSC. ¡Esperamos verlo allí!
Sobre el autor:
Suman Debnath es un Principal Developer Advocate (Data Engineering) en Amazon Web Services, enfocándose principalmente en Data Engineering, Data Analysis y Machine Learning. Es apasionado de los sistemas distribuidos a gran escala y es un ferviente fanático de Python. Su experiencia está en el rendimiento del almacenamiento y el desarrollo de herramientas, donde ha desarrollado varias herramientas de evaluación y monitoreo de rendimiento.