feat: Async open ai llm generate #4879
Conversation
being nosy... just gentle reminder to check docstring format in build

import logging

logger = logging.getLogger(__name__)
logger.exception(e)
Currently, exceptions that are unserializable are not handled properly (#4881), so this is a workaround until that is fixed.
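For context, a minimal sketch of what such a workaround can look like. The helper name and the re-raise as a plain `RuntimeError` are assumptions for illustration, not necessarily what the PR does; only the `logger.exception(e)` call comes from the diff:

```python
import logging

logger = logging.getLogger(__name__)


def call_with_logged_errors(fn, *args, **kwargs):
    # Hypothetical helper; the PR simply logs inside its own except block.
    try:
        return fn(*args, **kwargs)
    except Exception as e:
        # Log the full traceback locally, since the original exception object
        # (e.g. an SDK error wrapping a live response) may not serialize
        # cleanly across process boundaries (see #4881).
        logger.exception(e)
        # Re-raising a plain RuntimeError keeps the message serializable;
        # this re-raise is an assumption, not confirmed by the PR.
        raise RuntimeError(str(e)) from None
```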
Greptile Summary
This PR introduces asynchronous OpenAI API support to the LLM generation functionality in Daft. The core change migrates from using the synchronous `openai.OpenAI` client to `openai.AsyncOpenAI`, enabling concurrent API requests through `asyncio.gather()` for improved performance when processing multiple prompts. The implementation adds event loop management that attempts to use an existing running loop or creates a new one when needed.
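For reference, a minimal sketch of that concurrent-request pattern using only the public `openai` Python SDK; the model name and prompt handling are placeholders, not the PR's actual code:

```python
import asyncio

from openai import AsyncOpenAI


async def generate_all(prompts: list[str], model: str = "gpt-4o-mini") -> list[str]:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def generate_one(prompt: str) -> str:
        resp = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.choices[0].message.content

    # Fire one request per prompt and await all of them concurrently.
    return await asyncio.gather(*(generate_one(p) for p in prompts))


# e.g. outputs = asyncio.run(generate_all(["prompt one", "prompt two"]))
```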
Additionally, the PR makes batch sizes provider-specific: OpenAI now defaults to 128 (down from 1024) while vLLM retains 1024. This reflects the different optimal batch characteristics between API-based inference (OpenAI) and local inference (vLLM), where smaller concurrent batches may be more efficient due to API rate limits and latency considerations.
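A hypothetical illustration of provider-specific defaults is shown below; the dictionary and function names are invented for this sketch, and only the 128/1024 values come from the PR:

```python
# Illustrative only: names and structure are assumptions, not the actual
# code in daft/functions/llm.py.
PROVIDER_DEFAULT_BATCH_SIZE = {
    "openai": 128,  # remote API: smaller batches respect rate limits and latency
    "vllm": 1024,   # local inference: larger batches amortize per-batch overhead
}


def resolve_batch_size(provider: str, batch_size: int | None = None) -> int:
    return batch_size if batch_size is not None else PROVIDER_DEFAULT_BATCH_SIZE[provider]
```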
The changes integrate with Daft's existing LLM function infrastructure by maintaining the same public interface while internally switching to async operations. The test suite has been updated accordingly, replacing `openai.OpenAI` patches with `openai.AsyncOpenAI` and implementing proper async mocks for the completion creation flow.
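One way such an async mock can be wired up with `unittest.mock.AsyncMock`, as a rough sketch only; the repository's actual tests may be structured differently:

```python
from unittest.mock import AsyncMock, MagicMock, patch


def make_fake_completion(text: str):
    # Mimic the shape of a chat completion: .choices[0].message.content
    completion = MagicMock()
    completion.choices = [MagicMock(message=MagicMock(content=text))]
    return completion


def test_uses_async_openai_client():
    fake_client = MagicMock()
    # The create call must be awaitable, hence AsyncMock rather than MagicMock.
    fake_client.chat.completions.create = AsyncMock(
        return_value=make_fake_completion("mocked output")
    )
    with patch("openai.AsyncOpenAI", return_value=fake_client):
        # ...call the code under test here and assert on its outputs;
        # the repo's real tests will differ in structure and assertions.
        pass
```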
Confidence score: 2/5
- This PR introduces complex async event loop management that could cause issues in different execution contexts, particularly when nested event loops are involved.
- Score reflects concerns about the event loop initialization logic and potential runtime errors when `run_until_complete()` is called from within an already running loop.
- Pay close attention to `daft/functions/llm.py` lines 143-175, where event loop management and async execution could fail in certain environments.
2 files reviewed, 3 comments
try:
    self.loop = asyncio.get_running_loop()
except RuntimeError:
    self.loop = asyncio.new_event_loop()
logic: Event loop management is problematic. Creating a new event loop in `__init__` without setting it as the current loop can cause issues. The loop may not be properly configured for the execution context.
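A common remedy, sketched below, is to register the freshly created loop as the current one via `asyncio.set_event_loop`; whether the PR adopts this exact helper is an open question:

```python
import asyncio


def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
    """Return the running loop if there is one; otherwise create a loop and
    register it as current so later calls agree on which loop to use."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)  # the step the comment above says is missing
        return loop
```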
return await asyncio.gather(*tasks)

try:
    outputs = self.loop.run_until_complete(gather_completions())
logic: Using `run_until_complete()` on a potentially running loop will raise `RuntimeError` if called from within an async context. This will break in environments like Jupyter notebooks or async UDF execution.
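One defensive pattern for this situation is sketched below: use `asyncio.run()` when no loop is running, and push the work onto a helper thread when one is. This is a generic workaround, not necessarily what the PR implements:

```python
import asyncio
import concurrent.futures


def run_sync(coro):
    """Run a coroutine to completion whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain scripts): asyncio.run owns a fresh loop.
        return asyncio.run(coro)
    # A loop is already running (Jupyter, async callers): run_until_complete
    # would raise RuntimeError, so execute the coroutine in a helper thread
    # that owns its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```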
Watch out for these and do some testing from within Jupyter? I did see some users complaining about needing to do some shenanigans with nested_async to get our stuff working in a notebook.
Generally needing to manually handle `self.loop` feels kinda icky to me. Is it not possible to just grab the currently available loop?
Yeah can confirm that it works in Jupyter (native + Ray). `self.loop` stores either the currently available loop, or a new one if none exists.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4879      +/-   ##
==========================================
- Coverage   78.56%   78.14%    -0.43%
==========================================
  Files         917      917
  Lines      128066   129997     +1931
==========================================
+ Hits       100621   101589      +968
- Misses      27445    28408      +963
Awesome PR. Next steps: structured gen syntax? It varies between vLLM/SGLang/TensorRT-LLM and normal OpenAI.
This would be super interesting. Open an issue? Would love to consider. I was just reading this: https://simmering.dev/blog/openai_structured_output/
Changes Made
Use asyncio to make concurrent requests for LLM generate with OpenAI. Additionally, this PR changes the default batch size for OpenAI from 1024 to 128.
Related Issues
Checklist
docs/mkdocs.yml navigation