La forma adecuada de realizar llamadas a la API de ChatGPT

Forma adecuada de llamar a la API de ChatGPT

Cómo realizar llamadas confiables a la API de ChatGPT para construir aplicaciones robustas

LLMs están en todas partes ahora, especialmente ChatGPT. Hay un montón de aplicaciones siendo construidas sobre él y si no lo estás haciendo, deberías intentarlo.

Creado con Midjourney.

Construir aplicaciones sobre ChatGPT probablemente requerirá que realices varias llamadas paralelas. Desafortunadamente, no eres el único. Con tantas aplicaciones realizando millones de solicitudes al día (por cierto, felicitaciones a su equipo de ingeniería), a menudo la API devolverá un error de “demasiadas solicitudes”. Por lo tanto, necesitamos una buena manera de manejar tales errores al realizar varias llamadas paralelas.

En este pequeño tutorial de Python, cubriremos estos dos temas importantes para realizar llamadas eficientes a la API de ChatGPT:

  1. Realizar varias llamadas en paralelo
  2. Reintentar llamadas en caso de fallo

1. Realizar varias llamadas en paralelo

La forma más sencilla de realizar una llamada es hacerlo de forma sincrónica, es decir, enviar la solicitud y esperar a que llegue la respuesta para continuar con el programa. Podemos hacerlo de la siguiente manera:

import requestsheaders = {    "Content-Type": "application/json",    "Authorization": f"Bearer {OPENAI_API_KEY}"}response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={    "model": "gpt-3.5-turbo",    "messages": [{"role": "user", "content": "ping"}],    "temperature": 0}).json()print(response_json["choices"][0]["message"]["content"])

¡Pong!

Si estamos trabajando en un sistema simple, está bien, sin embargo, si deseamos realizar varias llamadas en paralelo a una API u otros recursos como una base de datos, podemos hacerlo de forma asíncrona para obtener respuestas más rápidas.

Ejecutar tareas de forma asíncrona desencadenará cada acción y esperará a que finalicen en paralelo, lo que reducirá el tiempo de espera.

Una forma básica de hacer esto es crear diferentes hilos para procesar cada solicitud, sin embargo, hay una mejor manera de hacerlo utilizando llamadas asíncronas.

Hacer una llamada asíncrona es a menudo más eficiente, ya que puedes especificar los lugares exactos donde tu aplicación debe esperar, mientras que en el enfoque tradicional de subprocesamiento, el sistema colocará automáticamente los hilos en espera, lo que puede ser subóptimo.

A continuación, presentamos un ejemplo que muestra la diferencia entre el uso de llamadas sincrónicas y asíncronas.

# Llamada sincrónicaimport timedef delay_print(msg):    print(msg, end=" ")    time.sleep(1)def sync_print():    for i in range(10):        delay_print(i)start_time = time.time()sync_print()print("\n", time.time() - start_time, "segundos.")

0 1 2 3 4 5 6 7 8 9  10.019574642181396 segundos.

# Llamada asíncronaimport asyncioasync def delay_print_async(msg):    print(msg, end=" ")    await asyncio.sleep(1)async def async_print():    asyncio.gather(*[delay_print_async(i) for i in range(10)])start_time = time.time()await async_print()print("\n", time.time() - start_time, "segundos.")

0.0002448558807373047 segundos.0 1 2 3 4 5 6 7 8 9 

El método asyncio.gather desencadenará todas las llamadas asíncronas que se le pasen y devolverá sus resultados una vez que estén listos.

Desafortunadamente, realizar llamadas asíncronas con la biblioteca requests no es posible. Para hacerlo, puedes usar la biblioteca aiohttp. A continuación, hay un ejemplo de cómo realizar una llamada asíncrona con aiohttp.

import aiohttpasync def get_completion(content):    async with aiohttp.ClientSession() as session:        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]await get_completion("Ping")

¡Pong!

Como se mencionó anteriormente, para realizar solicitudes asíncronas necesitamos hacer uso del método asyncio.gather.

async def get_completion_list(content_list):    return await asyncio.gather(*[get_completion(content) for content in content_list])await get_completion_list(["ping", "pong"]*5)

['¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!']

Aunque esto funciona, realizar llamadas de esta manera no es ideal ya que estamos recreando el objeto de sesión para cada llamada. Podemos ahorrar recursos y tiempo reutilizando el mismo objeto de sesión de esta manera:

async def get_completion(content, session):    async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={        "model": "gpt-3.5-turbo",        "messages": [{"role": "user", "content": content}],        "temperature": 0    }) as resp:        response_json = await resp.json()        return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list):    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session) for content in content_list])await get_completion_list(["ping", "pong"]*5)

¡Simple, ¿verdad? Con esto, puedes realizar fácilmente varias llamadas. Sin embargo, un problema es que a menudo no es una buena práctica realizar llamadas ilimitadas de esta manera, ya que puedes sobrecargar un sistema y ser penalizado, lo que te impide realizar solicitudes adicionales durante cierto tiempo (créeme, lo harás). Por lo tanto, es una buena idea limitar la cantidad de llamadas que puedes realizar al mismo tiempo. Puedes hacer esto fácilmente con la clase asyncio.Semaphore.

La clase Semaphore crea un administrador de contexto que manejará la cantidad de llamadas asíncronas que se están realizando actualmente dentro de su contexto. Si se alcanza el número máximo, se bloqueará hasta que algunas de las llamadas se terminen.

async def get_completion(content, session, semaphore):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")print(completion_list)

Tiempo transcurrido:  1.8094507199984946 segundos.['¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!']

Una cosa opcional aquí es informar cómo va el progreso de las llamadas. Puedes hacerlo creando una pequeña clase que llevará el registro del progreso y se compartirá en todas las llamadas. Puedes hacerlo de la siguiente manera:

class ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Ejecuciones completadas {self.done}/{self.total}."async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")print(completion_list)

Ejecuciones completadas 1/10.Ejecuciones completadas 2/10.Ejecuciones completadas 3/10.Ejecuciones completadas 4/10.Ejecuciones completadas 5/10.Ejecuciones completadas 6/10.Ejecuciones completadas 7/10.Ejecuciones completadas 8/10.Ejecuciones completadas 9/10.Ejecuciones completadas 10/10.Tiempo transcurrido:  1.755018908999773 segundos.['¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!']

Se completa esta sección sobre cómo realizar múltiples solicitudes asíncronas. Con esto, puedes realizar varias llamadas asíncronas, limitar el número de llamadas por tiempo e informar sobre el progreso. Sin embargo, todavía hay algunos problemas por manejar.

Las solicitudes realizadas pueden fallar por varias razones diferentes, como sobrecarga del servidor, interrupción de la conexión, solicitudes incorrectas, etc. Estos pueden generar excepciones o devolver respuestas impredecibles, por lo que debemos tratar estos casos y volver a intentar automáticamente las llamadas fallidas.

2. Reintentar llamadas en caso de fallo

Para manejar las llamadas fallidas, utilizaremos la biblioteca tenacity. Tenacity proporciona decoradores de funciones que volverán a intentar automáticamente nuestra llamada de función en caso de que genere una excepción.

from tenacity import (    retry,    stop_after_attempt,    wait_random_exponential,)

Para proporcionar una funcionalidad de reintentar a nuestras llamadas, necesitaremos colocar el decorador @retry. Usarlo sin parámetros adicionales hará que la función vuelva a intentar inmediatamente e indefinidamente una vez que falle. Esto no es bueno por varias razones.

Una de ellas es que nuestra llamada de función puede fallar debido a una sobrecarga del servidor, lo que hace razonable esperar algún tiempo antes de volver a intentarlo. Para indicar el tiempo de espera, utilizaremos el enfoque de espera exponencial utilizando el parámetro wait=wait_random_exponential(min=min_value, max=max_value). Esto aumentará el tiempo de espera cuanto más falle la función.

Una cosa opcional es registrar mensajes cada vez que se produzca un reintentar. Podemos hacerlo proporcionando alguna función al parámetro before_sleep. Aquí utilizaremos la función print, sin embargo, una mejor manera es utilizar el módulo logging y pasar una función logging.error o logging.debug a este parámetro.

Para demostrarlo, generaremos excepciones aleatorias.

import randomclass ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Ejecuciones realizadas {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        #await asyncio.sleep(1)        if random.random() < 0.2:            raise Exception("Excepción aleatoria")        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")print(completion_list)

<RetryCallState 133364377433616: intento #1; esperó 0.74; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377424496: intento #1; esperó 0.79; último resultado: fallido (Excepción Excepción aleatoria)>Ejecuciones realizadas 1/10.Ejecuciones realizadas 2/10.Ejecuciones realizadas 3/10.Ejecuciones realizadas 4/10.Ejecuciones realizadas 5/10.Ejecuciones realizadas 6/10.Ejecuciones realizadas 7/10.Ejecuciones realizadas 8/10.Ejecuciones realizadas 9/10.Ejecuciones realizadas 10/10.Tiempo transcurrido: 1.1305301820011664 segundos.['¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!', '¡Pong!', '¡Ping!']

Esto hará que nuestra función espere algún tiempo antes de volver a intentarlo. Sin embargo, la razón de la falla puede ser sistemática debido a una caída del servidor o una carga útil incorrecta, por ejemplo. En este caso, queremos que nuestra cantidad de reintentos sea limitada. Podemos hacerlo con el parámetro stop=stop_after_attempt(n).

import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"Ejecuciones completadas {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        #await asyncio.sleep(1)
        if random.random() < 0.9:
            raise Exception("Excepción aleatoria")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")
print(completion_list)

<RetryCallState 133364608660048: intento #1; dormido durante 0.1; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377435680: intento #1; dormido durante 0.71; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377421472: intento #1; dormido durante 0.17; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377424256: intento #1; dormido durante 0.37; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377430928: intento #1; dormido durante 0.87; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377420752: intento #1; dormido durante 0.42; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377422576: intento #1; dormido durante 0.47; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377431312: intento #1; dormido durante 0.11; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377425840: intento #1; dormido durante 0.69; último resultado: fallido (Excepción Excepción aleatoria)>
<RetryCallState 133364377424592: intento #1; dormido durante 0.89; último resultado: fallido (Excepción Excepción aleatoria)>

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
     49                 try:
---> 50                     result = await fn(*args, **kwargs)
     51                 except BaseException:  # noqa: B9025 frames

Exception: Excepción aleatoria

The above exception was the direct cause of the following exception:

RetryError                                Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
    324             if self.reraise:
    325                 raise retry_exc.reraise()
--> 326             raise retry_exc from fut.exception()
    327 
    328         if self.wait:

RetryError: RetryError[<Future at 0x794b5057a590 state=finished raised Exception>]

Con este parámetro configurado, un RetryError se generará una vez que la cantidad de intentos alcance el valor máximo. Sin embargo, puede ser el caso de que queramos continuar nuestra ejecución sin generar una excepción, simplemente guardando un valor None en el retorno de la llamada para manejarlo posteriormente. Para hacerlo, podemos usar la función de devolución de llamada retry_error_callback para devolver el valor None en caso de que ocurra un error de RetryError:

import randomclass ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Ejecuciones completadas {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        #await asyncio.sleep(1)        if random.random() < 0.7:            raise Exception("Excepción aleatoria")        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")print(completion_list)

<RetryCallState 133364377805024: intento #1; esperó 0.22; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377799456: intento #1; esperó 0.53; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377801328: intento #1; esperó 0.24; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377810208: intento #1; esperó 0.38; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377801616: intento #1; esperó 0.54; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377422096: intento #1; esperó 0.59; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377430592: intento #1; esperó 0.07; último resultado: fallido (Excepción Excepción aleatoria)><RetryCallState 133364377425648: intento #1; esperó 0.05; último resultado: fallido (Excepción Excepción aleatoria)>Ejecuciones completadas 1/10.Ejecuciones completadas 2/10.Ejecuciones completadas 3/10.Tiempo transcurrido:  2.6409040250000544 segundos.['¡Pong!', '¡Ping!', None, None, None, None, None, '¡Ping!', None, None]

Con esto, se devolverán valores de None en lugar de generar errores.

Un problema que aún no se ha resuelto es el problema de la conexión bloqueada. Esto ocurre cuando realizamos una solicitud y, por alguna razón, el host mantiene la conexión pero no falla ni devuelve algo. Para manejar estos casos, debemos establecer un tiempo límite para devolver en caso de que la llamada no devuelva un valor dentro de un período determinado. Para hacerlo, podemos usar el parámetro timeout de la biblioteca aiohttp junto con la clase aiohttp.ClientTimeout. En caso de que ocurra un tiempo de espera aquí, se lanzará un TimeoutError, que luego será manejado por el decorador retry de tenacity y ejecutará automáticamente la función nuevamente.

class ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Ejecuciones completadas {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*100, 100)print("Tiempo transcurrido: ", time.perf_counter() - start_time, "segundos.")

<RetryCallState 133364375201936: intento #1; esperó 0.57; último resultado: fallido (TimeoutError)>Tiempo transcurrido:  12.705538211999738 segundos.

¡Genial! Ahora tenemos una forma robusta de ejecutar múltiples solicitudes paralelas que automáticamente volverán a intentar en caso de fallos y devolverán valores None en caso de que el fallo sea sistemático. Entonces, el código final será el siguiente:

import asyncio
import aiohttp
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {OPENAI_API_KEY}"
}

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"Ejecuciones realizadas {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls, timeout):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

En resumen, hemos implementado las siguientes características:

  1. Llamadas asíncronas para reducir el tiempo de espera.
  2. Registro del progreso de las llamadas asíncronas.
  3. Reintentos automáticos cuando una llamada falla.
  4. Devuelve valores None en caso de fallos sistemáticos.
  5. Reintenta una llamada cuando se agota el tiempo y no devuelve nada.

Si tienes alguna pregunta, encontraste algún error o tienes alguna idea de cómo mejorar esto, ¡deja un comentario a continuación!