LLM 流式通信之 SSE

寫在前面

SSE是LLM進行流式通信常用的技術(shù)方案, 下圖是 kimi 的示例

kimi回答時使用SSE

SSE 簡介

Server-Sent Events(SSE)是一種允許服務(wù)器向客戶端實時推送數(shù)據(jù)的技術(shù)平窘。它基于HTTP協(xié)議叽粹,允許服務(wù)器通過一個持久的HTTP連接向客戶端發(fā)送事件流币砂。以下是SSE的一些關(guān)鍵點:

  1. SSE的本質(zhì):SSE利用HTTP協(xié)議的流信息(streaming)特性,實現(xiàn)服務(wù)器向客戶端的單向通信笼踩。客戶端保持連接打開,等待服務(wù)器發(fā)送新的數(shù)據(jù)流。

  2. SSE的特點

    • 使用HTTP協(xié)議冕碟,現(xiàn)有的服務(wù)器軟件都支持。
    • 輕量級匆浙,使用簡單安寺,與WebSocket相比,協(xié)議相對簡單首尼。
    • 默認支持斷線重連挑庶,而WebSocket需要自己實現(xiàn)。
    • 一般只用來傳送文本數(shù)據(jù)软能,二進制數(shù)據(jù)需要編碼后傳送迎捺。
    • 支持自定義發(fā)送的消息類型。
  3. 客戶端API

    • EventSource對象用于創(chuàng)建與服務(wù)器的連接并接收事件查排。
    • 通過監(jiān)聽message事件接收服務(wù)器發(fā)送的消息凳枝。
    • 可以監(jiān)聽自定義事件,不僅限于message事件跋核。
  4. 服務(wù)器端發(fā)送事件

    • 服務(wù)器端腳本需要使用text/event-streamMIME類型響應(yīng)內(nèi)容岖瑰。
    • 每個通知以文本塊形式發(fā)送叛买,并以一對換行符結(jié)尾。
    • 消息由字段組成锭环,包括event聪全、dataidretry等辅辩。
  5. 事件流格式

    • 事件流是一個簡單的文本數(shù)據(jù)流难礼,使用UTF-8編碼。
    • 消息由一對換行符分開玫锋,以冒號開頭的行為注釋行蛾茉,會被忽略。
    • 每條消息由一行或多行文字組成撩鹿,列出該消息的字段谦炬。
  6. 瀏覽器兼容性

    • SSE在現(xiàn)代瀏覽器中得到了廣泛支持,除了IE/Edge外节沦,其他瀏覽器如Firefox键思、Chrome、Safari等都支持SSE甫贯。

SSE適用于需要服務(wù)器向客戶端單向?qū)崟r推送數(shù)據(jù)的場景吼鳞,如實時通知、股票行情叫搁、新聞推送等赔桌。它是一種有效降低服務(wù)器負載和網(wǎng)絡(luò)資源消耗的技術(shù),通過服務(wù)器主動向客戶端發(fā)送更新事件渴逻,實現(xiàn)實時通信疾党。

py 中使用 SSE

  • py 中異步: async + await
  • py 中流式接收 SSE: httpx
  • py 中流式返回 SSE: from fastapi.responses import StreamingResponse as FastapiStreamingResponse
  • 路由定義
@router.post("/stream", tags=["chat"])
async def streaming_chat(
    params: QuestionParams, current_user: TokenData = Depends(get_current_user)
):
    if not params.user_id:
        params.user_id = current_user.uid
    async_generator = RetrievalController().stream_answer(params)
    return StreamingResponse(async_generator)
  • 流式輸出定義
from typing import Mapping

from fastapi.responses import StreamingResponse as FastapiStreamingResponse
from starlette.background import BackgroundTask
from starlette.responses import ContentStream


class StreamingResponse(FastapiStreamingResponse):
    def __init__(
        self,
        content: ContentStream,
        status_code: int = 200,
        headers: Mapping[str, str] | None = None,
        media_type: str | None = None,
        background: BackgroundTask | None = None,
    ) -> None:
        default_headers = {"Content-Type": "text/event-stream", "Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
        default_headers.update(headers or {})
        super().__init__(content, status_code, default_headers, media_type, background)
  • 流式接收并流式返回
    @LogDecorate(
        func_name="retrieval_controller::process_stream_answer", raise_exc=True
    )
    async def stream_answer(self, params: QuestionParams, model: int = 1):
        """
        :param model: 1-8B 2-32B
        """
        session_id = params.session_id
        if params.new_session:
            session_id = str(uuid.uuid1()).replace("-", "")
        request_body = dict(
            messages=msgs,
            user_id=params.user_id,
        )
        stream_answer_api = f"{AI_DOMAIN}{STREAM_ANSWER_API}"

        answer = ""
        # 流式接收
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                stream_answer_api,
                json=request_body,
                timeout=60,
                headers=dict(trace_id=get_req_ctx("trace_id")),
            ) as response:
                async for chunk in response.aiter_text():
                    answer += chunk
                    yield self.get_yield_data(
                        {"content": chunk, "create_at": int(time.time() * 1000)}
                    )

        yield self.get_yield_data("[DONE]")
        yield self.get_yield_data({"session_id": session_id})
        yield self.get_yield_data("[END]")

        # 落庫
        await user_qa_dao.save_user_qa(params.q, answer, session_id, params.user_id)

Go中使用SSE

使用 https://github.com/hertz-contrib/sse

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/cloudwego/hertz/pkg/app"
    "github.com/cloudwego/hertz/pkg/common/hlog"
    "github.com/google/uuid"
    "github.com/hertz-contrib/sse"
    "github.com/spf13/cast"
)

func ChatStream(ctx context.Context, c *app.RequestContext) {
    u := ctl.CtxUser(c)

    var req struct {
        Query string `form:"query" json:"query"`
        Model int    `form:"model" json:"model"`
        Sid   string `form:"sid" json:"sid"` // session id
    }
    if err := c.BindAndValidate(&req); err != nil {
        utils.RespErr(c, err)
        return
    }

    // 聊天消息支持多輪對話
    var sid string
    if req.Sid != "" {
        sid = req.Sid
    } else {
        sid = uuid.New().String()
    }
    msg := chat.SaveUserMsg(ctx, sid, req.Query)
    content := &chat.Content{
        Messages: msg,
        UserId:   cast.ToString(u.ID),
        UserName: u.Name,
    }
    b, _ := json.Marshal(content)

    // https://github.com/hertz-contrib/sse/blob/main/examples/client/quickstart/main.go
    cli := sse.NewClient(conf.GetConf().Dev.AIDomain + "xxx")
    cli.SetMethod("POST")
    cli.SetHeaders(map[string]string{"Content-Type": "application/json", "trace_id": httpx.TraceId()})
    cli.SetBody(b)

    var ans, allAns string // AI 返回內(nèi)容
    var flag bool          // reply正文標識
    events := make(chan *sse.Event)
    errChan := make(chan error)
    s := sse.NewStream(c)
    go func() {
        cErr := cli.Subscribe(func(msg *sse.Event) {
            if msg != nil && msg.Data != nil {
                events <- msg
                return
            }
        })
        errChan <- cErr
    }()
    for {
        select {
        case e := <-events:
            m := map[string]any{}
            _ = json.Unmarshal(e.Data, &m)
            if v, ok := m["content"]; ok {
                allAns += v.(string)
                if flag {
                    ans += v.(string)
                }
                if v == "__REPLY_START__" {
                    flag = true
                }
                da := map[string]any{
                    "content":   v,
                    "create_at": time.Now().Unix(),
                }
                jsonData, _ := json.Marshal(da)
                hlog.Info("publish event data = %s", string(jsonData))
                _ = s.Publish(&sse.Event{Data: jsonData})
            } else {
                hlog.Info("invalid event data = %s", string(e.Data))
            }
        case err := <-errChan:
            if err != nil {
                hlog.CtxErrorf(context.Background(), "err = %s", err.Error())
            }
            chat.SaveAssistantMsg(ctx, sid, ans, msg)
            chat.SaveQA(u.ID, sid, req.Query, allAns)
            _ = s.Publish(&sse.Event{Data: []byte("[DONE]")})
            _ = s.Publish(&sse.Event{Data: []byte(fmt.Sprintf(`{"session_id": "%s"}`, sid))})
            _ = s.Publish(&sse.Event{Data: []byte("[END]")})
            hlog.Info("cli get all event")
            return
        }
    }
}

寫在最后

需要注意的點

  • py 使用 httpx 接收 SSE 流式數(shù)據(jù), 對數(shù)據(jù)結(jié)構(gòu)沒有要求, 比如 SSE event 常見的 data: xxx, 可以不帶 data 標識返回
  • go 中使用 https://github.com/hertz-contrib/sse 接收 SSE 流式數(shù)據(jù)
    • 底層會解析 SSE 數(shù)據(jù)格式, 需要判斷 data 標識, 如果沒有, 會導致解析失敗
    • 如果數(shù)據(jù)包含 \n換行, 也會導致數(shù)據(jù)解析失敗, 比較簡單的做法 data: json 格式數(shù)據(jù)
// go 中對應(yīng) SSE 庫數(shù)據(jù)解析源碼
func (c *Client) processEvent(msg []byte) (event *Event, err error) {
    var e Event

    if len(msg) < 1 {
        return nil, fmt.Errorf("event message was empty")
    }

    // Normalize the crlf to lf to make it easier to split the lines.
    // Split the line by "\n" or "\r", per the spec.
    for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) {
        switch {
        case bytes.HasPrefix(line, headerID):
            e.ID = string(append([]byte(nil), trimHeader(len(headerID), line)...))
        case bytes.HasPrefix(line, headerData):
            // The spec allows for multiple data fields per event, concatenated them with "\n".
            e.Data = append(e.Data[:], append(trimHeader(len(headerData), line), byte('\n'))...)
        // The spec says that a line that simply contains the string "data" should be treated as a data field with an empty body.
        case bytes.Equal(line, bytes.TrimSuffix(headerData, []byte(":"))):
            e.Data = append(e.Data, byte('\n'))
        case bytes.HasPrefix(line, headerEvent):
            e.Event = string(append([]byte(nil), trimHeader(len(headerEvent), line)...))
        case bytes.HasPrefix(line, headerRetry):
            e.Retry, err = strconv.ParseUint(b2s(append([]byte(nil), trimHeader(len(headerRetry), line)...)), 10, 64)
            if err != nil {
                return nil, fmt.Errorf("process message `retry` failed, err is %s", err)
            }
        default:
            // Ignore any garbage that doesn't match what we're looking for.
        }
    }

    // Trim the last "\n" per the spec.
    e.Data = bytes.TrimSuffix(e.Data, []byte("\n"))

    if c.encodingBase64 {
        buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data)))

        n, err := base64.StdEncoding.Decode(buf, e.Data)
        if err != nil {
            err = fmt.Errorf("failed to decode event message: %s", err)
            return &e, err
        }
        e.Data = buf[:n]
    }
    return &e, err
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市惨奕,隨后出現(xiàn)的幾起案子雪位,更是在濱河造成了極大的恐慌,老刑警劉巖梨撞,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雹洗,死亡現(xiàn)場離奇詭異,居然都是意外死亡聋袋,警方通過查閱死者的電腦和手機队伟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門穴吹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來幽勒,“玉大人,你說我怎么就攤上這事港令∩度荩” “怎么了锈颗?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長咪惠。 經(jīng)常有香客問我击吱,道長,這世上最難降的妖魔是什么遥昧? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任覆醇,我火速辦了婚禮,結(jié)果婚禮上炭臭,老公的妹妹穿的比我還像新娘永脓。我一直安慰自己,他們只是感情好鞋仍,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布常摧。 她就那樣靜靜地躺著,像睡著了一般威创。 火紅的嫁衣襯著肌膚如雪落午。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天肚豺,我揣著相機與錄音溃斋,去河邊找鬼。 笑死详炬,一個胖子當著我的面吹牛盐类,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播呛谜,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼在跳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了隐岛?” 一聲冷哼從身側(cè)響起猫妙,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎聚凹,沒想到半個月后割坠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡妒牙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年彼哼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片湘今。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡敢朱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拴签,我是刑警寧澤孝常,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站蚓哩,受9級特大地震影響构灸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜岸梨,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一喜颁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧曹阔,春花似錦洛巢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至芥炭,卻和暖如春漓库,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背园蝠。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工渺蒿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人彪薛。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓茂装,卻偏偏與公主長得像,于是被迫代替她去往敵國和親善延。 傳聞我的和親對象是個殘疾皇子少态,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容