Balanceo de carga efectivo con Ray en Amazon SageMaker
Balanceo de carga con Ray en Amazon SageMaker
Un método para aumentar la eficiencia del entrenamiento de DNN y reducir los costos de entrenamiento
En publicaciones anteriores (por ejemplo, aquí) hemos ampliado sobre la importancia de perfilar y optimizar el rendimiento de tus cargas de trabajo de entrenamiento de DNN. El entrenamiento de modelos de aprendizaje profundo, especialmente los grandes, puede ser costoso. Tu capacidad para maximizar la utilización de tus recursos de entrenamiento de manera que acelere la convergencia de tu modelo y minimice los costos de entrenamiento puede ser un factor decisivo en el éxito de tu proyecto. La optimización del rendimiento es un proceso iterativo en el que identificamos y abordamos los cuellos de botella de rendimiento en nuestra aplicación, es decir, las partes de nuestra aplicación que nos impiden aumentar la utilización de recursos y/o acelerar el tiempo de ejecución.
Esta publicación es la tercera de una serie de publicaciones que se centran en uno de los cuellos de botella de rendimiento más comunes que encontramos al entrenar modelos de aprendizaje profundo, el cuello de botella de preprocesamiento de datos. Un cuello de botella de preprocesamiento de datos ocurre cuando nuestra GPU (o acelerador alternativo) -normalmente el recurso más costoso en nuestra configuración de entrenamiento- se encuentra inactiva mientras espera la entrada de datos de los recursos de la CPU sobrecargados.

En nuestra primera publicación sobre el tema, discutimos y demostramos diferentes formas de abordar este tipo de cuello de botella, incluyendo:
- Elegir una instancia de entrenamiento con una relación de cómputo de CPU a GPU más adecuada para tu carga de trabajo,
- Mejorar el equilibrio de carga de trabajo entre la CPU y la GPU moviendo algunas de las operaciones de la CPU a la GPU, y
- Transferir parte de la computación de la CPU a dispositivos auxiliares de CPU.
Demostramos la tercera opción utilizando la API de TensorFlow Data Service, una solución específica para TensorFlow, en la que se puede transferir parte del procesamiento de datos de entrada a otros dispositivos utilizando gRPC como protocolo de comunicación subyacente.
En nuestra segunda publicación, propusimos una solución basada en gRPC de propósito general para utilizar trabajadores de CPU auxiliares y la demostramos con un modelo PyTorch de juguete. Aunque requería un poco más de codificación y ajuste manual que la API de TensorFlow Data Service, la solución proporcionaba una mayor robustez y permitía la misma optimización en el rendimiento de entrenamiento.
- ¿Qué es Langchain y los modelos de lenguaje grandes?
- Conversa con tus requisitos Mi viaje aplicando IA generativa (LLM) ...
- Ajuste de hiperparámetros mediante entrenamiento basado en la pobla...
Balanceo de carga con Ray
En esta publicación demostraremos un método adicional para utilizar trabajadores de CPU auxiliares que tiene como objetivo combinar la robustez de la solución de propósito general con la simplicidad y facilidad de uso de la API específica de TensorFlow. El método que demostraremos utilizará Ray Datasets de la biblioteca Ray Data. Aprovechando todo el poder de los sistemas de gestión de recursos y programación distribuida de Ray, Ray Data es capaz de ejecutar nuestra canalización de entrada de datos de entrenamiento de manera escalable y distribuida. En particular, configuraremos nuestro Ray Dataset de tal manera que la biblioteca detecte y utilice automáticamente todos los recursos de CPU disponibles para el preprocesamiento de los datos de entrenamiento. Además, envolveremos nuestro bucle de entrenamiento del modelo con un Ray AIR Trainer para permitir una ampliación sin problemas a una configuración multi-GPU.
Implementación de un clúster Ray en Amazon SageMaker
Un requisito previo para utilizar el marco de Ray y las utilidades que ofrece en un entorno de múltiples nodos es la implementación de un clúster Ray. En general, diseñar, implementar, administrar y mantener dicho clúster de cómputo puede ser una tarea desafiante y a menudo requiere un ingeniero devops dedicado (o un equipo de ingenieros). Esto puede ser un obstáculo insuperable para algunos equipos de desarrollo. En esta publicación demostraremos cómo superar este obstáculo utilizando el servicio de entrenamiento administrado de AWS, Amazon SageMaker. En particular, crearemos un clúster heterogéneo de SageMaker con instancias de GPU e instancias de CPU y lo utilizaremos para implementar un clúster Ray al inicio. Luego ejecutaremos la aplicación de entrenamiento de Ray AIR en este clúster Ray confiando en la capacidad de Ray para realizar un balanceo de carga efectivo en todos los recursos del clúster. Cuando se complete la aplicación de entrenamiento, el clúster Ray se eliminará automáticamente. El uso de SageMaker de esta manera nos permite implementar y utilizar un clúster Ray sin la sobrecarga que normalmente se asocia con la gestión de clústeres.
Ray es un marco poderoso que permite una amplia gama de cargas de trabajo de aprendizaje automático. En esta publicación demostraremos solo algunas de sus capacidades y API utilizando Ray versión 2.6.1. Esta publicación no debe usarse como reemplazo de la documentación de Ray. Asegúrese de consultar la documentación oficial para obtener el uso más apropiado y actualizado de las utilidades de Ray.
Antes de comenzar, un agradecimiento especial a Boruch Chalk por presentarme la biblioteca de datos de Ray y sus capacidades únicas.
Ejemplo de juguete
Para facilitar nuestra discusión, definiremos y entrenaremos un modelo de clasificación basado en Vision Transformer simple de PyTorch (2.0) que entrenaremos en un conjunto de datos sintético compuesto por imágenes y etiquetas aleatorias. La documentación de Ray AIR incluye una amplia variedad de ejemplos que demuestran cómo construir diferentes tipos de cargas de trabajo de entrenamiento utilizando Ray AIR. El guion que creamos aquí sigue aproximadamente los pasos descritos en el ejemplo de clasificador de imágenes de PyTorch.
Definición del conjunto de datos y el preprocesador de Ray
La API de entrenador de Ray AIR distingue entre el conjunto de datos en bruto y la canalización de preprocesamiento que se aplica a los elementos del conjunto de datos antes de alimentarlos al bucle de entrenamiento. Para nuestro conjunto de datos en bruto de Ray, creamos un simple rango de enteros del tamaño de num_records. A continuación, definimos el preprocesador que queremos aplicar a nuestro conjunto de datos. Nuestro preprocesador de Ray contiene dos componentes: el primero es un BatchMapper que mapea los enteros en bruto a pares de imágenes y etiquetas aleatorias. El segundo es un TorchVisionPreprocessor que realiza una transformación de torchvision en nuestros lotes aleatorios, que los convierte en tensores de PyTorch y aplica una serie de operaciones de desenfoque gaussiano. Las operaciones de desenfoque gaussiano tienen como objetivo simular una canalización de preprocesamiento de datos relativamente pesada. Los dos preprocesadores se combinan utilizando un preprocesador en cadena. La creación del conjunto de datos y el preprocesador de Ray se demuestra en el bloque de código siguiente:
importar rayfrom typing import Dict, Tupleimportar numpy como npimportar torchvision.transforms como transformsfrom ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessordef get_ds(batch_size, num_records): # crear un conjunto de datos tabulares en bruto de Ray ds = ray.data.range(num_records) # mapear un entero a un par de imágenes y etiquetas aleatorias def synthetic_ds(batch: Tuple[int]) -> Dict[str, np.ndarray]: etiquetas = batch['id'] tamaño_lote = len(etiquetas) imágenes = np.random.randn(tamaño_lote, 224, 224, 3).astype(np.float32) etiquetas = np.array([etiqueta % 1000 for etiqueta in etiquetas]).astype( dtype=np.int64) return {"imagen": imágenes, "etiqueta": etiquetas} # el primer paso del preprocesador mapea lotes de enteros a # pares de imágenes y etiquetas aleatorias datos_sintéticos = BatchMapper(synthetic_ds, batch_size=tamaño_lote, batch_format="numpy") # definimos una transformación de torchvision que convierte los pares numpy en # tensores y luego aplica una serie de desenfoques gaussianos para simular # un preprocesamiento pesado transformación = transforms.Compose( [transforms.ToTensor()] + [transforms.GaussianBlur(11)]*10 ) # el segundo paso del preprocesador aplica la transformación de torchvision preprocesador_vision = TorchVisionPreprocessor(columns=["imagen"], transform=transformación) # combinar los pasos de preprocesamiento preprocesador = Chain(datos_sintéticos, preprocesador_vision) return ds, preprocesador
Tenga en cuenta que la canalización de datos de Ray utilizará automáticamente todas las CPU disponibles en el clúster de Ray. Esto incluye los recursos de CPU que se encuentran en la instancia de GPU, así como los recursos de CPU de cualquier instancia auxiliar adicional en el clúster.
Definición del bucle de entrenamiento
El siguiente paso es definir la secuencia de entrenamiento que se ejecutará en cada uno de los trabajadores de entrenamiento (por ejemplo, GPUs). Primero definimos el modelo utilizando el paquete Python popular timm (0.6.13) y lo envolvemos utilizando la API train.torch.prepare_model. A continuación, extraemos la partición adecuada del conjunto de datos y definimos un iterador que produce lotes de datos con el tamaño de lote solicitado y los copia al dispositivo de entrenamiento. Luego viene el propio bucle de entrenamiento, que está compuesto por código estándar de PyTorch. Cuando salimos del bucle, informamos la métrica de pérdida resultante. La secuencia de entrenamiento por trabajador se muestra en el bloque de código siguiente:
importar timefrom ray import trainfrom ray.air import sessionimportar torch.nn como nnimportar torch.optim como optimfrom timm.models.vision_transformer import VisionTransformer# construir un modelo ViT usando timmdef build_model(): return VisionTransformer()# definir el bucle de entrenamiento por trabajadordef train_loop_per_worker(config): # envolver el modelo de PyTorch con un objeto Ray modelo = train.torch.prepare_model(build_model()) criterio = nn.CrossEntropyLoss() optimizador = optim.SGD(modelo.parameters(), lr=0.001, momentum=0.9) # obtener la partición adecuada del conjunto de datos conjunto_datos_entrenamiento = session.get_dataset_shard("train") # crear un iterador que devuelve lotes del conjunto de datos lotes_datos_entrenamiento = conjunto_datos_entrenamiento.iter_torch_batches( batch_size=config["batch_size"], prefetch_batches=config["prefetch_batches"], dispositivo=train.torch.get_device() ) t0 = time.perf_counter() para i, lote en enumerate(lotes_datos_entrenamiento): # obtener las entradas y etiquetas entradas, etiquetas = lote["imagen"], lote["etiqueta"] # poner los gradientes de los parámetros a cero optimizador.zero_grad() # adelante + atrás + optimizar salidas = modelo(entradas) pérdida = criterio(salidas, etiquetas) pérdida.backward() optimizador.step() # imprimir estadísticas si i % 100 == 99: # imprimir cada 100 mini-lotes tiempo_promedio = (time.perf_counter()-t0)/100 print(f"Iteración {i+1}: tiempo promedio por paso {tiempo_promedio:.3f}") t0 = time.perf_counter() métricas = dict(pérdida_en_ejecución=pérdida.item()) sesión.informar(métricas)
Definiendo el Entrenador de Ray Torch
Una vez que hemos definido nuestra tubería de datos y bucle de entrenamiento, podemos pasar a configurar el Ray TorchTrainer. Configuramos el Entrenador de manera que tenga en cuenta los recursos disponibles en el clúster. Específicamente, establecemos el número de trabajadores de entrenamiento de acuerdo con el número de GPUs y establecemos el tamaño del lote de acuerdo con la memoria disponible en nuestra GPU objetivo. Construimos nuestro conjunto de datos con el número de registros necesarios para entrenar durante exactamente 1000 pasos.
from ray.train.torch import TorchTrainerfrom ray.air.config import ScalingConfigdef train_model(): # configuraremos el número de trabajadores, el tamaño de nuestro # conjunto de datos y el tamaño del almacenamiento de datos según los # recursos disponibles num_gpus = int(ray.available_resources().get("GPU", 0)) # establecemos el número de trabajadores de entrenamiento según el número de GPUs num_workers = num_gpus if num_gpus > 0 else 1 # establecemos el tamaño del lote basado en la capacidad de memoria de la GPU de # la familia de instancias Amazon EC2 g5 batch_size = 64 # creamos un conjunto de datos sintético con suficientes datos para entrenar durante 1000 pasos num_records = batch_size * 1000 * num_workers ds, preprocessor = get_ds(batch_size, num_records) ds = preprocessor(ds) trainer = TorchTrainer( train_loop_per_worker=train_loop_per_worker, train_loop_config={"batch_size": batch_size}, datasets={"train": ds}, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=num_gpus > 0), ) trainer.fit()
Desplegar un Clúster de Ray y Ejecutar la Secuencia de Entrenamiento
Ahora definimos el punto de entrada de nuestro script de entrenamiento. Es aquí donde configuramos el clúster de Ray e iniciamos la secuencia de entrenamiento en el nodo principal. Usamos la clase Environment de la biblioteca sagemaker-training para descubrir las instancias en el clúster heterogéneo de SageMaker como se describe en este tutorial. Definimos el primer nodo del grupo de instancias de GPU como el nodo principal del clúster de Ray y ejecutamos el comando apropiado en todos los demás nodos para conectarlos al clúster. (Consulte la documentación de Ray para obtener más detalles sobre la creación de clústeres). Programamos el nodo principal para que espere hasta que todos los nodos se hayan conectado y luego iniciamos la secuencia de entrenamiento. Esto garantiza que Ray utilice todos los recursos disponibles al definir y distribuir las tareas subyacentes de Ray.
import timeimport subprocessfrom sagemaker_training import environmentif __name__ == "__main__": # utilizamos la clase Environment() para descubrir automáticamente el clúster de SageMaker env = environment.Environment() if env.current_instance_group == 'gpu' and \ env.current_instance_group_hosts.index(env.current_host) == 0: # el nodo principal inicia un clúster de Ray p = subprocess.Popen('ray start --head --port=6379', shell=True).wait() ray.init() # calculamos el número total de nodos en el clúster groups = env.instance_groups_dict.values() cluster_size = sum(len(v['hosts']) for v in list(groups)) # esperamos hasta que todos los nodos de SageMaker se hayan conectado al clúster de Ray connected_nodes = 1 while connected_nodes < cluster_size: time.sleep(1) resources = ray.available_resources().keys() connected_nodes = sum(1 for s in list(resources) if 'node' in s) # llamamos a la secuencia de entrenamiento train_model() # desmontamos el clúster de Ray p = subprocess.Popen("ray down", shell=True).wait() else: # los nodos trabajadores se conectan al nodo principal head = env.instance_groups_dict['gpu']['hosts'][0] p = subprocess.Popen( f"ray start --address='{head}:6379'", shell=True).wait() # utilidad para verificar si el clúster sigue activo def is_alive(): from subprocess import Popen p = Popen('ray status', shell=True) p.communicate()[0] return p.returncode # mantenemos el nodo activo hasta que el proceso en el nodo principal se complete while is_alive() == 0: time.sleep(10)
Entrenamiento en un Clúster Heterogéneo de Amazon SageMaker
Con nuestro script de entrenamiento completo, ahora tenemos la tarea de implementarlo en un Clúster Heterogéneo de Amazon SageMaker. Para hacer esto, seguimos los pasos descritos en este tutorial. Comenzamos creando un directorio source_dir en el que colocamos nuestro script train.py y un archivo requirements.txt que contiene los dos paquetes pip en los que nuestro script depende, timm y ray[air]. Estos se instalan automáticamente en cada uno de los nodos del clúster de SageMaker. Definimos dos grupos de instancias de SageMaker, el primero con una única instancia ml.g5.xlarge (que contiene 1 GPU y 4 vCPUs), y el segundo con una única instancia ml.c5.4xlarge (que contiene 16 vCPUs). Luego utilizamos el estimador de PyTorch de SageMaker para definir e implementar nuestro trabajo de entrenamiento en la nube.
from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup
cpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)
gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)
estimator = PyTorch(
entry_point='train.py',
source_dir='./source_dir',
framework_version='2.0.0',
role='',
py_version='py310',
job_name='hetero-cluster',
instance_groups=[gpu_group, cpu_group])
estimator.fit()
Resultados
En la tabla a continuación comparamos los resultados de tiempo de ejecución de nuestro script de entrenamiento en dos configuraciones diferentes: una instancia GPU ml.g5.xlarge individual y un clúster heterogéneo que contiene una instancia ml.g5.xlarge y una ml.c5.4xlarge. Evaluamos la utilización de recursos del sistema utilizando Amazon CloudWatch y estimamos el costo de entrenamiento utilizando los precios de Amazon SageMaker disponibles en el momento de escribir esto ($0.816 por hora para la instancia ml.c5.4xlarge y $1.408 para la instancia ml.g5.xlarge).

La utilización relativamente alta de la CPU combinada con la baja utilización de la GPU en el experimento de instancia individual indica un cuello de botella de rendimiento en la canalización de preprocesamiento de datos. Estos problemas se resuelven claramente al pasar al clúster heterogéneo. No solo aumenta la utilización de la GPU, sino también la velocidad de entrenamiento. En general, la eficiencia en términos de precio del entrenamiento aumenta en un 23%.
Debemos enfatizar que estos experimentos de juguete se crearon únicamente con el propósito de demostrar las funciones de equilibrio de carga automatizadas habilitadas por el ecosistema de Ray. Es posible que la afinación de los parámetros de control haya mejorado el rendimiento. También es probable que elegir una solución diferente para abordar el cuello de botella de la CPU (como elegir una instancia de la familia EC2 g5 con más CPUs) haya dado como resultado un mejor rendimiento de costo.
Resumen
En esta publicación hemos demostrado cómo se pueden utilizar los conjuntos de datos de Ray para equilibrar la carga de una canalización de preprocesamiento de datos pesada en todos los trabajadores de CPU disponibles en el clúster. Esto nos permite abordar fácilmente los cuellos de botella de la CPU simplemente agregando instancias de CPU auxiliares al entorno de entrenamiento. El soporte de clúster heterogéneo de Amazon SageMaker es una forma convincente de ejecutar un trabajo de entrenamiento de Ray en la nube, ya que maneja todos los aspectos de la gestión del clúster, evitando la necesidad de soporte dedicado de devops.
Tenga en cuenta que la solución presentada aquí es solo una de las muchas formas diferentes de abordar los cuellos de botella de la CPU. La mejor solución para usted dependerá en gran medida de los detalles de su proyecto.
Como siempre, no dude en comunicarse con comentarios, correcciones y preguntas.