2024-01-24更新。

草稿

Module: reactor-netty-http

Package: reactor.netty.http.server

设计目的(猜测)

定义路由表。

构建方法

1)通过静态方法`newRoutes()`构建

HttpServerRoutes httpServerRoutes = HttpServerRoutes.newRoutes();

2)通过`HttpServerRoutes`实例串联

HttpServerRoutes routes = HttpServerRoutes.newRoutes();

HttpServerRoutes newRoutes = routes.get("/foo", ((httpServerRequest, httpServerResponse) -> {

    // 做些处理

}));

需要注意的是,上述`newRoutes`将会同时包含`routes`中已经定义的路由记录和新定义的`GET /foo`路由策略。

使用方法

路由

DisposableServer disposableServer = HttpServer.create()

        // 注册路由

        .route(httpServerRoutes ->

                // GET /foo

                httpServerRoutes.get("/foo", (httpServerRequest, httpServerResponse) ->

                                httpServerResponse.sendString(Mono.just("GET /foo")))

                        // POST /bar

                        .post("/bar", (httpServerRequest, httpServerResponse) ->

                                httpServerResponse.sendString(Mono.just("POST /bar"))))

        .port(8080)

        .bindNow();

disposableServer.onDispose()

        .block();
curl -i 127.0.0.1:8080/foo
HTTP/1.1 200 OK
content-length: 8

GET /foo
curl -i 127.0.0.1:8080/bar -XPOST
HTTP/1.1 200 OK
content-length: 9

POST /bar

WebSockets

private static String[] lines = new String[] {

        "Line One",

        "Line Two",

        "Line Three",

};



private static void wsHttpServer() {

    DisposableServer disposableServer = HttpServer.create()

            // 注册路由

            .route(httpServerRoutes ->

                    // /ws

                    httpServerRoutes.ws("/ws", (websocketInbound, websocketOutbound) -> {

                                        // 创建ByteBuf的响应流

                                        Flux<ByteBuf> byteBufFlux = Flux.create(sink -> {

                                            for (String line : lines) {

                                                // 将字节数组包装为ByteBuf对象

                                                sink.next(Unpooled.wrappedBuffer((line + "\n").getBytes(StandardCharsets.UTF_8)));

                                            }

                                            // 响应结束

                                            sink.complete();

                                        });

                                        // 配置响应流

                                        return websocketOutbound.send(byteBufFlux);

                                    }))

            .port(8080)

            .bindNow();

    disposableServer.onDispose()

            .block();

}
curl -i -N -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" -H "Sec-WebSocket-Version: 13" 127.0.0.1:8080/ws
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

�       Line One
�       Line Two
�
 Line Three
��Bye

如果确实需要使用`curl`来测试WebSocket服务器,可以仅仅用curl来检查初始的WebSocket握手响应。WebSocket连接开始于一个HTTP请求,这个请求会被升级为WebSocket协议。这个HTTP请求称为"握手"请求。

在这里:

  • -i 将显示响应中的HTTP头信息。

  • -N 禁用缓冲,即尽可能快的打印数据。

  • Sec-WebSocket-Key 是一个Base64编码的随机值,服务器将用这个值作为响应头`Sec-WebSocket-Accept`的一部分返回。

  • Sec-WebSocket-Version 表示WebSocket的版本,一般是13。

请注意,上面的命令只是测试WebSocket握手的HTTP请求和响应,实际的WebSocket通信无法通过`curl`完成。

websocat ws://127.0.0.1:8080/ws
Line One
Line Two
Line Three
^C

Websocat 是一个用 Rust 编程语言编写的命令行工具,它为 WebSockets 提供类似 socat 的功能,使用户能够从命令行快速且轻松地与 WebSocket 服务器和客户端建立连接,并进行数据的发送和接收。Websocat 支持多种运行模式,在这些模式中,它可以作为 WebSocket 客户端或服务器操作,以及通过代理连接,转发数据等。

这个工具非常有用,尤其当您需要测试或调试 WebSocket 服务、快速建立一次性的 WebSocket 连接或者将 WebSocket 数据流集成到其他命令行工具和脚本中时。其基本使用格式如下:

websocat [OPTIONS] <source> <sink>
  • <source>: 表示数据的来源,可以是 WebSocket URL、监听的地址或其他支持的协议。

  • <sink>: 表示数据的去向,通常是另一个 WebSocket 地址、文件、命令行工具等。

Websocat 支持的特性非常多,包括但不限于:

  • 与 WebSocket 服务器和客户端建立和接收连接。

  • 在 WebSocket 连接上转发标准输入(stdin)和输出(stdout)数据。

  • 将 WebSocket 数据转发到 Unix 套接字。

  • 在监听模式下作为 WebSocket 服务器。

  • 支持 SSL 连接(通过 wss:// URL )。

  • 可以通过代理(如 SOCKS5)进行连接。

  • 可以在数据流中执行额外的数据转换和处理。

SSE

private static String[] lines = new String[] {

        "Line One",

        "Line Two",

        "Line Three",

};



private static void sseHttpServer() {

    DisposableServer disposableServer = HttpServer.create()

            // 注册路由

            .route(httpServerRoutes ->

                    // GET /sse

                    httpServerRoutes.get("/sse", (httpServerRequest, httpServerResponse) -> {

                        // 创建ByteBuf的响应流

                        Flux<ByteBuf> byteBufFlux = Flux.create(sink -> {

                            for (String line : lines) {

                                // 将字节数组包装为ByteBuf对象

                                sink.next(Unpooled.wrappedBuffer((line + "\n").getBytes(StandardCharsets.UTF_8)));

                            }

                            // 响应结束

                            sink.complete();

                        });

                        // 配置响应流

                        return httpServerResponse

                                .sse()

                                .send(byteBufFlux

                                        // 每个元素延迟3秒

                                        .delayElements(Duration.ofSeconds(3)),

                                        // 对于数据流中的每一个元素都立即推送给客户端

                                        p -> true);

                    }))

            .port(8080)

            .bindNow();

    disposableServer.onDispose()

            .block();

}
curl -i 127.0.0.1:8080/sse
HTTP/1.1 200 OK
transfer-encoding: chunked
content-type: text/event-stream

Line One
Line Two
Line Three

delayElements() 方法是Reactor库中`Flux`类的一个方法,用于对流中的每个元素进行延迟操作。当你希望每个元素在被发送之前等待一定的时间时,可以使用此方法。delayElements() 会延迟每个元素的发射,但不会改变它们的顺序。