Orquesta flujos de trabajo de aprendizaje automático basados en Ray utilizando Amazon SageMaker

Orquesta flujos de trabajo de ML basados en Ray con SageMaker

El aprendizaje automático (ML) se está volviendo cada vez más complejo a medida que los clientes intentan resolver problemas más desafiantes. Esta complejidad a menudo conduce a la necesidad de ML distribuido, donde se utilizan múltiples máquinas para entrenar un modelo único. Aunque esto permite la paralelización de tareas en varios nodos, lo que resulta en tiempos de entrenamiento acelerados, mayor escalabilidad y mejor rendimiento, existen desafíos significativos para utilizar de manera efectiva el hardware distribuido. Los científicos de datos deben abordar desafíos como la partición de datos, el equilibrio de carga, la tolerancia a fallas y la escalabilidad. Los ingenieros de ML deben manejar la paralelización, la programación, las fallas y los reintentos manualmente, lo que requiere código de infraestructura complejo.

En esta publicación, discutimos los beneficios de usar Ray y Amazon SageMaker para ML distribuido y proporcionamos una guía paso a paso sobre cómo utilizar estos frameworks para construir e implementar un flujo de trabajo de ML escalable.

Ray, un framework de computación distribuida de código abierto, proporciona un marco flexible para el entrenamiento y el servicio distribuido de modelos de ML. Abstrae los detalles de bajo nivel del sistema distribuido a través de bibliotecas simples y escalables para tareas comunes de ML, como el preprocesamiento de datos, el entrenamiento distribuido, la afinación de hiperparámetros, el aprendizaje por refuerzo y el servicio de modelos.

SageMaker es un servicio completamente administrado para construir, entrenar e implementar modelos de ML. Ray se integra sin problemas con las características de SageMaker para construir e implementar cargas de trabajo de ML complejas que son eficientes y confiables. La combinación de Ray y SageMaker proporciona capacidades de extremo a extremo para flujos de trabajo de ML escalables y tiene las siguientes características destacadas:

  • Los actores distribuidos y los constructos de paralelismo en Ray simplifican el desarrollo de aplicaciones distribuidas.
  • Ray AI Runtime (AIR) reduce la fricción al pasar del desarrollo a la producción. Con Ray y AIR, el mismo código Python puede escalar sin problemas desde una computadora portátil hasta un clúster grande.
  • La infraestructura administrada de SageMaker y características como trabajos de procesamiento, trabajos de entrenamiento y trabajos de afinación de hiperparámetros pueden utilizar bibliotecas Ray en el fondo para la computación distribuida.
  • Amazon SageMaker Experiments permite iterar rápidamente y realizar un seguimiento de los ensayos.
  • Amazon SageMaker Feature Store proporciona un repositorio escalable para almacenar, recuperar y compartir características de ML para el entrenamiento de modelos.
  • Los modelos entrenados se pueden almacenar, versionar y rastrear en el Registro de modelos de Amazon SageMaker para el gobierno y la gestión.
  • Amazon SageMaker Pipelines permite orquestar el ciclo de vida completo de ML, desde la preparación de datos y el entrenamiento hasta la implementación de modelos como flujos de trabajo automatizados.

Descripción general de la solución

Esta publicación se enfoca en los beneficios de usar Ray y SageMaker juntos. Configuramos un flujo de trabajo de ML basado en Ray de principio a fin, orquestado mediante SageMaker Pipelines. El flujo de trabajo incluye la ingestión paralela de datos en el almacén de características utilizando actores Ray, el preprocesamiento de datos con Ray Data, el entrenamiento de modelos y la afinación de hiperparámetros a gran escala utilizando Ray Train y trabajos de afinación de hiperparámetros (HPO), y finalmente la evaluación del modelo y el registro del modelo en un registro de modelos.

Para nuestros datos, utilizamos un conjunto de datos sintético de viviendas que consta de ocho características (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH y DECK) y nuestro modelo predecirá el PRECIO de la casa.

Cada etapa en el flujo de trabajo de ML se divide en pasos discretos, con su propio script que toma parámetros de entrada y salida. En la siguiente sección, destacamos fragmentos de código clave de cada paso. El código completo se puede encontrar en el repositorio de GitHub aws-samples-for-ray.

Prerrequisitos

Para utilizar el SDK de Python de SageMaker y ejecutar el código asociado con esta publicación, necesitas los siguientes prerrequisitos:

  • Una cuenta de AWS que contenga todos tus recursos de AWS
  • Un rol de IAM (Identidad y acceso de AWS) con acceso a los cuadernos de Amazon SageMaker Studio, el Almacén de características de SageMaker, el Registro de modelos de SageMaker y los Pipelines de SageMaker

Ingesta de datos en el Almacén de características de SageMaker

El primer paso en el flujo de trabajo de ML es leer el archivo de datos fuente de Amazon Simple Storage Service (Amazon S3) en formato CSV e ingerirlo en el Almacén de características de SageMaker. El Almacén de características de SageMaker es un repositorio diseñado específicamente que facilita a los equipos crear, compartir y gestionar características de ML. Simplifica el descubrimiento, la reutilización y el intercambio de características, lo que conduce a un desarrollo más rápido, una mayor colaboración dentro de los equipos de los clientes y costos reducidos.

La ingesta de características en el almacén de características incluye los siguientes pasos:

  1. Definir un grupo de características y crear el grupo de características en el almacén de características.
  2. Preparar los datos fuente para el almacén de características agregando una hora de evento y un ID de registro para cada fila de datos.
  3. Ingerir los datos preparados en el grupo de características utilizando el SDK Boto3.

En esta sección, destacamos únicamente el Paso 3, ya que esta es la parte que implica el procesamiento paralelo de la tarea de ingestión utilizando Ray. Puedes revisar el código completo de este proceso en el repositorio de GitHub.

El método ingest_features está definido dentro de una clase llamada Featurestore. Observa que la clase Featurestore está decorada con @ray.remote. Esto indica que una instancia de esta clase es un actor de Ray, una unidad computacional concurrente y con estado dentro de Ray. Es un modelo de programación que te permite crear objetos distribuidos que mantienen un estado interno y pueden ser accedidos concurrentemente por múltiples tareas que se ejecutan en diferentes nodos de un clúster de Ray. Los actores proporcionan una forma de gestionar y encapsular el estado mutable, lo que los hace valiosos para construir aplicaciones complejas y con estado en un entorno distribuido. También puedes especificar requisitos de recursos en los actores. En este caso, cada instancia de la clase FeatureStore requerirá 0.5 CPUs. Observa el siguiente código:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self,feature_group_name, df, region):
        """
        Ingestar características en el Grupo de Almacenamiento de Características
        Args:
            feature_group_name (str): Nombre del Grupo de Características
            data_path (str): Ruta hacia los datos de entrenamiento/validación/prueba en formato CSV.
        """
        
        ...

Puedes interactuar con el actor llamando al operador remote. En el siguiente código, el número deseado de actores se pasa como un argumento de entrada al script. Luego, los datos se dividen en particiones basadas en el número de actores y se pasan a los procesos paralelos remotos para ser ingestados en el almacén de características. Puedes llamar a get en la referencia del objeto para bloquear la ejecución de la tarea actual hasta que la computación remota esté completa y el resultado esté disponible. Cuando el resultado está disponible, ray.get devolverá el resultado y la ejecución de la tarea actual continuará.

import modin.pandas as pd
import ray

df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Dividir en particiones
particiones = [ray.put(part) para part en np.array_split(data, num_actors)]
# Iniciar actores y asignar particiones en un bucle
actores = [Featurestore.remote() para _ en range(args.num_actors)]
resultados = []

para actor, particion en zip(actores, particiones_de_entrada):
    resultados.append(actor.ingest_features.remote(
                        args.feature_group_name, 
                        particion, args.region
                      )
                )

ray.get(resultados)

Preparar los datos para entrenamiento, validación y prueba

En este paso, utilizamos Ray Dataset para dividir, transformar y escalar eficientemente nuestro conjunto de datos en preparación para el aprendizaje automático. Ray Dataset proporciona una forma estándar de cargar datos distribuidos en Ray, con soporte para varios sistemas de almacenamiento y formatos de archivo. Tiene APIs para operaciones comunes de preprocesamiento de datos de aprendizaje automático, como transformaciones paralelas, mezcla, agrupación y agregaciones. Ray Dataset también maneja operaciones que requieren configuración con estado y aceleración de GPU. Se integra sin problemas con otras bibliotecas de procesamiento de datos como Spark, Pandas, NumPy y más, así como con frameworks de aprendizaje automático como TensorFlow y PyTorch. Esto permite construir tuberías de datos de extremo a extremo y flujos de trabajo de aprendizaje automático sobre Ray. El objetivo es facilitar el procesamiento de datos distribuidos y el aprendizaje automático para profesionales e investigadores.

Veamos las secciones del script que realizan este preprocesamiento de datos. Comenzamos cargando los datos desde el almacén de características:

def load_dataset(feature_group_name, region):
    """
    Carga los datos como un dataset de Ray desde la ubicación S3 del almacén de características sin conexión
    Args:
        feature_group_name (str): nombre del grupo de características
    Returns:
        ds (ray.data.dataset): dataset de Ray que contiene los datos solicitados del almacén de características
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Eliminar columnas añadidas por el almacén de características
    # Ya que no están relacionadas con el problema de aprendizaje automático en cuestión
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} el conteo es {ds.count()}")
    return ds

A continuación, dividimos y escalamos los datos utilizando las abstracciones de nivel superior disponibles en la biblioteca ray.data:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Dividir el conjunto de datos en muestras de entrenamiento, validación y prueba
    Args:
        dataset (ray.data.Dataset): datos de entrada
        train_size (float): proporción de datos a utilizar como conjunto de entrenamiento
        val_size (float): proporción de datos a utilizar como conjunto de validación
        test_size (float): proporción de datos a utilizar como conjunto de prueba
        random_state (int): Pase un int para obtener una salida reproducible en múltiples llamadas a la función.
    Returns:
        train_set (ray.data.Dataset): conjunto de datos de entrenamiento
        val_set (ray.data.Dataset): conjunto de datos de validación
        test_set (ray.data.Dataset): conjunto de datos de prueba
    """
    # Mezclamos este conjunto de datos con una semilla aleatoria fija.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Dividir los datos en conjuntos de entrenamiento, validación y prueba
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Ajustar StandardScaler a train_set y aplicarlo a val_set y test_set
    Args:
        train_set (ray.data.Dataset): conjunto de datos de entrenamiento
        val_set (ray.data.Dataset): conjunto de datos de validación
        test_set (ray.data.Dataset): conjunto de datos de prueba
        target_col (str): columna objetivo
    Returns:
        train_transformed (ray.data.Dataset): datos de entrenamiento escalados
        val_transformed (ray.data.Dataset): datos de validación escalados
        test_transformed (ray.data.Dataset): datos de prueba escalados
    """
    tranform_cols = dataset.columns()
    # Eliminar las columnas objetivo del escalado
    tranform_cols.remove(target_col)
    # Configurar un escalador estándar
    standard_scaler = StandardScaler(tranform_cols)
    # Ajustar el escalador al conjunto de entrenamiento
    print("Ajustando la escala a los datos de entrenamiento y transformando el conjunto de datos...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # Aplicar el escalador a los conjuntos de validación y prueba
    print("Transformando conjuntos de validación y prueba...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

Los conjuntos de datos de entrenamiento, validación y prueba procesados se almacenan en Amazon S3 y se pasarán como parámetros de entrada a los pasos siguientes.

Realizar el entrenamiento del modelo y la optimización de hiperparámetros

Con nuestros datos preprocesados y listos para el modelado, es hora de entrenar algunos modelos de ML y ajustar sus hiperparámetros para maximizar el rendimiento predictivo. Utilizamos XGBoost-Ray, una plataforma distribuida para XGBoost construida sobre Ray que permite entrenar modelos de XGBoost en conjuntos de datos grandes utilizando múltiples nodos y GPU. Proporciona reemplazos sencillos para las APIs de entrenamiento y predicción de XGBoost, mientras maneja las complejidades de la gestión y el entrenamiento de datos distribuidos internamente.

Para habilitar la distribución del entrenamiento en varios nodos, utilizamos una clase auxiliar llamada RayHelper. Como se muestra en el siguiente código, utilizamos la configuración de recursos del trabajo de entrenamiento y elegimos el primer host como el nodo principal:

class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        ....
        self.resource_config = self.get_resource_config()
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])

Podemos utilizar la información del host para decidir cómo inicializar Ray en cada una de las instancias del trabajo de entrenamiento:

def start_ray(self): 
   head_ip = self._get_ip_from_host()
   # Si el host actual es el host elegido como nodo principal
   # ejecutamos `ray start` especificando la bandera --head para que este sea el nodo principal
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', 
        self.ray_port, '--redis-password', self.redis_pass, 
        '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        self._wait_for_workers()
        print("Todos los trabajadores presentes y contabilizados")
        print(ray.cluster_resources())

    else:
       # Si el host actual no es el nodo principal, 
       # ejecutamos `ray start` especificando la dirección IP como el host principal
        time.sleep(10)
        output = subprocess.run(['ray', 'start', 
        f"--address={head_ip}:{self.ray_port}", 
        '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        sys.exit(0)

Cuando se inicia un trabajo de entrenamiento, se puede inicializar un clúster Ray llamando al método start_ray() en una instancia de RayHelper:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

Luego usamos el entrenador XGBoost de XGBoost-Ray para el entrenamiento:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
    """
    Crea un entrenador XGBoost, lo entrena y devuelve el resultado.
    Args:
        ds_train (ray.data.dataset): Conjunto de datos de entrenamiento
        ds_val (ray.data.dataset): Conjunto de datos de validación
        params (dict): Hiperparámetros
        num_workers (int): número de workers para distribuir el entrenamiento
        target_col (str): columna objetivo
    Returns:
        result (ray.air.result.Result): Resultado del trabajo de entrenamiento
    """
    
    train_set = RayDMatrix(ds_train, 'PRICE')
    val_set = RayDMatrix(ds_val, 'PRICE')
    
    evals_result = {}
    
    trainer = train(
        params=params,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(val_set, "validation")],
        verbose_eval=False,
        num_boost_round=100,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    
    output_path=os.path.join(args.model_dir, 'model.xgb')
    
    trainer.save_model(output_path)
    
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
 
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except:
        local_testing = True
    if not local_testing: # Seguimiento del experimento si se utiliza SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)

Ten en cuenta que al instanciar el trainer, pasamos RayParams, que toma el número de actores y el número de CPUs por actor. XGBoost-Ray utiliza esta información para distribuir el entrenamiento en todos los nodos conectados al clúster Ray.

Ahora creamos un objeto estimador XGBoost basado en el SDK de Python de SageMaker y lo usamos para el trabajo de HPO.

Orquestar los pasos anteriores usando SageMaker Pipelines

Para construir un flujo de trabajo de ML escalable y reutilizable de principio a fin, debemos usar una herramienta de CI/CD para orquestar los pasos anteriores en una canalización. SageMaker Pipelines tiene integración directa con SageMaker, el SDK de Python de SageMaker y SageMaker Studio. Esta integración te permite crear flujos de trabajo de ML con un SDK de Python fácil de usar y luego visualizar y administrar tu flujo de trabajo usando SageMaker Studio. También puedes hacer un seguimiento del historial de tus datos dentro de la ejecución de la canalización y designar pasos para el almacenamiento en caché.

SageMaker Pipelines crea un Grafo Acíclico Dirigido (DAG) que incluye los pasos necesarios para construir un flujo de trabajo de ML. Cada canalización es una serie de pasos interconectados orquestados por dependencias de datos entre los pasos, y se pueden parametrizar, lo que te permite proporcionar variables de entrada como parámetros para cada ejecución de la canalización. SageMaker Pipelines tiene cuatro tipos de parámetros de canalización: ParameterString, ParameterInteger, ParameterFloat y ParameterBoolean. En esta sección, parametrizamos algunas de las variables de entrada y configuramos la configuración de almacenamiento en caché del paso:

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
feature_group_name = ParameterString(
    name='FeatureGroupName',
    default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
    name='Bucket_Prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
    train_size = ParameterString(
    name='TrainSize',
    default_value="0.6"
)
val_size = ParameterString(
    name='ValidationSize',
    default_value="0.2"
)
test_size = ParameterString(
    name='TestSize',
    default_value="0.2"
)

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Definimos dos pasos de procesamiento: uno para la ingestión de SageMaker Feature Store y otro para la preparación de datos. Esto debería verse muy similar a los pasos anteriores descritos anteriormente. La única línea de código nueva es el ProcessingStep después de la definición de los pasos, que nos permite tomar la configuración del trabajo de procesamiento e incluirlo como un paso en el pipeline. Especificamos además la dependencia del paso de preparación de datos en el paso de ingestión de SageMaker Feature Store. Veamos el siguiente código:

feature_store_ingestion_step = ProcessingStep(
    name='IngestiónFeatureStore',
    step_args=fs_processor_args,
    cache_config=cache_config
)

preprocess_dataset_step = ProcessingStep(
    name='PreparaciónDatos',
    step_args=processor_args,
    cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

De manera similar, para construir un paso de entrenamiento y ajuste del modelo, necesitamos agregar una definición de TuningStep después del código del paso de entrenamiento del modelo para permitirnos ejecutar la sintonización de hiperparámetros de SageMaker como un paso en el pipeline:

tuning_step = TuningStep(
    name="AjusteHP",
    tuner=tuner,
    inputs={
        "entrenamiento": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "entrenamiento"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validación": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "validación"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

Después del paso de ajuste, elegimos registrar el mejor modelo en el Registro de Modelos de SageMaker. Para controlar la calidad del modelo, implementamos un umbral mínimo de calidad que compara la métrica objetivo del mejor modelo (RMSE) con un umbral definido como el parámetro de entrada del pipeline rmse_threshold. Para hacer esta evaluación, creamos otro paso de procesamiento para ejecutar un script de evaluación. El resultado de la evaluación del modelo se almacenará como un archivo de propiedades. Los archivos de propiedades son particularmente útiles al analizar los resultados de un paso de procesamiento para decidir cómo se deben ejecutar otros pasos. Veamos el siguiente código:

# Especificamos dónde almacenaremos los resultados de la evaluación del modelo para que otros pasos puedan acceder a esos resultados
evaluation_report = PropertyFile(
    name='InformeEvaluación',
    output_name='evaluación',
    path='evaluation.json',
)

# Se utiliza un ProcessingStep para evaluar el rendimiento de un modelo seleccionado del paso de HPO.
# En este caso, se evalúa el modelo de mejor rendimiento.
evaluation_step = ProcessingStep(
    name='EvaluarModelo',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluación', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

Definimos un ModelStep para registrar el mejor modelo en el Registro de Modelos de SageMaker en nuestro pipeline. En caso de que el mejor modelo no supere nuestro control de calidad predeterminado, especificamos adicionalmente un FailStep para mostrar un mensaje de error:

register_step = ModelStep(
    name='RegistrarModeloEntrenado',
    step_args=model_registry_args
)

metrics_fail_step = FailStep(
    name="FalloRMSE",
    error_message=Join(on=" ", values=["La ejecución falló debido a RMSE >", rmse_threshold]),
)

A continuación, utilizamos un ConditionStep para evaluar si se debe realizar el paso de registro del modelo o el paso de falla a continuación en el pipeline. En nuestro caso, el mejor modelo se registrará si su puntuación de RMSE es menor que el umbral.

# Paso condicional para evaluar la calidad del modelo y bifurcar la ejecución
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='VerificarEvaluación',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)

Finalmente, orquestamos todos los pasos definidos en un pipeline:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
             feature_store_ingestion_step,
             preprocess_dataset_step,
             tuning_step,
             evaluation_step,
             condition_step
            ]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Nota: Si ya existe un pipeline con el mismo nombre, se sobrescribirá.
training_pipeline.upsert(role_arn=role_arn)

El pipeline anterior se puede visualizar y ejecutar directamente en SageMaker Studio o ejecutarse llamando a execution = training_pipeline.start(). La siguiente figura ilustra el flujo del pipeline.

Además, podemos revisar la línea de artefactos generados por la ejecución del pipeline.

from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Implementar el modelo

Después de que el mejor modelo se registre en el Registro de modelos de SageMaker a través de una ejecución del pipeline, implementamos el modelo en un punto final en tiempo real utilizando las capacidades de implementación de modelos totalmente administrados de SageMaker. SageMaker tiene otras opciones de implementación de modelos para satisfacer las necesidades de diferentes casos de uso. Para obtener más detalles, consulte Implementar modelos para inferencia al elegir la opción adecuada para su caso de uso. Primero, registremos el modelo en el Registro de modelos de SageMaker:

xgb_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

El estado actual del modelo es PendingApproval. Necesitamos establecer su estado en Approved antes de la implementación:

sagemaker_client.update_model_package(
    ModelPackageArn=xgb_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

xgb_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name
)

Limpiar

Después de terminar de experimentar, recuerde limpiar los recursos para evitar cargos innecesarios. Para limpiar, elimine el punto final en tiempo real, el grupo de modelos, el pipeline y el grupo de características llamando a las APIs DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline y DeleteFeatureGroup, respectivamente, y cierre todas las instancias de bloc de notas de SageMaker Studio.

Conclusión

Esta publicación demostró un recorrido paso a paso sobre cómo utilizar SageMaker Pipelines para orquestar flujos de trabajo de ML basados en Ray. También demostramos la capacidad de SageMaker Pipelines para integrarse con herramientas de ML de terceros. Hay varios servicios de AWS que admiten cargas de trabajo de Ray de manera escalable y segura para garantizar un rendimiento excelente y una eficiencia operativa. Ahora, es tu turno de explorar estas poderosas capacidades y comenzar a optimizar tus flujos de trabajo de aprendizaje automático con Amazon SageMaker Pipelines y Ray. ¡Toma acción hoy y desbloquea todo el potencial de tus proyectos de ML!