Pronóstico de series de tiempo probabilísticas multivariadas con Informer

'Pronóstico multivariado de series de tiempo con Informer'

Introducción

Hace unos meses presentamos el Time Series Transformer, que es el Transformer estándar (Vaswani et al., 2017) aplicado a la predicción, y mostramos un ejemplo para la tarea de predicción probabilística univariante (es decir, predecir la distribución unidimensional de cada serie de tiempo de forma individual). En esta publicación presentamos el modelo Informer (Zhou, Haoyi, et al., 2021), el mejor artículo de AAAI21, que ahora está disponible en 🤗 Transformers. Mostraremos cómo utilizar el modelo Informer para la tarea de predicción probabilística multivariante, es decir, predecir la distribución de un vector futuro de valores objetivo de series de tiempo. Tenga en cuenta que esto también funcionará para el modelo vanilla Time Series Transformer.

Predicción Probabilística Multivariante de Series de Tiempo

En cuanto al aspecto de modelado de la predicción probabilística, el Transformer/Informer no requerirá cambios al tratar con series de tiempo multivariante. Tanto en el entorno univariante como en el multivariante, el modelo recibirá una secuencia de vectores y, por lo tanto, el único cambio se produce en la salida o emisión.

Modelar la distribución conjunta completa condicional de datos de alta dimensión puede resultar computacionalmente costoso, por lo que los métodos recurren a alguna aproximación de la distribución, siendo la más sencilla modelar los datos como una distribución independiente de la misma familia, o alguna aproximación de baja dimensionalidad a la covarianza completa, etc. Aquí simplemente recurriremos a las emisiones independientes (o diagonales), que son compatibles con las familias de distribuciones que hemos implementado aquí.

Informer – Bajo el Capó

Basado en el Transformer estándar (Vaswani et al., 2017), Informer emplea dos mejoras principales. Para comprender estas mejoras, recordemos las desventajas del Transformer estándar:

  1. Computación cuadrática de la autoatención canónica: El Transformer estándar tiene una complejidad computacional de O(T^2 D), donde T es la longitud de la serie de tiempo y D es la dimensión de los estados ocultos. Para la predicción de series de tiempo de secuencia larga (también conocido como problema LSTF), esto puede ser computacionalmente costoso. Para resolver este problema, Informer emplea un nuevo mecanismo de autoatención llamado atención ProbSparse, que tiene una complejidad temporal y espacial de O(T log T).
  2. Cuello de botella de memoria al apilar capas: Al apilar N capas de codificador/decodificador, el Transformer estándar tiene un uso de memoria de O(N T^2), lo que limita la capacidad del modelo para secuencias largas. Informer utiliza una operación de destilación para reducir el tamaño de entrada entre capas a la mitad. Al hacerlo, reduce el uso total de memoria a O(N · T log T).

Como puede ver, la motivación del modelo Informer es similar a Longformer (Beltagy et al., 2020), Sparse Transformer (Child et al., 2019) y otros artículos de procesamiento de lenguaje natural (NLP) para reducir la complejidad cuadrática del mecanismo de autoatención cuando la secuencia de entrada es larga. Ahora, sumerjámonos en la atención ProbSparse y la operación de destilación con ejemplos de código.

Atención ProbSparse

La idea principal de ProbSparse es que las puntuaciones de autoatención canónica forman una distribución de cola larga, donde las consultas “activas” se encuentran en las puntuaciones “principales” y las consultas “perezosas” se encuentran en el área “de cola”. Por “consulta activa” nos referimos a una consulta q_i que contribuye a la atención principal, mientras que una consulta “perezosa” forma un producto escalar que genera una atención trivial. Aquí, q_i y k_i son las filas i-ésimas de las matrices de atención Q y K, respectivamente.

Dada la idea de consultas “activas” y “perezosas”, la atención ProbSparse selecciona las consultas “activas” y crea una matriz de consulta reducida, Q_reduced, que se utiliza para calcular los pesos de atención en O(T log T). Veamos esto con más detalle con un ejemplo de código.

Recuerda la fórmula canónica de autoatención:

Atención ( Q , K , V ) = softmax ( Q K T d k ) V

Donde Q ∈ R L Q × d , K ∈ R L K × d y V ∈ R L V × d . Observa que en la práctica, la longitud de entrada de las consultas y las claves son típicamente equivalentes en el cálculo de la autoatención, es decir, L Q = L K = T, donde T es la longitud de la serie temporal. Por lo tanto, la multiplicación Q K T tiene una complejidad computacional de O ( T 2 ⋅ d ). En la atención ProbSparse, nuestro objetivo es crear una nueva matriz Q r e d u c e y definir:

AtenciónProbSparse ( Q , K , V ) = softmax ( Q r e d u c e K T d k ) V

donde la matriz Q r e d u c e solo selecciona las mejores u consultas “activas”. Aquí, u = c ⋅ log ⁡ L Q y c se llama el factor de muestreo hiperparámetro para la atención ProbSparse. Dado que Q r e d u c e selecciona solo las mejores u consultas, su tamaño es c ⋅ log ⁡ L Q × d, por lo que la multiplicación Q r e d u c e K T toma solo O ( L K log ⁡ L Q ) = O ( T log ⁡ T ).

¡Esto es bueno! Pero, ¿cómo podemos seleccionar las consultas “activas” u para crear Q r e d u c e ? Definamos la Medida de Esparsidad de Consulta.

Medida de Esparsidad de Consulta

La Medida de Esparsidad de Consulta M ( q i , K ) se utiliza para seleccionar las consultas “activas” u q i de Q y crear Q r e d u c e . En teoría, los pares ⟨ q i , k i ⟩ dominantes fomentan la distribución de probabilidad “activa” de q i lejos de la distribución uniforme, como se puede ver en la figura de abajo. Por lo tanto, la divergencia KL entre la distribución real de las consultas y la distribución uniforme se utiliza para definir la medida de esparsidad.

En la práctica, la medida se define como:

M ( q i , K ) = max ⁡ j ( q i k j T d ) − 1 L k ∑ j = 1 L k ( q i k j T d )

Lo importante de entender aquí es que cuando M ( q i , K ) M(q_i, K) M ( q i ​ , K ) es más grande, la consulta q i q_i q i ​ debería estar en Q r e d u c e Q_{reduce} Q r e d u c e ​ y viceversa.

Pero ¿cómo podemos calcular el término q i k j T q_ik_j^T q i ​ k j T ​ en tiempo no cuadrático? Recordemos que la mayoría de los productos punto ⟨ q i , k i ⟩ \langle q_i,k_i \rangle ⟨ q i ​ , k i ​ ⟩ generan de cualquier manera la atención trivial (es decir, la propiedad de distribución de cola larga), por lo que es suficiente muestrear aleatoriamente un subconjunto de claves de K K K , que se llamará K_sample en el código.

Ahora, estamos listos para ver el código de probsparse_attention:

from torch import nn
import math


def probsparse_attention(query_states, key_states, value_states, sampling_factor=5):
    """
    Calcula la auto-atención prob-escasa.
    Forma de entrada: Lote x Tiempo x Canal

    Nota: se incluye el parámetro adicional `sampling_factor`.
    """
    # obtiene los tamaños de entrada con logaritmos
    L_K = key_states.size(1)
    L_Q = query_states.size(1)
    log_L_K = np.ceil(np.log1p(L_K)).astype("int").item()
    log_L_Q = np.ceil(np.log1p(L_Q)).astype("int").item()

    # calcula un subconjunto de muestras para cortar de K y crear Q_K_sample
    U_part = min(sampling_factor * L_Q * log_L_K, L_K)

    # crea Q_K_sample (el término q_i * k_j^T en la medición de la escasez)
    index_sample = torch.randint(0, L_K, (U_part,))
    K_sample = key_states[:, index_sample, :]
    Q_K_sample = torch.bmm(query_states, K_sample.transpose(1, 2))

    # calcula la medición de escasez de la consulta con Q_K_sample
    M = Q_K_sample.max(dim=-1)[0] - torch.div(Q_K_sample.sum(dim=-1), L_K)

    # calcula u para encontrar las consultas principales bajo la medición de escasez
    u = min(sampling_factor * log_L_Q, L_Q)
    M_top = M.topk(u, sorted=False)[1]

    # calcula Q_reduce como query_states[:, M_top]
    dim_for_slice = torch.arange(query_states.size(0)).unsqueeze(-1)
    Q_reduce = query_states[dim_for_slice, M_top]  # tamaño: c*log_L_Q x canal

    # y ahora, igual que el canónico
    d_k = query_states.size(-1)
    attn_scores = torch.bmm(Q_reduce, key_states.transpose(-2, -1))  # Q_reduce x K^T
    attn_scores = attn_scores / math.sqrt(d_k)
    attn_probs = nn.functional.softmax(attn_scores, dim=-1)
    attn_output = torch.bmm(attn_probs, value_states)

    return attn_output, attn_scores

Ten en cuenta que en la implementación, U p a r t U_{part} U p a r t ​ contiene L Q L_Q L Q ​ en el cálculo, por problemas de estabilidad (ver esta discusión para más información).

¡Lo logramos! Ten en cuenta que esto es solo una implementación parcial de probsparse_attention, y la implementación completa se puede encontrar en 🤗 Transformers.

Destilación

Debido a la auto-atención prob-escasa, el mapa de características del codificador tiene cierta redundancia que se puede eliminar. Por lo tanto, se utiliza la operación de destilación para reducir el tamaño de entrada entre las capas del codificador a su mitad, eliminando así teóricamente esta redundancia. En la práctica, la operación de “destilación” de Informer simplemente agrega capas de convolución 1D con max pooling entre cada una de las capas del codificador. Sea X n X_n X n ​ la salida de la n n n -ésima capa del codificador, entonces la operación de destilación se define como:

X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) ) X_{n+1} = \textrm{MaxPool} ( \textrm{ELU}(\textrm{Conv1d}(X_n)) X n + 1 ​ = MaxPool ( ELU ( Conv1d ( X n ​ ) )

Veamos esto en código:

from torch import nn

# ConvLayer es una clase con un paso hacia adelante que aplica ELU y MaxPool1d
def informer_encoder_forward(x_input, num_encoder_layers=3, distil=True):
    # Inicializa las capas de convolución
    if distil:
        conv_layers = nn.ModuleList([ConvLayer() for _ in range(num_encoder_layers - 1)])
        conv_layers.append(None)
    else:
        conv_layers = [None] * num_encoder_layers
    
    # Aplica conv_layer entre cada capa del codificador
    for encoder_layer, conv_layer in zip(encoder_layers, conv_layers):
        output = encoder_layer(x_input)
        if conv_layer is not None:
            output = conv_layer(loutput)
    
    return output

Al reducir la entrada de cada capa a la mitad, obtenemos un uso de memoria de O(N ⋅ T log T) en lugar de O(N ⋅ T^2), donde N es el número de capas de codificador/descodificador. ¡Esto es lo que queríamos!

El modelo Informer ahora está disponible en la biblioteca 🤗 Transformers y se llama simplemente InformerModel. En las secciones siguientes, mostraremos cómo entrenar este modelo en un conjunto de datos personalizado de series de tiempo multivariadas.

Configurar el entorno

Primero, instalemos las bibliotecas necesarias: 🤗 Transformers, 🤗 Datasets, 🤗 Evaluate, 🤗 Accelerate y GluonTS.

Como mostraremos, GluonTS se utilizará para transformar los datos y crear características, así como para crear lotes de entrenamiento, validación y prueba adecuados.

!pip install -q transformers datasets evaluate accelerate gluonts ujson

Cargar conjunto de datos

En esta publicación de blog, usaremos el conjunto de datos traffic_hourly, que está disponible en el Hugging Face Hub. Este conjunto de datos contiene el conjunto de datos de tráfico de San Francisco utilizado por Lai et al. (2017). Contiene 862 series de tiempo por hora que muestran las tasas de ocupación de las carreteras en el rango de [0, 1] en el área de la bahía de San Francisco desde 2015 hasta 2016.

Este conjunto de datos forma parte del repositorio de pronóstico de series de tiempo de Monash, una colección de conjuntos de datos de series de tiempo de varios dominios. Se puede ver como el benchmark GLUE de pronóstico de series de tiempo.

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "traffic_hourly")

Como se puede ver, el conjunto de datos contiene 3 divisiones: entrenamiento, validación y prueba.

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
    })

Cada ejemplo contiene algunas claves, de las cuales start y target son las más importantes. Veamos la primera serie de tiempo en el conjunto de datos:

train_example = dataset["train"][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

El start simplemente indica el inicio de la serie de tiempo (como una fecha y hora), y el target contiene los valores reales de la serie de tiempo.

El start será útil para agregar características relacionadas con el tiempo a los valores de la serie de tiempo, como entrada adicional para el modelo (por ejemplo, “mes del año”). Dado que conocemos la frecuencia de los datos es por hora, sabemos, por ejemplo, que el segundo valor tiene la marca de tiempo 2015-01-01 01:00:01, 2015-01-01 02:00:01, etc.

print(train_example["start"])
print(len(train_example["target"]))

>>> 2015-01-01 00:00:01
    17448

El conjunto de validación contiene los mismos datos que el conjunto de entrenamiento, pero para un período de tiempo más largo de longitud de predicción. Esto nos permite validar las predicciones del modelo frente a la verdad básica.

El conjunto de prueba es nuevamente un conjunto de datos de longitud de predicción más largo en comparación con el conjunto de validación (o un múltiplo de longitud de predicción más largo en comparación con el conjunto de entrenamiento para realizar pruebas en varias ventanas móviles).

validation_example = dataset["validation"][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

Los valores iniciales son exactamente los mismos que los del ejemplo de entrenamiento correspondiente. Sin embargo, este ejemplo tiene prediction_length=48 (48 horas, o 2 días) valores adicionales en comparación con el ejemplo de entrenamiento. Vamos a verificarlo.

freq = "1H"
prediction_length = 48

assert len(train_example["target"]) + prediction_length == len(
    dataset["validation"][0]["target"]
)

Vamos a visualizar esto:

import matplotlib.pyplot as plt

num_of_samples = 150

figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
    validation_example["target"][-num_of_samples - prediction_length :],
    color="red",
    alpha=0.5,
)

plt.show()

Vamos a dividir los datos:

train_dataset = dataset["train"]
test_dataset = dataset["test"]

Actualizar start a pd.Period

Lo primero que haremos es convertir la característica start de cada serie temporal en un índice de pandas Period utilizando la frecuencia de los datos:

from functools import lru_cache

import pandas as pd
import numpy as np


@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)


def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

Ahora utilizamos la funcionalidad set_transform de datasets para hacer esto sobre la marcha:

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

A continuación, vamos a convertir el conjunto de datos en una serie temporal multivariable utilizando el MultivariateGrouper de GluonTS. Este agrupador convertirá las series temporales unidimensionales individuales en una matriz 2D única.

from gluonts.dataset.multivariate_grouper import MultivariateGrouper

num_of_variates = len(train_dataset)

train_grouper = MultivariateGrouper(max_target_dim=num_of_variates)
test_grouper = MultivariateGrouper(
    max_target_dim=num_of_variates,
    num_test_dates=len(test_dataset) // num_of_variates, # número de ventanas de prueba en movimiento
)

multi_variate_train_dataset = train_grouper(train_dataset)
multi_variate_test_dataset = test_grouper(test_dataset)

Tenga en cuenta que el objetivo ahora es bidimensional, donde la primera dimensión es el número de variables (número de series temporales) y la segunda es los valores de la serie temporal (dimensión temporal):

multi_variate_train_example = multi_variate_train_dataset[0]
print("multi_variate_train_example["target"].shape =", multi_variate_train_example["target"].shape)

>>> multi_variate_train_example["target"].shape = (862, 17448)

Definir el Modelo

A continuación, vamos a instanciar un modelo. El modelo se entrenará desde cero, por lo que no utilizaremos el método from_pretrained aquí, sino que inicializaremos el modelo de manera aleatoria a partir de una config.

Especificamos un par de parámetros adicionales para el modelo:

  • prediction_length (en nuestro caso, 48 horas): este es el horizonte que el decodificador del Informer aprenderá a predecir;
  • context_length: el modelo establecerá el context_length (entrada del codificador) igual al prediction_length, si no se especifica un context_length;
  • lags para una frecuencia dada: estos especifican un mecanismo eficiente de “mirada atrás”, donde concatenamos valores del pasado a los valores actuales como características adicionales, por ejemplo, para una frecuencia Daily podríamos considerar una mirada atrás de [1, 7, 30, ...] o para datos Minute podríamos considerar [1, 30, 60, 60*24, ...], etc.;
  • el número de características temporales: en nuestro caso, esto será 5, ya que agregaremos características de HourOfDay, DayOfWeek, …, y Age (ver más abajo).

Veamos los desfases predeterminados proporcionados por GluonTS para la frecuencia indicada (“hora”):

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 23, 24, 25, 47, 48, 49, 71, 72, 73, 95, 96, 97, 119, 120, 
     121, 143, 144, 145, 167, 168, 169, 335, 336, 337, 503, 504, 505, 671, 672, 673, 719, 720, 721]

Esto significa que se retrocederá hasta 721 horas (~30 días) para cada paso de tiempo, como características adicionales. Sin embargo, el vector de características resultante tendría un tamaño de len(lags_sequence)*num_of_variates, ¡que en nuestro caso sería de 34480! Esto no va a funcionar, así que usaremos nuestros propios desfases sensibles.

También veamos las características de tiempo predeterminadas que nos proporciona GluonTS:

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function hour_of_day at 0x7f3809539240>, <function day_of_week at 0x7f3809539360>, <function day_of_month at 0x7f3809539480>, <function day_of_year at 0x7f38095395a0>]

En este caso, hay cuatro características adicionales, a saber, “hora del día”, “día de la semana”, “día del mes” y “día del año”. Esto significa que para cada paso de tiempo, agregaremos estas características como valores escalares. Por ejemplo, consideremos la marca de tiempo 2015-01-01 01:00:01. Las cuatro características adicionales serán:

from pandas.core.arrays.period import period_array

timestamp = pd.Period("2015-01-01 01:00:01", freq=freq)
timestamp_as_index = pd.PeriodIndex(data=period_array([timestamp]))
additional_features = [
    (time_feature.__name__, time_feature(timestamp_as_index))
    for time_feature in time_features
]
print(dict(additional_features))

>>> {'hour_of_day': array([-0.45652174]), 'day_of_week': array([0.]), 'day_of_month': array([-0.5]), 'day_of_year': array([-0.5])}

Tenga en cuenta que las horas y los días se codifican como valores entre [-0.5, 0.5] en GluonTS. Para obtener más información sobre time_features, consulte esto . Además de esas 4 características, también agregaremos una característica “edad” como veremos más adelante en las transformaciones de datos.

Ahora tenemos todo para definir el modelo:

from transformers import InformerConfig, InformerForPrediction

config = InformerConfig(
    # en el entorno multivariable, input_size es el número de variables en la serie de tiempo por paso de tiempo
    input_size=num_of_variates,
    # longitud de predicción:
    prediction_length=prediction_length,
    # longitud de contexto:
    context_length=prediction_length * 2,
    # valor de desfases copiado de 1 semana antes:
    lags_sequence=[1, 24 * 7],
    # agregaremos 5 características de tiempo ("hora_del_día", ..., y "edad"):
    num_time_features=len(time_features) + 1,
    
    # parámetros de informer:
    dropout=0.1,
    encoder_layers=6,
    decoder_layers=4,
    # proyectar la entrada desde num_of_variates*len(lags_sequence)+num_time_features a:
    d_model=64,
)

model = InformerForPrediction(config)

De forma predeterminada, el modelo utiliza una distribución diagonal Student-t (pero esto es configurable):

model.config.distribution_output

>>> 'student_t'

Definir Transformaciones

A continuación, definimos las transformaciones para los datos, en particular para la creación de las características de tiempo (basadas en el conjunto de datos o en características universales).

Nuevamente, usaremos la biblioteca GluonTS para esto. Definimos una Chain de transformaciones (que es un poco comparable a torchvision.transforms.Compose para imágenes). Nos permite combinar varias transformaciones en un solo pipeline.

from gluonts.time_feature import TimeFeature
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

Las transformaciones a continuación están anotadas con comentarios para explicar qué hacen. En un nivel alto, iteraremos sobre las series de tiempo individuales de nuestro conjunto de datos y agregaremos/eliminaremos campos o características:

from transformers import PretrainedConfig


def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    # crear lista de campos a eliminar más tarde
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    return Chain(
        # paso 1: eliminar campos estáticos/dinámicos si no se especifican
        [RemoveFields(field_names=remove_field_names)]
        # paso 2: convertir los datos a NumPy (potencialmente no es necesario)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # esperamos una dimensión adicional para el caso multivariado:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # paso 3: manejar los NaN's llenando el objetivo con cero
            # y devolver la máscara (que está en los valores observados)
            # verdadero para valores observados, falso para NaN's
            # el decodificador utiliza esta máscara (no se incurre en pérdida para valores no observados)
            # ver loss_weights dentro del modelo xxxForPrediction
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # paso 4: agregar características temporales basadas en la frecuencia del conjunto de datos
            # estas sirven como codificaciones posicionales
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # paso 5: agregar otra característica temporal (solo un número)
            # indica al modelo en qué parte de la vida se encuentra el valor de la serie de tiempo
            # una especie de contador en marcha
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # paso 6: apilar verticalmente todas las características temporales en la clave FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # paso 7: renombrar para que coincida con los nombres de HuggingFace
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

Definir InstanceSplitter

Para entrenamiento/validación/prueba, a continuación creamos un InstanceSplitter que se utiliza para muestrear ventanas del conjunto de datos (ya que, recuerda, no podemos pasar todo el historial de valores al modelo debido a restricciones de tiempo y memoria).

El instance splitter muestrea ventanas aleatorias de tamaño context_length y ventanas subsiguientes de tamaño prediction_length de los datos y agrega una clave past_ o future_ a cualquier clave temporal para las ventanas respectivas. Esto asegura que los values se dividirán en claves past_values y future_values, que servirán como entradas del codificador y decodificador respectivamente. Lo mismo ocurre para cualquier clave en el argumento time_series_fields:

from gluonts.transform.sampler import InstanceSampler
from typing import Optional


def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

Crear DataLoaders

A continuación, es hora de crear los DataLoaders, que nos permiten tener lotes de pares de (entrada, salida) – o en otras palabras (past_values, future_values).

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # inicializamos una instancia de Training
    instance_splitter = create_instance_splitter(config, "train")

    # el instance splitter muestreará una ventana de longitud de contexto + lags + longitud de predicción (de todas las posibles series de tiempo transformadas, 1 en nuestro caso)
    # aleatoriamente desde dentro de la serie de tiempo objetivo y devolverá un iterador.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(
        stream, is_train=True
    )
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # creamos un Test Instance splitter que muestreará la última ventana de contexto vista durante el entrenamiento solo para el codificador.
    instance_sampler = create_instance_splitter(config, "test")

    # aplicamos las transformaciones en modo de prueba
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
    num_workers=2,
)

test_dataloader = create_test_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_test_dataset,
    batch_size=32,
)

Verifiquemos el primer lote:

lote = next(iter(train_dataloader))
for k, v in lote.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 264, 5]) torch.FloatTensor
    past_values torch.Size([256, 264, 862]) torch.FloatTensor
    past_observed_mask torch.Size([256, 264, 862]) torch.FloatTensor
    future_time_features torch.Size([256, 48, 5]) torch.FloatTensor
    future_values torch.Size([256, 48, 862]) torch.FloatTensor
    future_observed_mask torch.Size([256, 48, 862]) torch.FloatTensor

Como se puede ver, no alimentamos input_ids y attention_mask al codificador (como sería el caso de los modelos de procesamiento de lenguaje natural), sino past_values, junto con past_observed_mask, past_time_features y static_real_features.

Las entradas del decodificador consisten en future_values, future_observed_mask y future_time_features. Los future_values se pueden ver como el equivalente de decoder_input_ids en NLP.

Consulte la documentación para obtener una explicación detallada de cada uno de ellos.

Paso hacia adelante

Realicemos un solo paso hacia adelante con el lote que acabamos de crear:

# realizar paso hacia adelante
outputs = model(
    past_values=lote["past_values"],
    past_time_features=lote["past_time_features"],
    past_observed_mask=lote["past_observed_mask"],
    static_categorical_features=lote["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=lote["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=lote["future_values"],
    future_time_features=lote["future_time_features"],
    future_observed_mask=lote["future_observed_mask"],
    output_hidden_states=True,
)

print("Pérdida:", outputs.loss.item())

>>> Pérdida: -1071.5718994140625

Tenga en cuenta que el modelo devuelve una pérdida. Esto es posible ya que el decodificador desplaza automáticamente los future_values una posición a la derecha para tener las etiquetas. Esto permite calcular una pérdida entre los valores predichos y las etiquetas. La pérdida es el logaritmo negativo de la verosimilitud de la distribución predicha con respecto a los valores verdaderos y tiende hacia menos infinito.

También tenga en cuenta que el decodificador utiliza una máscara causal para no mirar hacia el futuro, ya que los valores que necesita predecir están en el tensor future_values.

Entrenar el modelo

¡Es hora de entrenar el modelo! Utilizaremos un bucle de entrenamiento estándar de PyTorch.

Aquí utilizaremos la biblioteca 🤗 Accelerate, que coloca automáticamente el modelo, el optimizador y el dataloader en el device apropiado.

from accelerate import Accelerator
from torch.optim import AdamW

epochs = 25
historial_de_perdidas = []

acelerador = Accelerator()
dispositivo = acelerador.device

model.to(dispositivo)
optimizador = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizador, train_dataloader = acelerador.prepare(
    model,
    optimizador,
    train_dataloader,
)

model.train()
for epoca in range(epochs):
    for idx, lote in enumerate(train_dataloader):
        optimizador.zero_grad()
        outputs = model(
            static_categorical_features=lote["static_categorical_features"].to(dispositivo)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=lote["static_real_features"].to(dispositivo)
            if config.num_static_real_features > 0
            else None,
            past_time_features=lote["past_time_features"].to(dispositivo),
            past_values=lote["past_values"].to(dispositivo),
            future_time_features=lote["future_time_features"].to(dispositivo),
            future_values=lote["future_values"].to(dispositivo),
            past_observed_mask=lote["past_observed_mask"].to(dispositivo),
            future_observed_mask=lote["future_observed_mask"].to(dispositivo),
        )
        perdida = outputs.loss

        # Retropropagación
        acelerador.backward(perdida)
        optimizador.step()

        historial_de_perdidas.append(perdida.item())
        if idx % 100 == 0:
            print(perdida.item())

>>> -1081.978515625
    ...
    -2877.723876953125

# ver entrenamiento
historial_de_perdidas = np.array(historial_de_perdidas).reshape(-1)
x = range(historial_de_perdidas.shape[0])
plt.figure(figsize=(10, 5))
plt.plot(x, historial_de_perdidas, label="entrenamiento")
plt.title("Pérdida", fontsize=15)
plt.legend(loc="upper right")
plt.xlabel("iteración")
plt.ylabel("nll")
plt.show()

Inferencia

En el momento de la inferencia, se recomienda utilizar el método generate() para la generación autoregresiva, similar a los modelos de NLP.

La predicción implica obtener datos del muestreador de instancias de prueba, que muestreará la ventana de valores de tamaño context_length más reciente de cada serie de tiempo en el conjunto de datos, y pasarlo al modelo. Tenga en cuenta que pasamos future_time_features, que se conocen de antemano, al decodificador.

El modelo muestreará autoregresivamente un cierto número de valores de la distribución predicha y los pasará de vuelta al decodificador para devolver las salidas de predicción:

model.eval()

forecasts_ = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts_.append(outputs.sequences.cpu().numpy())

El modelo devuelve un tensor de forma (tamaño del lote, número de muestras, longitud de la predicción, tamaño de entrada).

En este caso, obtenemos 100 valores posibles para las próximas 48 horas para cada una de las 862 series de tiempo (para cada ejemplo en el lote, que tiene un tamaño de 1 ya que solo tenemos una única serie de tiempo multivariable):

forecasts_[0].shape

>>> (1, 100, 48, 862)

Los apilaremos verticalmente para obtener predicciones para todas las series de tiempo en el conjunto de datos de prueba (en caso de que haya más series de tiempo en el conjunto de prueba):

forecasts = np.vstack(forecasts_)
print(forecasts.shape)

>>> (1, 100, 48, 862)

Podemos evaluar la predicción resultante con respecto a los valores fuera de muestra del conjunto de prueba. Para eso, utilizaremos la biblioteca 🤗 Evaluate, que incluye las métricas MASE y sMAPE.

Calculamos ambas métricas para cada variable de serie de tiempo en el conjunto de datos:

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1).squeeze(0).T

mase_metrics = []
smape_metrics = []

for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
        training=np.array(training_data),
        periodicity=get_seasonality(freq),
    )
    mase_metrics.append(mase["mase"])

    smape = smape_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
    )
    smape_metrics.append(smape["smape"])

print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.1913437728068093

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.5322665081607634

plt.scatter(mase_metrics, smape_metrics, alpha=0.2)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

Para trazar la predicción para cualquier variable de serie de tiempo con respecto a los datos de prueba de verdad fundamentales, definimos la siguiente función auxiliar:

import matplotlib.dates as mdates


def graficar(ts_index, mv_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=multi_variate_test_dataset[ts_index][FieldName.START],
        periods=len(multi_variate_test_dataset[ts_index][FieldName.TARGET]),
        freq=multi_variate_test_dataset[ts_index][FieldName.START].freq,
    ).to_timestamp()

    ax.xaxis.set_minor_locator(mdates.HourLocator())

    ax.plot(
        index[-2 * prediction_length :],
        multi_variate_test_dataset[ts_index]["target"][mv_index, -2 * prediction_length :],
        label="real",
    )

    ax.plot(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(axis=0),
        label="media",
    )
    ax.fill_between(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(0)
        - forecasts[ts_index, ..., mv_index].std(axis=0),
        forecasts[ts_index, ..., mv_index].mean(0)
        + forecasts[ts_index, ..., mv_index].std(axis=0),
        alpha=0.2,
        interpolate=True,
        label="+/- 1-std",
    )
    ax.legend()
    fig.autofmt_xdate()

Por ejemplo:

graficar(0, 344)

Conclusion

¿Cómo nos comparamos con otros modelos? El Repositorio de Series de Tiempo de Monash tiene una tabla de comparación de métricas MASE en el conjunto de prueba a la cual podemos agregar:

Como se puede ver, y tal vez sorprendente para algunos, las predicciones multivariables suelen ser peores que las univariables, debido a la dificultad para estimar las correlaciones/relaciones entre series. La varianza adicional agregada por las estimaciones a menudo perjudica las predicciones resultantes o el modelo aprende correlaciones espurias. Se puede consultar este documento para obtener más información. Los modelos multivariables tienden a funcionar bien cuando se entrenan con muchos datos.

¡Por lo tanto, el Transformer básico sigue siendo el mejor aquí! En el futuro, esperamos poder realizar mejores pruebas de referencia de estos modelos en un lugar central para facilitar la reproducción de los resultados de varios artículos. ¡Estén atentos para más novedades!

Recursos

Recomendamos consultar la documentación de Informer y el cuaderno de ejemplo vinculado en la parte superior de esta publicación de blog.