2024-01-24更新。
草稿
Module: reactor-netty-http
Package: reactor.netty.http.server
设计目的(猜测)
定义路由表。
构建方法
1)通过静态方法`newRoutes()`构建
HttpServerRoutes httpServerRoutes = HttpServerRoutes.newRoutes();
2)通过`HttpServerRoutes`实例串联
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
HttpServerRoutes newRoutes = routes.get("/foo", ((httpServerRequest, httpServerResponse) -> {
// 做些处理
}));
需要注意的是,上述`newRoutes`将会同时包含`routes`中已经定义的路由记录和新定义的`GET /foo`路由策略。
使用方法
路由
DisposableServer disposableServer = HttpServer.create()
// 注册路由
.route(httpServerRoutes ->
// GET /foo
httpServerRoutes.get("/foo", (httpServerRequest, httpServerResponse) ->
httpServerResponse.sendString(Mono.just("GET /foo")))
// POST /bar
.post("/bar", (httpServerRequest, httpServerResponse) ->
httpServerResponse.sendString(Mono.just("POST /bar"))))
.port(8080)
.bindNow();
disposableServer.onDispose()
.block();
curl -i 127.0.0.1:8080/foo
HTTP/1.1 200 OK
content-length: 8
GET /foo
curl -i 127.0.0.1:8080/bar -XPOST
HTTP/1.1 200 OK
content-length: 9
POST /bar
WebSockets
private static String[] lines = new String[] {
"Line One",
"Line Two",
"Line Three",
};
private static void wsHttpServer() {
DisposableServer disposableServer = HttpServer.create()
// 注册路由
.route(httpServerRoutes ->
// /ws
httpServerRoutes.ws("/ws", (websocketInbound, websocketOutbound) -> {
// 创建ByteBuf的响应流
Flux<ByteBuf> byteBufFlux = Flux.create(sink -> {
for (String line : lines) {
// 将字节数组包装为ByteBuf对象
sink.next(Unpooled.wrappedBuffer((line + "\n").getBytes(StandardCharsets.UTF_8)));
}
// 响应结束
sink.complete();
});
// 配置响应流
return websocketOutbound.send(byteBufFlux);
}))
.port(8080)
.bindNow();
disposableServer.onDispose()
.block();
}
curl -i -N -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" -H "Sec-WebSocket-Version: 13" 127.0.0.1:8080/ws
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
� Line One
� Line Two
�
Line Three
��Bye
如果确实需要使用`curl`来测试WebSocket服务器,可以仅仅用curl来检查初始的WebSocket握手响应。WebSocket连接开始于一个HTTP请求,这个请求会被升级为WebSocket协议。这个HTTP请求称为"握手"请求。
在这里:
-
-i
将显示响应中的HTTP头信息。 -
-N
禁用缓冲,即尽可能快的打印数据。 -
Sec-WebSocket-Key
是一个Base64编码的随机值,服务器将用这个值作为响应头`Sec-WebSocket-Accept`的一部分返回。 -
Sec-WebSocket-Version
表示WebSocket的版本,一般是13。
请注意,上面的命令只是测试WebSocket握手的HTTP请求和响应,实际的WebSocket通信无法通过`curl`完成。
websocat ws://127.0.0.1:8080/ws
Line One
Line Two
Line Three
^C
Websocat 是一个用 Rust 编程语言编写的命令行工具,它为 WebSockets 提供类似 socat 的功能,使用户能够从命令行快速且轻松地与 WebSocket 服务器和客户端建立连接,并进行数据的发送和接收。Websocat 支持多种运行模式,在这些模式中,它可以作为 WebSocket 客户端或服务器操作,以及通过代理连接,转发数据等。
这个工具非常有用,尤其当您需要测试或调试 WebSocket 服务、快速建立一次性的 WebSocket 连接或者将 WebSocket 数据流集成到其他命令行工具和脚本中时。其基本使用格式如下:
websocat [OPTIONS] <source> <sink>
-
<source>
: 表示数据的来源,可以是 WebSocket URL、监听的地址或其他支持的协议。 -
<sink>
: 表示数据的去向,通常是另一个 WebSocket 地址、文件、命令行工具等。
Websocat 支持的特性非常多,包括但不限于:
-
与 WebSocket 服务器和客户端建立和接收连接。
-
在 WebSocket 连接上转发标准输入(stdin)和输出(stdout)数据。
-
将 WebSocket 数据转发到 Unix 套接字。
-
在监听模式下作为 WebSocket 服务器。
-
支持 SSL 连接(通过
wss://
URL )。 -
可以通过代理(如 SOCKS5)进行连接。
-
可以在数据流中执行额外的数据转换和处理。
SSE
private static String[] lines = new String[] {
"Line One",
"Line Two",
"Line Three",
};
private static void sseHttpServer() {
DisposableServer disposableServer = HttpServer.create()
// 注册路由
.route(httpServerRoutes ->
// GET /sse
httpServerRoutes.get("/sse", (httpServerRequest, httpServerResponse) -> {
// 创建ByteBuf的响应流
Flux<ByteBuf> byteBufFlux = Flux.create(sink -> {
for (String line : lines) {
// 将字节数组包装为ByteBuf对象
sink.next(Unpooled.wrappedBuffer((line + "\n").getBytes(StandardCharsets.UTF_8)));
}
// 响应结束
sink.complete();
});
// 配置响应流
return httpServerResponse
.sse()
.send(byteBufFlux
// 每个元素延迟3秒
.delayElements(Duration.ofSeconds(3)),
// 对于数据流中的每一个元素都立即推送给客户端
p -> true);
}))
.port(8080)
.bindNow();
disposableServer.onDispose()
.block();
}
curl -i 127.0.0.1:8080/sse
HTTP/1.1 200 OK
transfer-encoding: chunked
content-type: text/event-stream
Line One
Line Two
Line Three
delayElements()
方法是Reactor库中`Flux`类的一个方法,用于对流中的每个元素进行延迟操作。当你希望每个元素在被发送之前等待一定的时间时,可以使用此方法。delayElements()
会延迟每个元素的发射,但不会改变它们的顺序。