Mejores prácticas y patrones de diseño para construir flujos de trabajo de aprendizaje automático con Amazon SageMaker Pipelines
Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines
Amazon SageMaker Pipelines es un servicio totalmente administrado de AWS para construir y orquestar flujos de trabajo de aprendizaje automático (ML). SageMaker Pipelines ofrece a los desarrolladores de aplicaciones de ML la capacidad de orquestar diferentes pasos del flujo de trabajo de ML, incluyendo carga de datos, transformación de datos, entrenamiento, ajuste y despliegue. Puede utilizar SageMaker Pipelines para orquestar trabajos de ML en SageMaker, y su integración con el ecosistema más amplio de AWS también le permite utilizar recursos como funciones de AWS Lambda, trabajos de Amazon EMR y más. Esto le permite construir un flujo de trabajo personalizado y reproducible para requisitos específicos en sus flujos de trabajo de ML.
En esta publicación, proporcionamos algunas mejores prácticas para maximizar el valor de SageMaker Pipelines y hacer que la experiencia de desarrollo sea fluida. También discutimos algunos escenarios y patrones de diseño comunes al construir SageMaker Pipelines y proporcionamos ejemplos para abordarlos.
Mejores prácticas para SageMaker Pipelines
En esta sección, discutimos algunas mejores prácticas que se pueden seguir al diseñar flujos de trabajo utilizando SageMaker Pipelines. Adoptarlas puede mejorar el proceso de desarrollo y agilizar la gestión operativa de SageMaker Pipelines.
Utilice Pipeline Session para la carga diferida del pipeline
La Pipeline Session permite la inicialización diferida de los recursos del pipeline (los trabajos no se inician hasta el tiempo de ejecución del pipeline). El contexto PipelineSession
hereda la sesión de SageMaker e implementa métodos convenientes para interactuar con otras entidades y recursos de SageMaker, como trabajos de entrenamiento, puntos de enlace, conjuntos de datos de entrada en Amazon Simple Storage Service (Amazon S3), entre otros. Al definir SageMaker Pipelines, debe utilizar PipelineSession
en lugar de la sesión regular de SageMaker:
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
framework_version=’0.20.0’,
instance_type=’ml.m5.xlarge’,
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
sagemaker_session=pipeline_session,
)
Ejecute los pipelines en modo local para iteraciones rápidas y rentables durante el desarrollo
Puede ejecutar un pipeline en modo local utilizando el contexto LocalPipelineSession
. En este modo, el pipeline y los trabajos se ejecutan localmente utilizando recursos en la máquina local, en lugar de los recursos administrados por SageMaker. El modo local proporciona una forma rentable de iterar en el código del pipeline con un subconjunto más pequeño de datos. Después de probar el pipeline localmente, se puede escalar para ejecutarse utilizando el contexto de PipelineSession.
- Habilitar métricas GPU basadas en pods en Amazon CloudWatch
- Estudio explora el potencial de la TMS asistida por robots para la ...
- Colombia Británica dividida sobre la seguridad de los coches autónomos
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
framework_version=’0.20.0’,
instance_type=’ml.m5.xlarge',
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
sagemaker_session=local_pipeline_session,
)
Gestione un pipeline de SageMaker mediante versionado
El versionado de artefactos y definiciones de pipeline es un requisito común en el ciclo de vida de desarrollo. Puede crear múltiples versiones del pipeline mediante la asignación de nombres a los objetos del pipeline con un prefijo o sufijo único, siendo el más común un sello de tiempo, como se muestra en el siguiente código:
from sagemaker.workflow.pipeline_context import PipelineSession
import time
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
name=pipeline_name,
steps=[step_process, step_train, step_eval, step_cond],
sagemaker_session=pipeline_session,
)
Organice y realice un seguimiento de las ejecuciones de SageMaker pipeline mediante la integración con SageMaker Experiments
SageMaker Pipelines se puede integrar fácilmente con SageMaker Experiments para organizar y realizar un seguimiento de las ejecuciones del pipeline. Esto se logra especificando PipelineExperimentConfig al crear un objeto de pipeline. Con este objeto de configuración, puede especificar un nombre de experimento y un nombre de prueba. Los detalles de ejecución de un pipeline de SageMaker se organizan en el experimento y prueba especificados. Si no especifica explícitamente un nombre de experimento, se utiliza el nombre del pipeline como nombre del experimento. De manera similar, si no especifica explícitamente un nombre de prueba, se utiliza el ID de ejecución del pipeline como nombre de prueba o grupo de ejecución. Vea el siguiente código:
Pipeline(
name="MiPipeline",
parameters=[...],
pipeline_experiment_config=PipelineExperimentConfig(
experiment_name=ExecutionVariables.PIPELINE_NAME,
trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID
),
steps=[...]
)
Ejecutar de manera segura las pipelines de SageMaker dentro de una VPC privada
Para asegurar las cargas de trabajo de ML, es una buena práctica desplegar los trabajos orquestados por SageMaker Pipelines en una configuración de red segura dentro de una VPC privada, subredes privadas y grupos de seguridad. Para garantizar y hacer cumplir el uso de este entorno seguro, puedes implementar la siguiente política de AWS Identity and Access Management (IAM) para el rol de ejecución de SageMaker (este es el rol asumido por la pipeline durante su ejecución). También puedes agregar la política para ejecutar los trabajos orquestados por SageMaker Pipelines en modo de aislamiento de red.
# Política de IAM para hacer cumplir la ejecución dentro de una VPC privada
{
"Action": [
"sagemaker:CreateProcessingJob",
"sagemaker:CreateTrainingJob",
"sagemaker:CreateModel"
],
"Resource": "*",
"Effect": "Deny",
"Condition": {
"Null": {
"sagemaker:VpcSubnets": "true"
}
}
}
# Política de IAM para hacer cumplir la ejecución en modo de aislamiento de red
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"sagemaker:Create*"
],
"Resource": "*",
"Condition": {
"StringNotEqualsIfExists": {
"sagemaker:NetworkIsolation": "true"
}
}
}
]
}
Para un ejemplo de implementación de una pipeline con estos controles de seguridad, consulta Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker in a secure environment.
Monitorear el costo de las ejecuciones de la pipeline usando etiquetas
El uso de SageMaker Pipelines por sí mismo es gratuito; pagas por los recursos de cómputo y almacenamiento que utilizas como parte de los pasos individuales de la pipeline, como procesamiento, entrenamiento e inferencia por lotes. Para agregar los costos por ejecución de la pipeline, puedes incluir etiquetas en cada paso de la pipeline que crea un recurso. Estas etiquetas luego se pueden referenciar en el explorador de costos para filtrar y agregar el costo total de la ejecución de la pipeline, como se muestra en el siguiente ejemplo:
sklearn_processor = SKLearnProcessor(
framework_version=’0.20.0’,
instance_type=’ml.m5.xlarge,
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
)
step_process = ProcessingStep(
name="AbaloneProcess",
processor=sklearn_processor,
...
)
Desde el explorador de costos, ahora puedes obtener el costo filtrado por la etiqueta:
response = client.get_cost_and_usage(
TimePeriod={
'Start': '2023-07-01',
'End': '2023-07-15'
},
Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'],
Granularity='MONTHLY',
Filter={
'Dimensions': {
'Key':'USAGE_TYPE',
'Values': [
‘SageMaker:Pipeline’
]
},
'Tags': {
'Key': 'keyName',
'Values': [
'keyValue',
]
}
}
)
Patrones de diseño para algunos escenarios comunes
En esta sección, discutiremos patrones de diseño para algunos casos de uso comunes con SageMaker Pipelines.
Ejecutar una función ligera de Python usando un paso de Lambda
Las funciones de Python están omnipresentes en los flujos de trabajo de ML; se utilizan en la preprocesamiento, posprocesamiento, evaluación y más. Lambda es un servicio de cómputo sin servidor que te permite ejecutar código sin aprovisionar ni administrar servidores. Con Lambda, puedes ejecutar código en tu lenguaje preferido que incluye Python. Puedes usar esto para ejecutar código Python personalizado como parte de tu pipeline. Un paso de Lambda te permite ejecutar funciones de Lambda como parte de tu pipeline de SageMaker. Comienza con el siguiente código:
%%writefile lambdafunc.py
import json
def lambda_handler(event, context):
str1 = event["str1"]
str2 = event["str2"]
str3 = str1 + str2
return {
"str3": str3
}
</
Crea la función Lambda utilizando el ayudante Lambda del SDK de Python de SageMaker:
from sagemaker.lambda_helper import Lambda
def create_lambda(function_name, script, handler):
response = Lambda(
function_name=function_name,
execution_role_arn=role,
script= script,
handler=handler,
timeout=600,
memory_size=10240,
).upsert()
function_arn = response['FunctionArn']
return function_arn
fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")
Llama al paso de Lambda:
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum
)
str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)
# Paso de Lambda
step_lambda1 = LambdaStep(
name="LambdaStep1",
lambda_func=Lambda(
function_arn=fn_arn
),
inputs={
"str1": "Hola",
"str2": " Mundo"
},
outputs=[str3],
)
Pasar datos entre pasos
Los datos de entrada para un paso del pipeline pueden ser una ubicación de datos accesible o datos generados por uno de los pasos anteriores en el pipeline. Puede proporcionar esta información como un parámetro ProcessingInput
. Veamos algunos escenarios de cómo se puede usar ProcessingInput.
Escenario 1: Pasar la salida (tipos de datos primitivos) de un paso de Lambda a un paso de procesamiento
Los tipos de datos primitivos se refieren a tipos de datos escalares como cadena, entero, booleano y flotante.
El siguiente fragmento de código define una función Lambda que devuelve un diccionario de variables con tipos de datos primitivos. El código de su función Lambda devolverá un JSON de pares clave-valor cuando se invoque desde el paso de Lambda dentro del pipeline de SageMaker.
def handler(event, context):
...
return {
"output1": "valor_cadena",
"output2": 1,
"output3": True,
"output4": 2.0,
}
En la definición del pipeline, puede definir los parámetros del pipeline de SageMaker que sean de un tipo de datos específico y establecer la variable en la salida de la función Lambda:
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
# 1. Define los parámetros de salida del paso de Lambda
str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float)
# 2. Paso de Lambda invocando la función Lambda y devolviendo la salida
step_lambda = LambdaStep(
name="MiPasoLambda",
lambda_func=Lambda(
function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
session=PipelineSession(),
),
inputs={"arg1": "foo", "arg2": "foo1"},
outputs=[
str_outputParam, int_outputParam, bool_outputParam, float_outputParam
],
)
# 3. Extrae la salida de la Lambda
str_outputParam = step_lambda.properties.Outputs["output1"]
# 4. Úsalo en un paso posterior. Por ejemplo, un paso de procesamiento
sklearn_processor = SKLearnProcessor(
framework_version="0.23-1",
instance_type="ml.m5.xlarge",
instance_count=1,
sagemaker_session=pipeline_session,
role=role
)
processor_args = sklearn_processor.run(
code="code/preprocess.py", #python script to run
arguments=["--input-args", str_outputParam]
)
step_process = ProcessingStep(
name="pasoProceso1",
step_args=processor_args,
)
Escenario 2: Pasar la salida (tipos de datos no primitivos) de un paso de Lambda a un paso de procesamiento
Los tipos de datos no primitivos se refieren a los tipos de datos no escalares (por ejemplo, NamedTuple
). Puede haber un escenario en el que deba devolver un tipo de dato no primitivo desde una función Lambda. Para hacer esto, debe convertir su tipo de dato no primitivo a una cadena de texto:
# Código de la función Lambda que devuelve un tipo de dato no primitivo
from collections import namedtuple
def lambda_handler(event, context):
Outputs = namedtuple("Outputs", "sample_output")
named_tuple = Outputs(
[
{'output1': 1, 'output2': 2},
{'output3': 'foo', 'output4': 'foo1'}
]
)
return {
"named_tuple_string": str(named_tuple)
}
# Paso del flujo de trabajo que utiliza la salida de Lambda como "Parámetro de entrada"
output_ref = step_lambda.properties.Outputs["named_tuple_string"]
Luego, puede usar esta cadena de texto como entrada para un paso posterior en el flujo de trabajo. Para usar la tupla con nombre en el código, use eval()
para analizar la expresión de Python en la cadena de texto:
# Descifrar la cadena de texto en su código de lógica de procesamiento
import argparse
from collections import namedtuple
Outputs = namedtuple("Outputs", "sample_output")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--named_tuple_string", type=str, required=True)
args = parser.parse_args()
# usar eval para obtener la tupla con nombre a partir de la cadena de texto
named_tuple = eval(args.named_tuple_string)
Escenario 3: Pasar la salida de un paso a través de un archivo de propiedades
También puede almacenar la salida de un paso de procesamiento en un archivo JSON de propiedades para consumirlo posteriormente en un ConditionStep
u otro ProcessingStep
. Puede usar la función JSONGet para consultar un archivo de propiedades. Vea el siguiente código:
# 1. Definir un Procesador con una ProcessingOutput
sklearn_processor = SKLearnProcessor(
framework_version="0.23-1",
instance_type="ml.m5.xlarge",
instance_count=1,
base_job_name="sklearn-abalone-preprocess",
sagemaker_session=session,
role=sagemaker.get_execution_role(),
)
step_args = sklearn_processor.run(
outputs=[
ProcessingOutput(
output_name="hyperparam",
source="/opt/ml/processing/evaluation"
),
],
code="./local/preprocess.py",
arguments=["--input-data", "s3://my-input"],
)
# 2. Definir un PropertyFile donde el nombre de salida coincida con el utilizado en el Procesador
hyperparam_report = PropertyFile(
name="AbaloneHyperparamReport",
output_name="hyperparam",
path="hyperparam.json",
)
Supongamos que el contenido del archivo de propiedades es el siguiente:
{
"hyperparam": {
"eta": {
"value": 0.6
}
}
}
En este caso, se puede consultar un valor específico y usarlo en pasos posteriores utilizando la función JsonGet:
# 3. Consultar el archivo de propiedades
eta = JsonGet(
step_name=step_process.name,
property_file=hyperparam_report,
json_path="hyperparam.eta.value",
)
Parametrizar una variable en la definición del flujo de trabajo
Parametrizar variables para que puedan usarse en tiempo de ejecución es a menudo deseable, por ejemplo, para construir una URI de S3. Puede parametrizar una cadena de texto de modo que se evalúe en tiempo de ejecución utilizando la función Join
. El siguiente fragmento de código muestra cómo definir la variable usando la función Join
y utilizarla para establecer la ubicación de salida en un paso de procesamiento:
# definir la variable para almacenar la URI de S3
s3_location = Join(
on="/",
values=[
"s3:/",
ParameterString(
name="MyBucket",
default_value=""
),
"training",
ExecutionVariables.PIPELINE_EXECUTION_ID
]
)
# definir el paso de procesamiento
sklearn_processor = SKLearnProcessor(
framework_version="1.2-1",
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
sagemaker_session=pipeline_session,
role=role,
)
# utilizar el s3uri como ubicación de salida en el paso de procesamiento
processor_run_args = sklearn_processor.run(
outputs=[
ProcessingOutput(
output_name="train",
source="/opt/ml/processing/train",
destination=s3_location,
),
],
code="code/preprocess.py"
)
step_process = ProcessingStep(
name="PreprocessingJob”,
step_args=processor_run_args,
)
Ejecutar código en paralelo sobre un iterable
Algunos flujos de trabajo de ML ejecutan código en bucles paralelos sobre un conjunto estático de elementos (un iterable). Puede ser el mismo código que se ejecuta en diferentes datos o un código diferente que debe ejecutarse para cada elemento. Por ejemplo, si tienes un número muy grande de filas en un archivo y quieres acelerar el tiempo de procesamiento, puedes confiar en el primer patrón. Si quieres realizar transformaciones diferentes en subgrupos específicos de los datos, es posible que debas ejecutar un código diferente para cada subgrupo en los datos. Los dos escenarios siguientes ilustran cómo puedes diseñar pipelines de SageMaker para este propósito.
Escenario 1: Implementar una lógica de procesamiento en diferentes partes de los datos
Puedes ejecutar un trabajo de procesamiento con múltiples instancias (configurando instance_count
a un valor mayor que 1). Esto distribuye los datos de entrada de Amazon S3 en todas las instancias de procesamiento. Luego, puedes usar un script (process.py) para trabajar en una parte específica de los datos basada en el número de instancia y el elemento correspondiente en la lista de elementos. La lógica de programación en process.py se puede escribir de manera que se ejecute un módulo o un fragmento de código diferente según la lista de elementos que procesa. El siguiente ejemplo define un procesador que se puede usar en un Paso de Procesamiento:
sklearn_processor = FrameworkProcessor(
estimator_cls=sagemaker.sklearn.estimator.SKLearn,
framework_version="0.23-1",
instance_type='ml.m5.4xlarge',
instance_count=4, #número de ejecuciones/instancias en paralelo
base_job_name="parallel-step",
sagemaker_session=session,
role=role,
)
step_args = sklearn_processor.run(
code='process.py',
arguments=[
"--items",
list_of_items, #estructura de datos que contiene una lista de elementos
inputs=[
ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
destination="/opt/ml/processing/input"
)
],
]
)
Escenario 2: Ejecutar una secuencia de pasos
Cuando tienes una secuencia de pasos que deben ejecutarse en paralelo, puedes definir cada secuencia como un pipeline independiente de SageMaker. La ejecución de estos pipelines de SageMaker se puede activar desde una función Lambda que forma parte de un LambdaStep
en el pipeline principal. El siguiente fragmento de código ilustra el escenario donde se activan dos ejecuciones diferentes de pipelines de SageMaker:
import boto3
def lambda_handler(event, context):
items = [1, 2]
#cliente de SageMaker
sm_client = boto3.client("sagemaker")
#nombre del pipeline que se debe activar
#si hay varios, puedes obtener los pipelines disponibles usando la API de boto3
#y activar el adecuado según tu lógica.
pipeline_name = 'child-pipeline-1'
#activar pipeline para cada elemento
response_ppl = sm_client.start_pipeline_execution(
PipelineName=pipeline_name,
PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
)
pipeline_name = 'child-pipeline-2'
response_ppl = sm_client.start_pipeline_execution(
PipelineName=pipeline_name,
PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
)
return
Conclusión
En esta publicación, discutimos algunas mejores prácticas para el uso eficiente y el mantenimiento de los pipelines de SageMaker. También proporcionamos ciertos patrones que puedes adoptar al diseñar flujos de trabajo con SageMaker Pipelines, ya sea que estés creando nuevos pipelines o migrando flujos de trabajo de ML desde otras herramientas de orquestación. Para comenzar con SageMaker Pipelines para la orquestación de flujos de trabajo de ML, consulta los ejemplos de código en GitHub y Amazon SageMaker Model Building Pipelines.