# Overview

`stream_object` provides real-time streaming of structured output with automatic validation. It combines the benefits of streaming (low latency) with structured output (type safety) by yielding both text chunks and partially-parsed objects as they become available.
Basic usage
Kick off the stream
from ai_sdk import stream_object, openai
from pydantic import BaseModel


class Weather(BaseModel):
    # Pydantic schema describing the desired output shape.
    location: str
    temperature_c: float


# Kick off the stream; `result` exposes the delta stream and the final object.
result = stream_object(
    model=openai("gpt-4.1-mini"),
    schema=Weather,
    prompt="Report the current temperature in Berlin as JSON.",
)
Consume deltas + partial objects
async for delta in result.object_stream:
print(delta, end="")
Pass `on_partial=lambda obj: print("partial", obj)` to receive partially-parsed objects while streaming.
## Parameters

Same as `stream_text`, plus:

| Name | Type | Required | Description |
|---|---|---|---|
| schema | Type[BaseModel] | ✓ | Pydantic model defining the desired output shape. |
| on_partial | Callable[[BaseModel], None] | – | Callback executed when a partial object is successfully parsed. |
## Return value

`stream_object` returns a `StreamObjectResult` with:

- `object_stream`: Async iterator yielding text chunks
- `object()`: Async method to get the complete parsed object
- `text()`: Async method to get the complete text
- `usage`: Token usage statistics
- `finish_reason`: Why the stream ended
- `tool_calls`: Tool calls if any were made
Examples
Basic streaming with objects
import asyncio
# `List` is used in the schema below; without this import the class
# definition raises NameError.
from typing import List

from ai_sdk import stream_object, openai
from pydantic import BaseModel


class Story(BaseModel):
    """Structured shape of the generated story."""

    title: str
    characters: List[str]
    plot: str


async def main():
    """Stream story text as it is generated, then read the parsed object."""
    model = openai("gpt-4.1-mini")
    result = stream_object(
        model=model,
        schema=Story,
        prompt="Write a short story about a robot learning to paint"
    )
    # Stream the text chunks
    async for chunk in result.object_stream:
        print(chunk, end="", flush=True)
    # Get the final parsed object
    final_object = await result.object()
    print(f"\n\nFinal object: {final_object}")


asyncio.run(main())
With partial object callbacks
import asyncio
# `List` is used in the schema below; without this import the class
# definition raises NameError.
from typing import List

from ai_sdk import stream_object, openai
from pydantic import BaseModel


class Recipe(BaseModel):
    """Structured shape of the generated recipe."""

    title: str
    ingredients: List[str]
    instructions: List[str]


async def main():
    """Stream a recipe while logging each successfully parsed partial object."""
    model = openai("gpt-4.1-mini")

    def on_partial(obj):
        # Invoked every time a partial Recipe parses successfully.
        print(f"Partial object: {obj}")

    result = stream_object(
        model=model,
        schema=Recipe,
        prompt="Create a recipe for chocolate chip cookies",
        on_partial=on_partial
    )
    async for chunk in result.object_stream:
        print(chunk, end="", flush=True)
    final_recipe = await result.object()
    print(f"\n\nComplete recipe: {final_recipe}")


asyncio.run(main())
With system instructions
import asyncio
from ai_sdk import stream_object, openai
from pydantic import BaseModel


class Product(BaseModel):
    """Structured product record produced by the model."""

    name: str
    price: float
    description: str
    category: str


async def main():
    """Stream a product description guided by a system instruction."""
    llm = openai("gpt-4.1-mini")
    stream = stream_object(
        model=llm,
        schema=Product,
        system="You are a helpful product catalog assistant. Always provide accurate product information.",
        prompt="Create a product description for a wireless headphones",
    )
    # Echo each text chunk while the model is still generating.
    async for piece in stream.object_stream:
        print(piece, end="", flush=True)
    product = await stream.object()
    print(f"\n\nProduct: {product}")


asyncio.run(main())
With custom parameters
import asyncio
# `List` is used in the schema below; without this import the class
# definition raises NameError.
from typing import List

from ai_sdk import stream_object, openai
from pydantic import BaseModel


class Poem(BaseModel):
    """Structured shape of the generated poem."""

    title: str
    verses: List[str]
    theme: str


async def main():
    """Stream a poem with custom sampling parameters."""
    model = openai("gpt-4.1-mini")
    result = stream_object(
        model=model,
        schema=Poem,
        prompt="Write a poem about the ocean",
        temperature=0.8,
        max_tokens=300
    )
    async for chunk in result.object_stream:
        print(chunk, end="", flush=True)
    poem = await result.object()
    print(f"\n\nPoem: {poem}")


asyncio.run(main())
With complex nested schemas
import asyncio
from ai_sdk import stream_object, openai
from pydantic import BaseModel
from typing import List, Optional


class Address(BaseModel):
    """A single postal address."""

    street: str
    city: str
    country: str


class Contact(BaseModel):
    """Contact details; phone is optional."""

    email: str
    phone: Optional[str] = None


class Person(BaseModel):
    """Top-level profile nesting addresses and contact info."""

    name: str
    age: int
    addresses: List[Address]
    contact: Contact


async def main():
    """Stream a profile whose schema nests several Pydantic models."""
    llm = openai("gpt-4.1-mini")
    stream = stream_object(
        model=llm,
        schema=Person,
        prompt="Create a person profile with multiple addresses",
    )
    # Print text deltas as they arrive.
    async for piece in stream.object_stream:
        print(piece, end="", flush=True)
    person = await stream.object()
    print(f"\n\nPerson: {person}")


asyncio.run(main())
## Error handling

`stream_object` handles validation errors gracefully:
import asyncio
from ai_sdk import stream_object, openai
from pydantic import BaseModel, ValidationError


class User(BaseModel):
    """Schema the model output must validate against."""

    name: str
    age: int


async def main():
    """Demonstrate catching a schema-validation failure from the stream."""
    llm = openai("gpt-4.1-mini")
    try:
        stream = stream_object(
            model=llm,
            schema=User,
            prompt="Create a user with invalid data",
        )
        async for piece in stream.object_stream:
            print(piece, end="", flush=True)
        user = await stream.object()
        print(f"\n\nUser: {user}")
    except ValidationError as e:
        # Raised when the final output cannot be parsed into User.
        print(f"Schema validation failed: {e}")


asyncio.run(main())
See the dedicated Tool page for a complete walkthrough.
import asyncio
# `List` is used in the schema below; without this import the class
# definition raises NameError.
from typing import List

from ai_sdk import tool, stream_object, openai
from pydantic import BaseModel


class Calculation(BaseModel):
    """Structured result of the calculation, including the steps taken."""

    result: float
    operation: str
    steps: List[str]


# Tool the model may call while producing the structured output.
add = tool(
    name="add",
    description="Add two numbers.",
    parameters={
        "type": "object",
        "properties": {"a": {"type": "number"}, "b": {"type": "number"}},
        "required": ["a", "b"],
    },
    execute=lambda a, b: a + b,
)


async def main():
    """Stream a structured calculation that can invoke the `add` tool."""
    model = openai("gpt-4.1-mini")
    result = stream_object(
        model=model,
        schema=Calculation,
        prompt="Calculate 15 + 27 and explain the steps",
        tools=[add],
    )
    async for chunk in result.object_stream:
        print(chunk, end="", flush=True)
    calculation = await result.object()
    print(f"\n\nCalculation: {calculation}")


asyncio.run(main())
`stream_object` is provider-agnostic. Swap `openai()` for `anthropic()` or any other future implementation — no code changes required.