05 Streaming


Streaming

Learning goals

  • Understand how streaming works
  • Work with stream events

Let's start by importing the anthropic SDK and setting up our client:

[1]
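The setup cell's code isn't preserved in this export. A minimal sketch looks like the following; loading the API key from a `.env` file with `python-dotenv` is an assumption here (the SDK also reads the `ANTHROPIC_API_KEY` environment variable directly):

```python
from dotenv import load_dotenv  # assumption: python-dotenv is installed
from anthropic import Anthropic

load_dotenv()  # loads ANTHROPIC_API_KEY from a local .env file into the environment
client = Anthropic()  # the SDK picks up ANTHROPIC_API_KEY automatically
```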

So far, we've sent messages to Claude using the following syntax:

[3]
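The code for that cell isn't preserved here. A sketch of the non-streaming request that produced the output below (the exact prompt wording and `max_tokens` value are assumptions):

```python
# A standard, non-streaming request: the call blocks until the full response is ready
response = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=1000,
    messages=[{"role": "user", "content": "Write me an essay about macaws and clay licks in the Amazon"}],
)
print("We have a response back!")
print("========================")
print(response.content[0].text)
```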
We have a response back!
========================
Here is an essay about macaws and clay licks in the Amazon:

Macaws and Clay Licks in the Amazon

Deep within the lush, verdant rainforests of the Amazon basin, a remarkable natural phenomenon takes place. Flashes of vibrant color dart through the canopy, as large, magnificent parrots known as macaws congregate at special sites called clay licks. These clay licks, or "collpas" as they are known locally, are essential to the survival and well-being of macaws and other Amazonian wildlife.

Macaws are some of the most striking and iconic birds of the Amazon. With their vividly-hued plumage, hooked beaks, and long, tapered tails, these large parrots are a sight to behold as they soar through the treetops. The most well-known species include the scarlet macaw, the blue-and-gold macaw, and the green-winged macaw, each adorned in a stunning array of reds, blues, greens, and golds. 

These magnificent birds play a crucial role in the Amazon ecosystem, acting as important seed dispersers as they feed on a variety of fruits and nuts. However, macaws face a unique dietary challenge - they require essential minerals and nutrients that are often lacking in the fruits and seeds that make up their primary diet. This is where the clay licks come into play.

Clay licks are exposed deposits of mineral-rich clay that attract hundreds, sometimes thousands, of macaws and other parrots, as well as other Amazonian wildlife such as tapirs, peccaries, and monkeys. The clay contains high concentrations of sodium, calcium, and other vital minerals that help macaws and other animals to regulate their digestive systems, neutralize toxins, and maintain overall health.

Visiting a clay lick is a truly awe-inspiring experience. As the first rays of dawn break through the forest canopy, the air fills with the raucous squawks and screeches of macaws as they begin to gather at the lick. Slowly, the birds will descend from the treetops, cautiously approaching the clay bank and taking turns nibbling at the mineral-rich soil. The sheer number of these vibrant birds, their colors contrasting against the earthy tones of the clay, creates a mesmerizing natural spectacle.

The importance of clay licks to the survival of macaws and other Amazonian wildlife cannot be overstated. These unique geological formations are essential feeding grounds that help sustain the delicate balance of the rainforest ecosystem. As deforestation and habitat loss continue to threaten the Amazon, the protection and preservation of these clay licks has become increasingly crucial to the long-term conservation of macaws and the broader biodiversity of this remarkable region.

This works fine, but it's important to remember that with this approach, we only get content back from the API once all of the content has been generated. Try running the above cell again, and you'll see that nothing is printed until the entire response has been generated and is printed all at once. This is fine in many situations, but it can lead to a bad user experience if you're building an application that forces a user to wait around for an entire response to be generated.

Enter streaming!

Streaming enables us to write applications that receive content as it is generated by the model, rather than having to wait for the entire response to be generated. This is how apps like claude.ai work. As the model generates responses, that content is streamed to a user's browser and displayed:

claude_streaming.gif

Working with streams

To get a streaming response from the API, it's as simple as passing stream=True to client.messages.create. That's the easy part. Where things get a little trickier is in how we subsequently work with the streaming response and handle incoming data.

[4]
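A sketch of that call, with `stream=True` added (the prompt and `max_tokens` are assumptions, chosen to match the "Cats meow loudly." output shown later):

```python
stream = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=100,
    messages=[{"role": "user", "content": "Write me a 3-word sentence about cats"}],
    stream=True,  # the only change from a normal request
)
```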

Let's take a look at our stream variable:

[5]
<anthropic.Stream at 0x111e5ec10>

There's not much to look at! The stream object on its own won't do much for us: it's a generator that yields individual server-sent events (SSE) as they are received from the API. We need to write code that iterates over it and handles each individual event. Remember, our response no longer arrives as one finalized chunk. Let's try iterating over the stream response:

[6]
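The iteration itself is a plain `for` loop over the stream object (this assumes a freshly created stream, since a stream can only be consumed once):

```python
for event in stream:
    print(event)  # each event is a typed server-sent event object
```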
MessageStartEvent(message=Message(id='msg_01EZHjA6qmf6y8VWMZSeBmN5', content=[], model='claude-3-haiku-20240307', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=Usage(input_tokens=30, output_tokens=2)), type='message_start')
ContentBlockStartEvent(content_block=ContentBlock(text='', type='text'), index=0, type='content_block_start')
ContentBlockDeltaEvent(delta=TextDelta(text='Cats', type='text_delta'), index=0, type='content_block_delta')
ContentBlockDeltaEvent(delta=TextDelta(text=' me', type='text_delta'), index=0, type='content_block_delta')
ContentBlockDeltaEvent(delta=TextDelta(text='ow lou', type='text_delta'), index=0, type='content_block_delta')
ContentBlockDeltaEvent(delta=TextDelta(text='dly.', type='text_delta'), index=0, type='content_block_delta')
ContentBlockStopEvent(index=0, type='content_block_stop')
MessageDeltaEvent(delta=Delta(stop_reason='end_turn', stop_sequence=None), type='message_delta', usage=MessageDeltaUsage(output_tokens=10))
MessageStopEvent(type='message_stop')

As you can see, we ended up receiving many server-sent events from the API. Let's take a closer look at what these events mean. This is a color-coded explanation of the events:

streaming_output.png

Each stream contains a series of events in the following order:

  • MessageStartEvent - A message with empty content
  • Series of content blocks - Each of which contains:
    • A ContentBlockStartEvent
    • One or more ContentBlockDeltaEvents
    • A ContentBlockStopEvent
  • One or more MessageDeltaEvents which indicate top-level changes to the final message
  • A final MessageStopEvent

In the above response, there was only a single content block. This diagram shows all the events associated with it:

content_block_streaming.png

All of the actual model-generated content we care about comes from the ContentBlockDeltaEvents, which each contain a type set to "content_block_delta." To actually get the content itself, we want to access the text property inside of delta. Let's try exclusively printing out the text that was generated:

[7]
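A sketch of that filtering loop (again assuming a freshly created stream from the same request):

```python
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text)  # print only the model-generated text
```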
Cats
 me
ow lou
dly.

We are successfully printing out the content, though the way it's formatted is a little tricky to read. When printing out the streamed text using Python's print() function, it's useful to pass two additional arguments:

  • end="": By default, the print() function adds a newline character (\n) at the end of the printed text. However, by setting end="", we specify that the printed text should not be followed by a newline character. This means that the next print() statement will continue printing on the same line.
  • flush=True: The flush parameter is set to True to force the output to be immediately written to the console or standard output, without waiting for a newline character or the buffer to be filled. This ensures that the text is displayed in real-time as it is received from the streaming response.

Let's try making these changes:

[8]
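The same loop with the two `print()` arguments applied:

```python
for event in stream:
    if event.type == "content_block_delta":
        # end="" keeps the deltas on one line; flush=True displays them immediately
        print(event.delta.text, end="", flush=True)
```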
Cats meow loudly.

With such a short piece of text, the streaming functionality may not be obvious. Let's try asking the model to generate something longer:

[9]
Large language models like myself work by using deep learning neural networks that are trained on massive amounts of text data. The key aspects are:

1. Neural network architecture - We use large, multi-layer neural networks with many parameters that can learn complex patterns in language.

2. Training data - We are trained on huge corpora of text from the internet, books, articles, and other sources. This allows us to learn the statistical patterns and structures of language.

3. Self-supervised learning - During training, the model learns to predict the next word in a sequence of text, without any explicit labels. This allows it to learn general language understanding.

4. Transfer learning - The knowledge gained during this pre-training can then be fine-tuned for specific tasks like question answering, summarization, translation, etc.

5. Attention mechanisms - Advanced models like transformers use attention to dynamically focus on the most relevant parts of the input when generating output.

The end result is a model that can understand and generate human-like text by leveraging the patterns and structures it has learned from its training data. Of course, the details get quite technical, but that's the high-level overview of how large language models work. Let me know if you have any other questions!

Try running the above cell if you haven't already. You should see text content printed out incrementally as it comes in!

As we've seen, the ContentBlockDeltaEvents contain the text content generated by the model. There are many other events, though. Do we need to care about those? Yes! Here's one quick example:

If we want to access information about our token usage, we'll need to look in two places:

  • MessageStartEvent contains our input (prompt) token usage information
  • MessageDeltaEvent contains information on how many output tokens were generated

streaming_tokens.png

Let's update our code from above to print out how many tokens are used in our prompt and how many tokens the model generated:

[42]
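A sketch of that updated loop; the prompt is an assumption (something like "How do large language models work?", to match the output below):

```python
stream = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=500,
    messages=[{"role": "user", "content": "How do large language models work?"}],  # assumed prompt
    stream=True,
)
for event in stream:
    if event.type == "message_start":
        # input token usage lives on the message inside the message_start event
        print("MESSAGE START EVENT")
        print(f"Input tokens used: {event.message.usage.input_tokens}")
        print("========================")
    elif event.type == "content_block_delta":
        print(event.delta.text, end="", flush=True)
    elif event.type == "message_delta":
        # final output token count arrives on the message_delta event
        print("\n========================")
        print("MESSAGE DELTA EVENT")
        print(f"Output tokens used: {event.usage.output_tokens}")
```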
MESSAGE START EVENT
Input tokens used: 14
========================
Large language models like myself work by using deep learning neural networks that are trained on massive amounts of text data. The key aspects are:

1. Neural network architecture - We use large, multi-layer neural networks with many parameters that can learn complex patterns in language.

2. Training data - We are trained on huge corpora of text from the internet, books, articles, and other sources. This allows us to learn the statistical patterns and structures of language.

3. Self-supervised learning - During training, the model learns to predict the next word in a sequence of text, without any explicit labels. This allows it to learn general language understanding.

4. Transfer learning - The knowledge gained during this pre-training can then be fine-tuned for specific tasks like question answering, summarization, translation, etc.

5. Attention mechanisms - Advanced models like transformers use attention to dynamically focus on the most relevant parts of the input when generating output.

The end result is a model that can understand and generate human-like text, drawing upon its broad knowledge of language and the world. But the inner workings are highly complex and not fully understood. There's still a lot to learn about how these large language models work.
========================
MESSAGE DELTA EVENT
Output tokens used: 258

Other streaming event types

When working with streams, you may encounter some other event types including:

  • Ping events - streams may also include any number of ping events.
  • Error events - you may occasionally see error events in the event stream. For example, during periods of high usage, you may receive an overloaded_error, which would normally correspond to an HTTP 529 in a non-streaming context.

Here's an example error event:

event: error
data: {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}}

Time to first token (TTFT)

The major reason to use streaming is to improve your time to first token: the amount of time it takes for you or your user to receive the first bit of model-generated content. Let's demonstrate the impact that streaming can have on TTFT.

We'll start with a non-streaming approach. We'll ask the model to generate a very long piece of text but cut it off at 500 tokens:

[4]
[50]
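A sketch of the non-streaming timing code (the prompt wording is an assumption). With no streaming, the first token and the complete response arrive at the same moment, so the two timings are identical:

```python
import time

start = time.time()
response = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=500,
    messages=[{"role": "user", "content": "Write me a very long essay on the history of the American Revolution"}],
)
end = time.time()

# Without streaming, nothing arrives until everything has arrived
print(f"Time to receive first token: {end - start:.3f} seconds")
print(f"Time to receive complete response: {end - start:.3f} seconds")
print(f"Total tokens generated: {response.usage.output_tokens}")
print(response.content[0].text)
```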
Time to receive first token: 4.194 seconds
Time to receive complete response: 4.194 seconds
Total tokens generated: 500
Here is a long essay explaining the history of the American Revolution:

The American Revolution was a pivotal event in the history of the United States, marking the country's transition from a collection of British colonies to an independent nation. The roots of the revolution can be traced back to the French and Indian War, which was fought between Britain and France from 1754 to 1763. This conflict, which was part of a larger global war, resulted in the British gaining control of much of North America, including the French colonies. However, the war also left Britain with a significant debt, which it sought to recoup by imposing a series of taxes and regulations on its American colonies.

One of the first major events that led to the American Revolution was the Stamp Act, which was passed by the British Parliament in 1765. This act required all printed materials in the colonies, including newspapers, pamphlets, bills, legal documents, licenses, almanacs, dice, and playing cards, to carry an embossed revenue stamp. The colonists were outraged by this tax, which they saw as a violation of their rights as British subjects. They argued that they were not represented in the British Parliament and therefore should not be subject to taxation without their consent.

In response to the Stamp Act, the colonists organized a series of protests and boycotts, which eventually led to the repeal of the act in 1766. However, this was just the beginning of a series of increasingly contentious conflicts between the colonies and the British government. In 1767, the Townshend Acts were passed, which imposed new taxes on a variety of goods imported to the colonies, including glass, paint, lead, paper, and tea.

The colonists responded with further protests and boycotts, and in 1770, a group of protesters in Boston were fired upon by British soldiers, resulting in the deaths of five civilians in an event known as the Boston Massacre. This incident further inflamed tensions between the colonies and the British government, and in 1773, the British East India Company was granted a monopoly on the tea trade in the colonies. In response, a group of colonists in Boston, disguised as Native Americans, boarded a British ship and dumped hundreds of chests of tea into the harbor, an event known as the Boston Tea Party.

The British government responded to the Boston Tea Party

Now let's try the same thing using a streaming approach:

[57]
[58]
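A sketch of the streaming version. The key difference is recording a timestamp the moment the first `content_block_delta` arrives:

```python
import time

start = time.time()
first_token_time = None
stream = client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=500,
    messages=[{"role": "user", "content": "Write me a very long essay on the history of the American Revolution"}],
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        if first_token_time is None:
            first_token_time = time.time()  # record when the first text arrives
        print(event.delta.text, end="", flush=True)
    elif event.type == "message_delta":
        output_tokens = event.usage.output_tokens
end = time.time()

print(f"\nTime to receive first token: {first_token_time - start:.3f} seconds")
print(f"Time to receive complete response: {end - start:.3f} seconds")
print(f"Total tokens generated: {output_tokens}")
```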
Here is a long essay explaining the history of the American Revolution:

The American Revolution was a pivotal event in the history of the United States, marking the country's transition from a collection of British colonies to an independent nation. The roots of the revolution can be traced back to the French and Indian War, which was fought between Britain and France from 1754 to 1763. This conflict, which was part of a larger global war, resulted in the British gaining control of much of North America, including the French colonies. However, the war also left Britain with a significant debt, which it sought to recoup by imposing a series of taxes and regulations on its American colonies.

One of the first major events that led to the American Revolution was the Stamp Act, which was passed by the British Parliament in 1765. This act required all printed materials in the colonies, including newspapers, pamphlets, bills, legal documents, licenses, almanacs, dice, and playing cards, to carry an embossed revenue stamp. The colonists were outraged by this tax, which they saw as a violation of their rights as British subjects. They argued that they were not represented in the British Parliament and therefore should not be subject to taxation without their consent.

In response to the Stamp Act, the colonists organized a series of protests and boycotts, which eventually led to the repeal of the act in 1766. However, this was just the beginning of a series of increasingly contentious conflicts between the colonies and the British government. In 1767, the Townshend Acts were passed, which imposed new taxes on a variety of goods imported to the colonies, including glass, paint, lead, paper, and tea.

The colonists responded with further protests and boycotts, and in 1770, a group of protesters in Boston were fired upon by British soldiers, resulting in the deaths of five civilians in an event known as the Boston Massacre. This incident further inflamed tensions between the colonies and the British government, and in 1773, the British East India Company was granted a monopoly on the tea trade in the colonies. In response, a group of colonists in Boston, disguised as Native Americans, boarded a British ship and dumped hundreds of chests of tea into the harbor, an event known as the Boston Tea Party.

The British government responded to the Boston Tea Party
Time to receive first token: 0.492 seconds
Time to receive complete response: 4.274 seconds
Total tokens generated: 500

Let's compare the results.

  • Without Streaming
    • Time to receive first token: 4.194 seconds
    • Time to receive complete response: 4.194 seconds
    • Total tokens generated: 500
  • With Streaming
    • Time to receive first token: 0.492 seconds
    • Time to receive complete response: 4.274 seconds
    • Total tokens generated: 500

As you can see, there's a huge difference in TTFT! This demo only generates 500 tokens, and it uses Haiku, which is our fastest model. If we try an example that generates 1000 tokens using Opus, the numbers will be drastically different!

[2]
[5]
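The original cells aren't preserved; one way to run the comparison is with a small helper that times both approaches (the helper function, its name, and the prompt are all assumptions):

```python
import time

def time_first_token(model, prompt, max_tokens, streaming):
    """Return (seconds until first content arrives, output token count)."""
    start = time.time()
    if streaming:
        stream = client.messages.create(
            model=model, max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}], stream=True,
        )
        first, tokens = None, 0
        for event in stream:
            if event.type == "content_block_delta" and first is None:
                first = time.time()  # first generated content has arrived
            elif event.type == "message_delta":
                tokens = event.usage.output_tokens
        return first - start, tokens
    # non-streaming: first token == complete response
    response = client.messages.create(
        model=model, max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}],
    )
    return time.time() - start, response.usage.output_tokens

prompt = "Write me a very long essay on the history of the American Revolution"
for label, streaming in [("OPUS STREAMING", True), ("OPUS NON STREAMING", False)]:
    ttft, tokens = time_first_token("claude-3-opus-20240229", prompt, 1000, streaming)
    print(label)
    print(f"Time to first token: {ttft}")
    print(f"Tokens generated: {tokens}")
```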
OPUS STREAMING
Time to first token: 1.8863098621368408
Tokens generated: 997
#########################################################
OPUS NON STREAMING
Time to first token: 47.03177309036255
Tokens generated: 998

When we use Opus to generate a longer piece of text, the impact of streaming on TTFT is even more obvious. With a non-streaming approach, it took 47 seconds to get our first token. With streaming, it only took 1.8 seconds to get our first token!

Note: Remember that streaming does not magically improve the overall time the model takes to generate a response. We get initial data much faster, but the same amount of time still elapses between the start of our request and receiving the final generated token.

Streaming helpers

The Python SDK provides several conveniences for streaming messages. Instead of using client.messages.create with stream=True, we can instead use client.messages.stream which gives us access to useful helper methods. client.messages.stream() returns a MessageStreamManager, which is a context manager that yields a MessageStream which is iterable, emits events, and accumulates messages.

The code below uses client.messages.stream which allows us to use helpers like stream.text_stream to easily access generated text content as it streams in, without having to manually check the stream event type. stream.text_stream provides an iterator over just the text deltas in the stream.

There are other useful helper methods like get_final_message which returns a final accumulated message once the stream has been read to completion. This can be useful if you both want to use streaming but also need access to the entire finished text generation when it's complete. You can of course write some code to build up your own accumulated message, but this helper method makes it easy.

The following example prints out each incoming piece of text as it is received, and also prints out the final completed message when the stream is complete:

[79]
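A sketch of that cell (the prompt is an assumption, and printing the final message via pydantic's `model_dump_json` is one way to get the JSON shown below):

```python
with client.messages.stream(
    model="claude-3-opus-20240229",
    max_tokens=500,
    messages=[{"role": "user", "content": "Write me a sonnet about orchids"}],  # assumed prompt
) as stream:
    for text in stream.text_stream:  # iterates over just the text deltas
        print(text, end="", flush=True)
    final_message = stream.get_final_message()  # the fully accumulated Message

print("\n\nSTREAMING IS DONE.  HERE IS THE FINAL ACCUMULATED MESSAGE: ")
print(final_message.model_dump_json(indent=2))
```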
In gardens fair, where beauty reigns supreme,
The orchid stands, a queen among the blooms,
Her delicate petals, like a lovely dream,
Adorned in nature's most exquisite plumes.

With colors ranging from pure white to bold,
And patterns intricate, a work of art,
Each blossom tells a story, bright and old,
Of evolution's path, a world apart.

From rainforests dense to mountain peaks so high,
The orchid thrives, a testament to grace,
Her beauty captivates the wandering eye,
And in our hearts, she finds a cherished place.

Oh, orchid fair, your splendor knows no bounds,
Forever in our gardens and hearts you'll be found.

STREAMING IS DONE.  HERE IS THE FINAL ACCUMULATED MESSAGE: 
{
  "id": "msg_018x1nZcs3sfq15zKaS4z4gD",
  "content": [
    {
      "text": "In gardens fair, where beauty reigns supreme,\nThe orchid stands, a queen among the blooms,\nHer delicate petals, like a lovely dream,\nAdorned in nature's most exquisite plumes.\n\nWith colors ranging from pure white to bold,\nAnd patterns intricate, a work of art,\nEach blossom tells a story, bright and old,\nOf evolution's path, a world apart.\n\nFrom rainforests dense to mountain peaks so high,\nThe orchid thrives, a testament to grace,\nHer beauty captivates the wandering eye,\nAnd in our hearts, she finds a cherished place.\n\nOh, orchid fair, your splendor knows no bounds,\nForever in our gardens and hearts you'll be found.",
      "type": "text"
    }
  ],
  "model": "claude-3-opus-20240229",
  "role": "assistant",
  "stop_reason": "end_turn",
  "stop_sequence": null,
  "type": "message",
  "usage": {
    "input_tokens": 14,
    "output_tokens": 171
  }
}

When using client.messages.stream(), we can also define custom event handlers that run when any stream event occurs, or only when text is generated, etc.

The example below uses two of these custom event handlers. We use client.messages.stream() and ask the model to "generate a 5-word poem". We define our own MyStream class which has two event handlers defined:

  • on_text - The event is fired when a text ContentBlock object is being accumulated. The first argument is the text delta and the second is the current accumulated text. In the example below, we're using this event handler to print out any generated text as it streams in. The text is printed out in green to make it easier to visualize.
  • on_stream_event - The event is fired when any event is received from the API. In the example below, we're printing the event type anytime an event is received.

We then pass an event_handler argument to client.messages.stream to register callback methods that are fired when certain events happen:

[94]
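A sketch of that cell. Note that the `event_handler` argument and the `MessageStream` base class were part of the SDK version used when this course was written and have since been removed from newer releases; the green ANSI color codes are also an assumption:

```python
from anthropic import MessageStream  # available in the SDK version this course assumes
from typing_extensions import override

GREEN, RESET = "\033[32m", "\033[0m"

class MyStream(MessageStream):
    @override
    def on_text(self, text, snapshot):
        # fired as text accumulates; `text` is the delta, `snapshot` the text so far
        print(GREEN + text + RESET, flush=True)

    @override
    def on_stream_event(self, event):
        # fired for every event received from the API
        print("on_event fired:", event.type)

with client.messages.stream(
    model="claude-3-opus-20240229",
    max_tokens=100,
    messages=[{"role": "user", "content": "Generate a 5-word poem"}],
    event_handler=MyStream,  # register our callbacks
) as stream:
    final_message = stream.get_final_message()

print("accumulated final message: ", final_message.model_dump_json(indent=2))
```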
on_event fired: message_start
on_event fired: content_block_start
on_event fired: content_block_delta
Whis
on_event fired: content_block_delta
pers
on_event fired: content_block_delta
 dance
on_event fired: content_block_delta
,
on_event fired: content_block_delta
 secrets
on_event fired: content_block_delta
 unf
on_event fired: content_block_delta
ol
on_event fired: content_block_delta
d,
on_event fired: content_block_delta
 love
on_event fired: content_block_delta
.
on_event fired: content_block_stop
on_event fired: message_delta
on_event fired: message_stop
accumulated final message:  {
  "id": "msg_014G44rr3M14DzadHXPn9Xaj",
  "content": [
    {
      "text": "Whispers dance, secrets unfold, love.",
      "type": "text"
    }
  ],
  "model": "claude-3-opus-20240229",
  "role": "assistant",
  "stop_reason": "end_turn",
  "stop_sequence": null,
  "type": "message",
  "usage": {
    "input_tokens": 14,
    "output_tokens": 14
  }
}

The Python SDK gives us access to a handful of other event handlers we can utilize including:

on_message(message: Message)

The event is fired when a full Message object has been accumulated. This corresponds to the message_stop SSE.

on_content_block(content_block: ContentBlock)

The event is fired when a full ContentBlock object has been accumulated. This corresponds to the content_block_stop SSE.

on_exception(exception: Exception)

The event is fired when an exception is encountered while streaming the response.

on_timeout()

The event is fired when the request times out.

on_end()

The last event fired in the stream.


Exercise

Write a simple Claude chatbot that uses streaming. The following gif illustrates how it should work. Please note that the color-coding of the output is completely optional and mostly helps to make the gif readable/watchable: