
Asynchronous Instructor

Asynchronous usage pattern for Instructor

In this example, I was extracting a list of topics from 1000 JSON objects, each describing a talk, loaded as a list of Python dicts.

I used Anthropic's Claude 3 Haiku model.

Using the instructor library with the asynchronous Anthropic client makes it much faster to issue a large number of calls to the Anthropic API, since requests run concurrently rather than one at a time.

(is there a batch option instead, and does it work with instructor?)

Setup

First, we set up the Anthropic async client with Instructor. We also set up the Pydantic schema we'll be using for structured extraction.

import instructor
from anthropic import AsyncAnthropic
from dotenv import load_dotenv
from pydantic import BaseModel, Field


# I had ANTHROPIC_API_KEY in a .env file
load_dotenv()

# set up the async client
aclient = instructor.from_anthropic(AsyncAnthropic())

# set up pydantic schema
class Topics(BaseModel):
    candidates: list[str] = Field(
        description=(
            "List of topics that might be considered among the main topics of the talk."
            " Should include (1) high-level categories like 'data science' or 'ai';"
            " (2) key technologies/tools like 'spark' or 'LLMs'; and (3) subtopics like"
            " 'structured streaming' or 'retrieval-augmented generation'"
        )
    )
    topics: list[str] = Field(
        description="List of the top main topics of the talk, selected from the candidates. Select at least one but no more than three main topics."
    )
    )
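
Before running the whole loop, it helps to see what a single structured extraction looks like. Here is a minimal sketch using the synchronous client, mirroring the async call used below; the abstract text is made up for illustration.

from anthropic import Anthropic

# Synchronous instructor-wrapped client, same pattern as the async one
client = instructor.from_anthropic(Anthropic())

resp = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            # hypothetical abstract, just for illustration
            "content": "This talk covers structured streaming in Apache Spark ...",
        }
    ],
    response_model=Topics,
)
print(resp.topics)  # a validated Topics instance, e.g. ['spark', 'structured streaming']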

Iterate through the objects

import asyncio
from typing import List

import anthropic
from tqdm import tqdm

# Create a semaphore with a limit of 10 concurrent tasks
sem = asyncio.Semaphore(10)

# Accumulates every extracted topic across all events
topics_list: List[str] = []

# event_data is the list of event dicts (each with an 'abstract' field) loaded earlier;
# work on a copy so the original data stays untouched
new_event_data = event_data.copy()

async def process_event(event: dict, sem: asyncio.Semaphore, progress_bar: tqdm) -> None:
    retries = 3
    delay = 1  # Initial delay in seconds

    while retries > 0:
        try:
            # Acquire the semaphore to limit concurrency
            async with sem:
                # Make an asynchronous API call to create a message
                resp = await aclient.messages.create(
                    model="claude-3-haiku-20240307",
                    max_tokens=1024,
                    messages=[
                        {
                            "role": "user",
                            "content": event['abstract'],
                        }
                    ],
                    response_model=Topics,
                )

                # Store the topics in the event dictionary
                event['topics'] = resp.topics
                topics_list.extend(resp.topics)
                progress_bar.update(1)
                return
        except anthropic.RateLimitError:
            # Handle rate limit errors with exponential backoff
            retries -= 1
            if retries > 0:
                print(f"Rate limit reached, retrying in {delay} seconds...")
                await asyncio.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                print("Maximum retries reached, skipping event.")
        except Exception as e:
            # Retry other errors with the same backoff policy
            print(f"Error processing event: {e}")
            retries -= 1
            if retries > 0:
                await asyncio.sleep(delay)
                delay *= 2
            else:
                print("Maximum retries reached, skipping event.")

async def run_in_notebook() -> None:
    tasks = []
    with tqdm(total=len(new_event_data), unit="event") as progress_bar:
        for event in new_event_data:
            # Create a task for each event and add it to the tasks list
            tasks.append(process_event(event, sem, progress_bar))

        # Wait for all tasks to complete concurrently
        await asyncio.gather(*tasks)

# Enable asyncio support in Jupyter Notebook
import nest_asyncio
nest_asyncio.apply()

# Run the async function in the notebook
asyncio.get_event_loop().run_until_complete(run_in_notebook())

In this code, we:

  • Define a semaphore, which limits the number of concurrent tasks and hence concurrent API calls. This matters because the per-minute (and, unfortunately, the per-day) rate limits for Haiku are quite low.
  • Retry rate limit errors with exponential backoff. Other exceptions also trigger retries, up to the same maximum.
  • Use await asyncio.gather(*tasks) to wait until all of the tasks have finished.
  • Use the nest_asyncio library to run the async code in a notebook, which already has its own running event loop.
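
After the loop finishes, topics_list holds every extracted topic. A quick way to see which topics dominate (a follow-up sketch, not part of the original run):

from collections import Counter

# Tally how often each topic was extracted across all events
print(Counter(topics_list).most_common(10))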


Date: 2024-04-18 Thu 00:00
