本人小白,另外一个帖子的原理看不太明白,求大佬写一个完整的小 demo
我也是小白,也没弄明白。。
首先这个输出到前端,从概念上就太大了,你这个问题不明确。
不同的框架实现方式是不同的。这里举一个通过 FastAPI 的 StreamingResponse 实现的例子:
class IteratorCallbackHandler(BaseCallbackHandler):
    """Callback handler that exposes streamed LLM tokens as an iterator.

    The LLM callbacks act as the producer: every new token is pushed onto
    ``queue`` and ``done`` is set when the LLM call ends or fails. ``iter``
    is the consumer: it polls the queue and yields tokens until the
    producer is done or ``timeout`` seconds have elapsed.

    NOTE(review): ``iter`` uses a blocking ``get()`` and ``time.sleep``,
    so ``Queue``/``Event`` are presumably ``queue.Queue`` and
    ``threading.Event`` -- confirm against the file's imports.
    """

    # Seconds to sleep between polls of the queue while waiting for tokens.
    sleep_per_iteration: float = 0.1
    # Upper bound, in seconds, on the total running time of ``iter``.
    timeout: float = 30.0
    # Tokens produced by the callbacks and not yet consumed by ``iter``.
    queue: Queue[str]
    # Set once the current LLM call has ended or raised an error.
    done: Event

    @property
    def always_verbose(self) -> bool:
        """Always report True."""
        return True

    def __init__(self) -> None:
        """Create an empty token queue and a cleared ``done`` flag."""
        self.queue = Queue()
        self.done = Event()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Clear the ``done`` flag at the start of an LLM call."""
        # If two calls are made in a row, this resets the state
        self.done.clear()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Enqueue a newly generated token for the consumer."""
        self.queue.put_nowait(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Mark the stream as finished."""
        self.done.set()

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Mark the stream as finished so the consumer stops waiting."""
        self.done.set()

    def iter(self) -> Iterator[str]:
        """Yield queued tokens until the LLM is done or the timeout expires.

        Yields:
            Each token in the order it was enqueued.
        """
        start = time.time()
        while True:
            # Snapshot ``done`` BEFORE draining. Tokens are enqueued before
            # ``done`` is set, so if the flag is already set here, the
            # drain below is guaranteed to see every remaining token.
            # Checking the flag only after draining could drop tokens that
            # arrive between the drain and the check.
            finished = self.done.is_set()
            while not self.queue.empty():
                yield self.queue.get()
            if finished:
                break
            # sleep for a short time to prevent busy waiting
            time.sleep(self.sleep_per_iteration)
            if time.time() - start > self.timeout:
                logging.warning("Timeout reached")
                break

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Do nothing on chain start."""
        pass

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Do nothing on chain end."""
        pass

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Do nothing on chain error."""
        pass

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> None:
        """Do nothing on tool start."""
        pass

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        """Emit a newline into the token stream when an agent action occurs."""
        # TODO new line
        self.queue.put_nowait("\n")

    def on_tool_end(
        self,
        output: str,
        color: Optional[str] = None,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Do nothing on tool end."""
        pass

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Do nothing on tool error."""
        pass

    def on_text(
        self,
        text: str,
        color: Optional[str] = None,
        end: str = "",
        **kwargs: Optional[str],
    ) -> None:
        """Do nothing on arbitrary text."""
        pass

    def on_agent_finish(
        self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Do nothing on agent finish."""
        pass
调用方:
from fastapi.responses import StreamingResponse
# Unrelated logic omitted. This fragment belongs inside a route handler
# (note the `return` below).
callback_handler = IteratorCallbackHandler()
callback_manager = CallbackManager([callback_handler])
# Attach callback_manager to the chain and run the chain asynchronously
# (be careful not to block here, otherwise the response cannot start
# streaming until the chain has finished).
return StreamingResponse(callback_handler.iter())
上面这个对接上FASTAPI的router已经是直接可以用的了
1 个赞
强调一下,这里 chain 的执行一定要异步(比如可以放到线程池中去执行),否则会阻塞,就没有流式的效果了。
谢谢大佬,chain异步执行是怎么操作的
class StreamWeb(StreamingStdOutCallbackHandler):
    """Callback handler that buffers streamed tokens for a web response.

    The LLM callbacks append tokens to ``tokens``; ``generate_tokens``
    yields them in order and stops once the LLM has finished (or failed)
    and the buffer is empty.
    """

    def __init__(self, poll_interval: float = 1.0):
        """Initialise an empty buffer.

        Args:
            poll_interval: Seconds ``generate_tokens`` sleeps when the
                buffer is empty but the LLM has not finished yet. The
                default keeps the original one-second poll; a smaller
                value gives smoother streaming.
        """
        self.num = 0
        self.tokens = []
        self.finish = False
        self.poll_interval = poll_interval

    def on_llm_start(self, serialized: Any, prompts: Any, **kwargs: Any) -> None:
        """Reset the finished flag so the handler can be reused across calls."""
        # Without this reset, a handler that has completed one call keeps
        # ``finish == True`` and the next stream can end prematurely.
        # Buffered tokens are deliberately left untouched.
        self.finish = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Buffer a newly generated token."""
        self.tokens.append(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Mark the stream as finished."""
        self.finish = True

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Mark the stream as finished so the consumer does not wait forever."""
        self.finish = True

    def generate_tokens(self) -> Generator:
        """Yield buffered tokens until the LLM is finished and the buffer is empty.

        Yields:
            Each token in the order it was received.
        """
        while not self.finish or self.tokens:
            if self.tokens:
                data = self.tokens.pop(0)
                yield data
            else:
                # Nothing buffered yet: wait for the producer.
                time.sleep(self.poll_interval)
# Single handler instance; it is registered on the chat model below and
# read by the route that streams its tokens.
sWeb = StreamWeb()

# Streaming GPT chat model: tokens are delivered to `sWeb` as they arrive.
llm_streaming = ChatOpenAI(
    model_name="gpt-3.5-turbo",
    temperature=0,
    streaming=True,
    verbose=True,
    callbacks=[sWeb],
)
@app.route('/get_data_streaming', methods=["GET"])
def get_data_streaming():
    """Stream the chat model's answer to the ``msg`` query parameter.

    The LLM call runs in a background thread so that this handler can
    return the streaming response immediately. Calling the model inline
    would block until the full answer had been generated, and the client
    would receive everything at once.

    Returns:
        A ``text/plain`` streaming response of tokens, or a 400 response
        when ``msg`` is missing.
    """
    import threading

    msg = request.args.get('msg')
    if msg is None:
        return Response("missing 'msg' query parameter", status=400,
                        mimetype='text/plain')

    # NOTE: `sWeb` is one handler shared by every request, so concurrent
    # requests would interleave their tokens; use a per-request handler
    # if this endpoint must serve several clients at once.
    # Reset before the worker starts so the generator does not see a stale
    # "finished" flag left over from a previous request.
    sWeb.finish = False

    def _run_llm():
        try:
            llm_streaming([HumanMessage(content=msg)])
        finally:
            # Always release the consumer, even if the LLM call raised.
            sWeb.finish = True

    threading.Thread(target=_run_llm, daemon=True).start()

    result = stream_with_context(sWeb.generate_tokens())
    return Response(result, mimetype='text/plain')
我这样有异步吗
你好,请问你这个有效果吗
没效果
大哥,这个怎么异步,搞了一晚上还是不行
谢谢大佬,我试试
from langchain.chat_models import ChatOpenAI
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.schema import HumanMessage
import asyncio
from fastapi.responses import StreamingResponse
# Fragment of an async route handler (note the `return` below);
# `temperature` and `question` are expected to come from the enclosing scope.
stream_handler = AsyncIteratorCallbackHandler()
chat = ChatOpenAI(streaming=True, callbacks=[stream_handler], temperature=temperature)
# Schedule the generation as a task instead of awaiting it, so the response
# below can be returned while generation is still in progress.
# NOTE(review): the task object is not stored; the asyncio documentation
# recommends keeping a reference to tasks created this way -- confirm.
asyncio.create_task(chat.agenerate([[HumanMessage(content=question)]]))
# Stream tokens from the handler's async iterator.
return StreamingResponse(stream_handler.aiter())