服務(wù)器發(fā)送的事件(SSE)是一個(gè)用于將通知從 HTTP 服務(wù)器推送到客戶端輕量級(jí)策添、標(biāo)準(zhǔn)化協(xié)議膛壹。與提供雙向通信的 WebSocket 相比, SSE 只允許從服務(wù)器到客戶端的單向通信脆炎。如果這是你所需要的, SSE 的優(yōu)點(diǎn)是要簡(jiǎn)單得多, 只能依賴于 HTTP, 并提供瀏覽器中斷的連接的重試語義聚霜。
根據(jù) SSE 規(guī)范, 客戶端可以通過 HTTP 從服務(wù)器請(qǐng)求事件流效五。服務(wù)器使用具有固定字符編碼 UTF-8 的媒體類型text/event-stream
進(jìn)行響應(yīng), 并保持響應(yīng)打開, 以便在可用時(shí)將事件發(fā)送到客戶端噩茄。事件是文本結(jié)構(gòu), 它持有字段并以空行終止, 例如
data: { "username": "John Doe" }
event: added
id: 42
data: another event
重新連接后,客戶端可以選擇發(fā)送Last-Event-ID
(標(biāo)識(shí)最后一個(gè)已接受事件)頭部給服務(wù)器续徽。
模型
Akka HTTP 將事件流表示為Source[ServerSentEvent, NotUsed]
, 其中 ServerSentEvent 是具有以下只讀屬性的樣例類:
-
data: String
– 實(shí)際有效載荷, 可能跨越多行 -
eventType
: Option[String] – 可選限定符, 例如. “added”, “removed”, 等等. -
id: Option[String]
– 可選標(biāo)識(shí)符 -
retry
: Option[Int] – 可選的重新連接延遲 (毫秒)
根據(jù) SSE 規(guī)范Akka HTTP 還提供了Last-Event-ID
頭部和text/event-stream
媒體類型疑俭。
服務(wù)器端用法: 編組
為了響應(yīng)帶有事件流的 HTTP 請(qǐng)求, 必須將 EventStreamMarshalling 定義的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隱式引入到各自路由定義的范圍中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
客戶端用法:解組
為了解組作為Source[ServerSentEvent, NotUsed]
的事件流, 必須將 EventStreamUnmarshalling
定義的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]]
隱式引入到范圍中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
請(qǐng)注意, 如果您正在尋找一種能夠永久訂閱事件流的彈性方法, Alpakka 提供的 EventSource 連接器可以使用上次收到的事件 id 自動(dòng)重新連接广料。