Cree conjuntos de datos listos para el aprendizaje automático a partir del Almacén de características fuera de línea de Amazon SageMaker utilizando el SDK de Python de Amazon SageMaker.

Create machine learning-ready datasets from Amazon SageMaker's offline Feature Store using Amazon SageMaker's Python SDK.

Amazon SageMaker Feature Store es un servicio diseñado específicamente para almacenar y recuperar datos de características para su uso por parte de modelos de aprendizaje automático (ML). Feature Store proporciona una tienda en línea capaz de realizar lecturas y escrituras de baja latencia y alta velocidad, y una tienda fuera de línea que proporciona acceso masivo a todos los datos de registro históricos. Feature Store maneja la sincronización de datos entre las tiendas en línea y fuera de línea.

Debido a que el desarrollo de modelos es un proceso iterativo, los clientes frecuentemente consultarán la tienda fuera de línea y crearán varios conjuntos de datos para el entrenamiento del modelo. Actualmente, hay varias formas de acceder a las características en la tienda fuera de línea, incluyendo la ejecución de consultas SQL con Amazon Athena o el uso de Spark SQL en Apache Spark. Sin embargo, estos patrones requieren escribir declaraciones SQL ad hoc (y a veces complejas), lo cual no siempre es adecuado para la persona del científico de datos.

Feature Store recientemente extendió el SDK de Python de SageMaker para hacer más fácil la creación de conjuntos de datos desde la tienda fuera de línea. Con esta versión, puede utilizar un nuevo conjunto de métodos en el SDK para crear conjuntos de datos sin escribir consultas SQL. Estos nuevos métodos admiten operaciones comunes como el viaje en el tiempo, la eliminación de registros duplicados y la unión de múltiples grupos de características mientras se asegura la precisión en el tiempo.

En esta publicación, demostramos cómo utilizar el SDK de Python de SageMaker para construir conjuntos de datos listos para ML sin escribir ninguna declaración SQL.

Descripción general de la solución

Para demostrar la nueva funcionalidad, trabajamos con dos conjuntos de datos: clientes potenciales y métricas de marketing en la web. Estos conjuntos de datos se pueden utilizar para construir un modelo que predice si un cliente potencial se convertirá en una venta dadas las actividades de marketing y las métricas capturadas para ese cliente.

Los datos de los clientes potenciales contienen información sobre posibles clientes que se identifican utilizando Lead_ProspectID. Las características para un cliente (por ejemplo, LeadSource) pueden actualizarse con el tiempo, lo que resulta en un nuevo registro para ese cliente. El Lead_EventTime representa el momento en que se crea cada registro. La siguiente captura de pantalla muestra un ejemplo de estos datos.

Los datos de las métricas de marketing en la web realizan un seguimiento de las métricas de participación para un cliente potencial, donde cada cliente potencial se identifica utilizando el Web_ProspectID. El Web_EventTime representa el momento en que se creó el registro. A diferencia del grupo de características de clientes potenciales, hay solo un registro por cliente potencial en este grupo de características. La siguiente captura de pantalla muestra un ejemplo de estos datos.

Recorremos las partes clave del cuaderno sagemaker-feature-store-offline-sdk.ipynb, que demuestra los siguientes pasos:

  1. Crear un conjunto de datos a partir de un grupo de características.
  2. Unirse a múltiples grupos de características.
  3. Crear una unión en el tiempo entre un grupo de características y un conjunto de datos basados en un conjunto de eventos en marcas de tiempo específicas.
  4. Recuperar el historial de características dentro de un rango de tiempo específico.
  5. Recuperar características en una marca de tiempo específica.

Requisitos previos

Necesita los siguientes requisitos previos:

  • Una cuenta de AWS.
  • Una instancia de cuaderno SageMaker Jupyter. Acceda al código del repositorio de GitHub y cárguelo en su instancia de cuaderno.
  • También puede ejecutar el cuaderno en un entorno de Amazon SageMaker Studio, que es un IDE para el desarrollo de ML. Puede clonar el repositorio de GitHub a través de una terminal dentro del entorno Studio utilizando el siguiente comando:
git clone https://github.com/aws-samples/amazon-sagemaker-feature-store-offline-queries.git

Suponemos que se ha creado un grupo de características para los datos de clientes potenciales utilizando el método existente FeatureGroup.create, y se puede hacer referencia utilizando la variable base_fg. Para obtener más información sobre los grupos de características, consulte Crear grupos de características.

Crear un conjunto de datos a partir de un grupo de características

Para crear un conjunto de datos utilizando el SDK de SageMaker, utilizamos la nueva clase FeatureStore, que contiene el método create_dataset. Este método acepta un grupo de características base que se puede unir con otros grupos de características o DataFrames. Empezamos proporcionando el grupo de características de los clientes potenciales como base y una ruta de Amazon Simple Storage Service (Amazon S3) para almacenar el conjunto de datos:

from sagemaker.feature_store.feature_store import FeatureStore
feature_store = FeatureStore(sagemaker_session=feature_store_session)
ds1_builder = feature_store.create_dataset (base=base_fg,
output_path=f"s3://{s3_bucket_name}/dataset_query_results",)

El método create_dataset devuelve un objeto DatasetBuilder, que se puede utilizar para generar un conjunto de datos a partir de uno o varios grupos de características (que se demostrará en la siguiente sección). Para crear un conjunto de datos simple que consista solo en las características de los clientes potenciales, invocamos el método to_csv_file. Esto ejecuta una consulta en Athena para recuperar las características del almacenamiento sin conexión y guarda los resultados en la ruta S3 especificada.

csv, query = ds1_builder.to_csv_file()
# Mostrar la ubicación S3 del archivo CSV
print(f'Archivo CSV: {csv}')

Unir múltiples grupos de características

Con el SDK de SageMaker, se pueden unir fácilmente múltiples grupos de características para construir un conjunto de datos. También se pueden realizar operaciones de unión entre un DataFrame de Pandas existente y uno o varios grupos de características. El grupo de características base es un concepto importante para las uniones. El grupo de características base es el grupo de características al que se unen otros grupos de características o el DataFrame de Pandas.

Mientras se crea el conjunto de datos utilizando la función create_dataset, se utiliza el método with_feature_group, que realiza una unión interna entre el grupo de características base y otro grupo de características utilizando el identificador de registro y el nombre de la característica objetivo en el grupo de características base. En nuestro ejemplo, el grupo de características base es el grupo de características de clientes potenciales, y el grupo de características objetivo es el grupo de características de marketing web. El método with_feature_group acepta los siguientes argumentos:

  • feature_group – Este es el grupo de características con el que se está uniendo. En nuestro ejemplo de código, el grupo de características objetivo se crea utilizando el conjunto de datos de marketing web.
  • target_feature_name_in_base – El nombre de la característica en el grupo de características base que estamos utilizando como clave en la unión. Utilizamos Lead_ProspectID como identificador de registro para el grupo de características base.
  • included_feature_names – Esta es la lista de nombres de características del grupo de características base. Utilizamos este campo para especificar las características que queremos incluir en el conjunto de datos.

El siguiente código muestra un ejemplo de creación de un conjunto de datos uniendo el grupo de características base con el grupo de características objetivo:

join_builder = feature_store.create_dataset(base=base_fg, 
output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base="Lead_ProspectID",
included_feature_names=["Web_ProspectID",
"LastCampaignActivity","PageViewsPerVisit",
"TotalTimeOnWebsite","TotalWebVisits",
"AttendedMarketingEvent","OrganicSearch",
"ViewedAdvertisement",],)

Se pueden ampliar las operaciones de unión para incluir múltiples grupos de características añadiendo el método with_feature_group al final del ejemplo de código anterior y definiendo los argumentos necesarios para el nuevo grupo de características. También se pueden realizar operaciones de unión con un DataFrame existente definiendo la base como el DataFrame de Pandas existente y uniéndose con los grupos de características de interés. El siguiente ejemplo de código muestra cómo crear un conjunto de datos con un DataFrame de Pandas existente y un grupo de características existente:

ds2_builder = feature_store.create_dataset(
base=new_records_df2, # DataFrame de Pandas
event_time_identifier_feature_name="Lead_EventTime",
record_identifier_feature_name="Lead_ProspectID",
output_path=f"s3://{s3_bucket_name}/dataset_query_results",).with_feature_group(
base_fg, "Lead_ProspectID", ["LeadSource"])

Para obtener más ejemplos sobre estas diversas configuraciones, consulte Crear un conjunto de datos a partir de sus grupos de características.

Crear una unión en un punto en el tiempo

Una de las capacidades más potentes de esta mejora es realizar uniones en un punto en el tiempo de manera simple y sin necesidad de escribir código SQL complejo. Al construir modelos de ML, los científicos de datos necesitan evitar la fuga de datos o la fuga de objetivos, que consiste en utilizar accidentalmente datos durante el entrenamiento del modelo que no estarían disponibles en el momento de la predicción. Por ejemplo, si intentamos predecir el fraude de tarjetas de crédito, debemos excluir las transacciones que llegan después del cargo fraudulento que intentamos predecir, de lo contrario, el modelo entrenado podría utilizar esta información posterior al fraude para alterar el modelo, haciendo que generalice menos bien.

La recuperación de datos precisos en un momento determinado de características requiere que proporcione un DataFrame de entidad que proporcione un conjunto de ID de registro (o clave primaria) y los tiempos de evento correspondientes que sirvan como tiempo de corte para el evento. Este mecanismo de recuperación se llama a veces viaje en el tiempo a nivel de fila, porque permite aplicar una restricción de tiempo diferente para cada clave de fila. Para realizar uniones en un momento determinado con el SDK de SageMaker, usamos la clase Dataset Builder y proporcionamos el DataFrame de entidad como argumento base para el constructor.

En el siguiente código, creamos un DataFrame de entidad simple con dos registros. Establecemos los tiempos de evento, que se utilizan para indicar el tiempo de corte, cerca del centro de los datos de la serie temporal (mediados de enero de 2023):

# Crear el DataFrame de eventos (tabla de entidad) para pasar el sello de tiempo para la unión en un momento determinado
eventos = [['2023-01-20T00:00:00Z', id_registro1],
['2023-01-15T00:00:00Z', id_registro2]]
df_eventos = pd.DataFrame(eventos, columns=['Tiempo_Evento', 'ID_Proceso_Lider'])

Cuando usamos la funcionalidad de point_in_time_accurate_join con la llamada de create_dataset, la consulta interna excluye todos los registros con marcas de tiempo posteriores a los tiempos de corte suministrados, devolviendo los últimos valores de características que habrían estado disponibles en el momento del evento:

# Crear Dataset Builder usando la función de point-in-time-accurate-join
pit_builder = feature_store.create_dataset(
base=df_eventos,
event_time_identifier_feature_name='Tiempo_Evento',
record_identifier_feature_name='ID_Proceso_Lider',
output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(base_fg, "ID_Proceso_Lider"
).point_in_time_accurate_join(
).with_number_of_recent_records_by_record_identifier(1)

Observe que solo hay dos registros en el DataFrame devuelto por la unión en un momento determinado. Esto se debe a que solo enviamos dos ID de registro en el DataFrame de entidad, uno para cada ID_Proceso_Lider que queremos recuperar. El criterio de unión en un momento determinado especifica que el tiempo de evento de un registro (almacenado en el campo Lead_Eventtime) debe contener un valor que sea inferior al tiempo de corte.

Además, instruimos la consulta para recuperar solo el último registro que cumpla con este criterio porque hemos aplicado el método with_number_of_recent_records_by_record_identifier. Cuando se usa en conjunto con el método point_in_time_accurate_join, esto permite al llamador especificar cuántos registros devolver de los que cumplen los criterios de unión en un momento determinado.

Comparar los resultados de la unión en un momento determinado con los resultados de la consulta Athena

Para verificar la salida devuelta por la función point_in_time_accurate_join de SageMaker SDK, la comparamos con el resultado de una consulta Athena. Primero, creamos una consulta Athena estándar usando una instrucción SELECT vinculada a la tabla específica creada por el tiempo de ejecución de Feature Store. El nombre de esta tabla se puede encontrar haciendo referencia al campo table_name después de instanciar el athena_query desde la API de FeatureGroup:

SELECT * FROM "sagemaker_featurestore"."off_sdk_fg_lead_1682348629" 
WHERE "off_sdk_fg_lead_1682348629"."ID_Proceso_Lider" = '5e84c78f-6438-4d91-aa96-b492f7e91029'

La consulta Athena no contiene ninguna semántica de unión en un momento determinado, por lo que devuelve todos los registros que coinciden con el ID_Proceso_Lider especificado.

A continuación, usamos la biblioteca Pandas para ordenar los resultados de Athena por tiempos de evento para una comparación fácil. Los registros con marcas de tiempo posteriores a los tiempos de evento especificados en el DataFrame de entidad (por ejemplo, 2023-01-15T00:00:00Z) enviados a la point_in_time_accurate_join no aparecen en los resultados en un momento determinado. Debido a que también especificamos que solo queremos un solo registro del código anterior de create_dataset, solo obtenemos el registro más reciente antes del tiempo de corte. Al comparar los resultados de SageMaker SDK con los resultados de la consulta Athena, vemos que la función de unión en un momento determinado devolvió los registros adecuados.

Por lo tanto, tenemos confianza en que podemos usar el SDK de SageMaker para realizar un viaje en el tiempo a nivel de fila y evitar la fuga de objetivos. Además, esta capacidad funciona en múltiples grupos de características que pueden actualizarse en plazos completamente diferentes.

Recuperar el historial de características dentro de un rango de tiempo específico

También queremos demostrar el uso de especificar una ventana de tiempo cuando se unen los grupos de características para formar un conjunto de datos. La ventana de tiempo se define utilizando with_event_time_range, que acepta dos entradas, starting_timestamp y ending_timestamp, y devuelve un objeto constructor de conjunto de datos. En nuestro ejemplo de código, establecemos la ventana de tiempo de recuperación durante 1 día completo desde 2022-07-01 00:00:00 hasta 2022-07-02 00:00:00.

El siguiente código muestra cómo crear un conjunto de datos con la ventana de tiempo de evento especificada mientras se unen el grupo de características base con el grupo de características objetivo:

# Configurar ventana de tiempo de evento: segundos de tiempo de epoch unix
# Comenzar el 01/07/2022 y establecer la ventana de tiempo en un día
start_ts = 1656633600
time_window = 86400
# Usando marcas de tiempo codificadas en duro del conjunto de datos, luego agregando la ventana de tiempo
datetime_start = datetime.fromtimestamp(start_ts)
datetime_end = datetime.fromtimestamp(start_ts+time_window)
print(f'Estableciendo la ventana de tiempo de recuperación: {datetime_start} hasta {datetime_end}')
time_window_builder = (feature_store.create_dataset(
base=base_fg, output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base="Lead_ProspectID",
included_feature_names=["Web_ProspectID","LastCampaignActivity","PageViewsPerVisit",
"TotalTimeOnWebsite","TotalWebVisits","AttendedMarketingEvent",
"OrganicSearch","ViewedAdvertisement",],)
.with_event_time_range(starting_timestamp=datetime_start,ending_timestamp=datetime_end))

También confirmamos la diferencia entre los tamaños del conjunto de datos creado usando with_event_time_range exportando a un DataFrame de Pandas con el método to_dataframe() y mostrando los datos. Observe cómo el conjunto de resultados tiene solo una fracción de los 10,020 registros originales, porque solo recupera registros cuyo event_time se encuentra dentro del período de tiempo de 1 día.

Recuperar características a partir de una marca de tiempo específica

El método DatasetBuilder as_of recupera características de un conjunto de datos que cumplen una restricción basada en una marca de tiempo, que el llamador proporciona como argumento a la función. Este mecanismo es útil para escenarios como volver a ejecutar experimentos en datos previamente recopilados, probar modelos de series de tiempo, o construir un conjunto de datos a partir de un estado anterior de la tienda sin conexión para fines de auditoría de datos. Esta funcionalidad a veces se denomina viaje en el tiempo porque esencialmente retrocede la tienda de datos a una fecha y hora anterior. Esta restricción de tiempo también se conoce como la marca de tiempo de corte.

En nuestro código de muestra, primero creamos la marca de tiempo de corte leyendo el valor de write_time para el último registro escrito en Feature Store, el escrito con put_record. Luego proporcionamos esta marca de tiempo de corte al DatasetBuilder como un argumento al método as_of:

# Crear conjunto de datos usando la marca de tiempo de corte
print(f'usando la marca de tiempo de corte: {asof_cutoff_datetime}')
as_of_builder = feature_store.create_dataset(
base=base_fg,
output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base='Lead_ProspectID',
included_feature_names=['Web_ProspectID','Web_EventTime',
'TotalWebVisits']).as_of(asof_cutoff_datetime)

Es importante tener en cuenta que el método as_of aplica la restricción de tiempo al campo interno de write_time, que se genera automáticamente por Feature Store. El campo write_time representa la marca de tiempo real en la que se escribe el registro en la tienda de datos. Esto es diferente de otros métodos como point-in-time-accurate-join y with_event_time_range que utilizan el campo de event_time proporcionado por el cliente como comparador.

Limpieza

Asegúrese de eliminar todos los recursos creados como parte de este ejemplo para evitar cargos continuos. Esto incluye los grupos de características y el bucket S3 que contiene los datos de almacenamiento sin conexión.

Experiencia con SageMaker Python SDK vs. escribir SQL

Los nuevos métodos en SageMaker Python SDK le permiten crear conjuntos de datos rápidamente y avanzar rápidamente hacia el paso de entrenamiento durante el ciclo de vida de ML. Para mostrar el tiempo y el esfuerzo que se pueden ahorrar, examinemos un caso de uso en el que necesitamos unir dos grupos de características mientras recuperamos las características dentro de un marco de tiempo específico. La siguiente figura compara las consultas de Python en el Feature Store sin conexión frente a SQL utilizado para crear el conjunto de datos detrás de una consulta de Python.

Como puede ver, la misma operación de unir dos grupos de características requiere que cree una consulta SQL larga y compleja, mientras que se puede lograr utilizando solo los métodos with_feature_group y with_event_time_range en SageMaker Python SDK.

Conclusión

Los nuevos métodos de almacenamiento sin conexión en Python SageMaker SDK le permiten consultar sus características sin conexión sin tener que escribir declaraciones SQL complejas. Esto proporciona una experiencia perfecta para los clientes que están acostumbrados a escribir código Python durante el desarrollo de modelos. Para obtener más información sobre los grupos de características, consulte Crear un conjunto de datos a partir de sus grupos de características y las API de Feature Store: Feature Group.

El ejemplo completo en esta publicación se puede encontrar en el repositorio de GitHub . Pruébelo y háganos saber sus comentarios en los comentarios.