Help: how do I stream LangChain output to the frontend? Could someone write a complete little demo?

I'm a beginner, and I couldn't follow the explanation in the other thread. Could someone write a complete little demo?

I'm a beginner too, and I haven't figured it out either...

First of all, "output to the frontend" is too broad a concept; your question isn't specific enough.
Different frameworks implement this differently. Here is an example using FastAPI's StreamingResponse:



import logging
import time
from queue import Queue
from threading import Event
from typing import Any, Dict, Iterator, List, Optional, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish, LLMResult


class IteratorCallbackHandler(BaseCallbackHandler):
    """Callback handler that returns an iterator."""
    sleep_per_iteration: float = 0.1
    timeout: float = 30.0

    queue: Queue[str]
    done: Event

    @property
    def always_verbose(self) -> bool:
        return True

    def __init__(self) -> None:
        self.queue = Queue()
        self.done = Event()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        # If two calls are made in a row, this resets the state
        self.done.clear()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.queue.put_nowait(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.done.set()

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        self.done.set()

    def iter(self) -> Iterator[str]:
        start = time.time()
        while not self.queue.empty() or not self.done.is_set():
            while not self.queue.empty():
                yield self.queue.get()
            if self.done.is_set():
                break
            time.sleep(self.sleep_per_iteration)  # sleep for a short time to prevent busy waiting
            if time.time() - start > self.timeout:
                logging.warning("Timeout reached")
                break

    def on_chain_start(
            self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        pass

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        pass

    def on_chain_error(
            self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        pass

    def on_tool_start(
            self,
            serialized: Dict[str, Any],
            input_str: str,
            **kwargs: Any,
    ) -> None:
        pass

    def on_agent_action(
            self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        # emit a newline between agent steps so the streamed output stays readable
        self.queue.put_nowait("\n")

    def on_tool_end(
            self,
            output: str,
            color: Optional[str] = None,
            observation_prefix: Optional[str] = None,
            llm_prefix: Optional[str] = None,
            **kwargs: Any,
    ) -> None:
        pass

    def on_tool_error(
            self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        pass

    def on_text(
            self,
            text: str,
            color: Optional[str] = None,
            end: str = "",
            **kwargs: Optional[str],
    ) -> None:
        pass

    def on_agent_finish(
            self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        pass
    

On the caller side:

from fastapi.responses import StreamingResponse
# unrelated logic omitted
callback_handler = IteratorCallbackHandler()
callback_manager = CallbackManager([callback_handler])
# put callback_manager into the chain and execute the chain asynchronously (be careful not to block)
return StreamingResponse(callback_handler.iter())

Hooked up to a FastAPI router, the above can be used directly.
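For reference, here is a minimal sketch of what a complete endpoint might look like. It assumes a chain built elsewhere with the callback manager attached; the route path, the build_chain factory, and the choice of run_in_executor are all illustrative, not the only way to wire this up:

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.callbacks.manager import CallbackManager

app = FastAPI()

@app.get("/stream")  # illustrative route
async def stream(msg: str):
    callback_handler = IteratorCallbackHandler()
    callback_manager = CallbackManager([callback_handler])
    chain = build_chain(callback_manager)  # hypothetical factory: attach the manager to your own chain
    loop = asyncio.get_running_loop()
    # run the blocking chain call in the default thread pool so the event
    # loop stays free while tokens are drained from the handler's queue
    loop.run_in_executor(None, chain.run, msg)
    return StreamingResponse(callback_handler.iter(), media_type="text/plain")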

To emphasize: the chain here absolutely must be executed asynchronously (for example, submitted to a thread pool). Otherwise the call blocks and you get no streaming effect.

Thanks! How exactly do I run the chain asynchronously?

import time
from typing import Any, Generator

from flask import Flask, Response, request, stream_with_context
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage, LLMResult

app = Flask(__name__)


class StreamWeb(StreamingStdOutCallbackHandler):
    def __init__(self):
        self.num = 0
        self.tokens = []
        self.finish = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.tokens.append(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.finish = True

    def generate_tokens(self) -> Generator:
        while not self.finish or self.tokens:
            if self.tokens:
                data = self.tokens.pop(0)
                yield data
            else:
                time.sleep(1)


sWeb = StreamWeb()
# streaming GPT answers
llm_streaming = ChatOpenAI(temperature=0,
                           streaming=True,
                           verbose=True,
                           callbacks=[sWeb],
                           model_name="gpt-3.5-turbo")




@app.route('/get_data_streaming', methods=["GET"])
def get_data_streaming():
    msg = request.args.get('msg')
    chat_history = []
    # note: this call is synchronous; it blocks until generation completes
    llm_streaming([HumanMessage(content=msg)])
    result = stream_with_context(sWeb.generate_tokens())
    return Response(result, mimetype='text/plain')

Is this asynchronous the way I wrote it?

Hi, did you get this working?

No, it doesn't work :joy_cat: :joy_cat: :joy_cat: :joy_cat:

Bro, how do I make this asynchronous? I've been at it all night and still can't get it to work.

There are lots of ways to execute things asynchronously in Python: https://docs.python.org/3/library/asyncio-eventloop.html
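For example, here is a minimal sketch of one common approach applied to the Flask example above. It assumes the app, StreamWeb handler, and imports from that post; the thread pool size, route name, and per-request handler are illustrative choices:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

@app.route('/get_data_streaming_v2', methods=["GET"])  # illustrative route name
def get_data_streaming_v2():
    msg = request.args.get('msg')
    handler = StreamWeb()  # fresh handler per request so finish/tokens state doesn't leak
    llm = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler],
                     model_name="gpt-3.5-turbo")
    # submit the blocking LLM call to a worker thread instead of calling it inline,
    # so the generator below can stream tokens while generation is still running
    executor.submit(llm, [HumanMessage(content=msg)])
    result = stream_with_context(handler.generate_tokens())
    return Response(result, mimetype='text/plain')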

Thanks, I'll give it a try.

from langchain.chat_models import ChatOpenAI
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.schema import HumanMessage
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# wrapped in an endpoint so that create_task/return are valid; the route is illustrative
@app.get("/chat")
async def chat_endpoint(question: str, temperature: float = 0.0):
    stream_handler = AsyncIteratorCallbackHandler()
    chat = ChatOpenAI(streaming=True, callbacks=[stream_handler], temperature=temperature)
    # schedule generation on the running event loop instead of blocking on it
    asyncio.create_task(chat.agenerate([[HumanMessage(content=question)]]))
    return StreamingResponse(stream_handler.aiter(), media_type="text/plain")
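
This version streams correctly because agenerate is a coroutine: asyncio.create_task schedules it on the running event loop instead of blocking the handler, so aiter() can yield tokens concurrently while generation is still in progress.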