六、消息传递——了解不同类型

在前一章中,我们讨论了 Node.js 和创建服务器端应用所需的基本环境。 现在,我们将看看如何使用前面介绍过的通信技术来编写可伸缩的系统。 对于应用来说,消息传递是一种很好的解耦方式,但仍然可以一起工作。 这意味着我们可以通过进程或线程创建彼此独立工作的模块,并且仍然实现共同的目标。

在本章中,我们将涵盖以下主题:

  • 使用网络模块进行本地通信
  • 利用网络
  • 快速一瞥 HTTP/3

我们还将在关注正在开发的 HTTP/3 标准的同时,看看客户端/服务器通信的未来。 然后,我们将研究 QUIC 协议的实现,这是谷歌开发的协议,HTTP/3 借鉴了它的一些想法。

让我们开始吧!

技术要求

对于本章,您将需要以下技术要求:

使用网络模块进行本地通信

虽然许多应用可以在单个线程上运行,并利用事件循环来运行,但当我们编写服务器应用时,我们希望尝试和利用所有可用的核心。 我们可以通过使用进程线程来实现这一点。 在大多数情况下,我们希望使用线程,因为线程更轻,启动更快。

We can find out whether we need a process or a thread based on whether we need to have the subsystem still running if the main system dies. If we don't care, we should utilize a thread, but if we need to have that subsystem still running even after the main process dies, we should utilize a decoupled process. This is only one way of thinking about when to use a process or a thread, but it is a good indicator.

在浏览器和 Node.js 中,我们都有 web worker 来代替传统系统中的线程。 虽然它们与其他语言的线程有许多相同的概念,但它们不能共享状态(在我们的情况下,这是首选)。

There's a way to share state between workers. This can be done through SharedArrayBuffer. While we can utilize this to share state, we want to highlight that the event system and IPC are almost always fast enough to move state and coordinate between different pieces. Also, we don't have to deal with concepts such as locks.

要启动一个工作器,我们需要呼叫new Worker(<script here>)。 让我们回顾一下这个概念:

  1. 创建一个名为Main_Worker.js的文件,并添加以下代码:
import { Worker } from 'worker_threads';

const data = {what: 'this', huh: 'yeah'};
const worker = new Worker('./worker.js');
worker.postMessage(data);
worker.on('message', (data) => {
    worker.terminate();
});
worker.on('exit', (code) => {
    console.log('our worker stopped with the following code: ', 
     code);
});
  1. 创建一个名为worker.js的文件,并添加以下代码:
import { parentPort } from 'worker_threads'

parentPort.on('message', (msg) => {
    console.log('we received a message from our parent: ', msg);
    parentPort.postMessage({RECEIVED: true});
});

正如我们所看到的,这个系统与浏览器中的系统类似。 首先,我们从worker_threads模块导入 worker。 然后我们启动它。 线程将启动,这意味着我们向它发布消息并监听事件,类似于我们在前一章中处理进程的方式。

worker.js文件中,我们从worker_threads模块导入parentPort消息通道。 我们像父母一样监听和传递消息。 一旦收到消息,就声明已收到消息。 然后父程序终止我们,我们打印出我们已经被终止了。

现在,如果我们想将所有子系统紧密耦合在一起,那么这种形式的消息传递是完全合适的。 但是如果我们希望不同的线程有不同的作业呢? 我们可以有一个缓存数据的。 另一个可能会为我们提出要求。 最后,主线程(启动进程)可以移动所有这些数据并从命令行接收数据。

要做到这一切,我们可以简单地使用内置系统。 或者,我们可以利用我们在前一章看过的机制。 这不仅为我们提供了一个高度可伸缩的系统,而且还允许我们在需要时将这些不同的子系统从线程转变为进程。 如果需要的话,这也允许我们用另一种语言编写这些独立的子系统。 现在让我们回顾一下:

  1. 让我们来做这个系统。 我们将创建四个文件:main.jscache.jssend.jspackage.json。 我们的package.json文件应该是这样的:
{
    "name" : "Chapter6_Local",
    "version" : "0.0.1",
    "type" : "module",
    "main" : "main.js"
}
  1. 接下来,在cache.js文件中添加以下代码:
import net from 'net';
import pipeName from './helper.js';

let count = 0;
let cacheTable = new Map();
// these correspond to !!!BEGIN!!!, !!!END!!!, !!!GET!!!, and 
// !!!DELETE!!! respectively
const begin, end, get, del; //shortened for readability they will use the Buffer.from() methods
let currData = [];

const socket = new net.Socket().connect(pipeName());
socket.on('data', (data) => {
    if( data.toString('utf8') === 'WHOIS' ) {
        return socket.write('cache');
    }
    if( data.includes(get) ) {
        const loc = parseInt(data.slice(get.byteLength).toString('utf8'));
        const d = cacheTable.get(loc);
        if( typeof d !== 'undefined' ) {
            socket.write(begin.toString('utf8') + d + 
             end.toString('utf8'));
        }
    }
    if( data.includes(del) ) {
        if( data.byteLength === del.byteLength ) {
            cacheTable.clear();
        } else {
            const loc = parseInt(data.slice(del.byteLength).toString('utf8'));
            cacheTable.delete(loc);
        }
    }
    if( data.includes(begin) ) {
        currData.push(data.slice(begin.byteLength).toString('utf8'));
    }
    if( currData.length ) {
        currData.push(data.toString('utf8'));
    }
    if( data.includes(end) ) {
        currData.push(data.slice(0, data.byteLength - 
         end.byteLength).toString('utf8'));
        cacheTable.set(count, currData.join(''));
        currData = [];
    }
});

This is definitely not a foolproof mechanism for handling streaming data. !!!BEGIN!!! and other command messages could be chunked and we would never see them. While we are keeping this simple, remember that production-level streaming needs to handle all of these types of issues.

子模块检查消息上的不同报头。 根据每一种类型,我们将执行该类型的操作。 这可以看作是执行远程过程调用的一种简单方法。 下面的列表描述了我们根据每个事件所做的事情:

  • !!!BEGIN!!!:我们需要开始监听线上更多的数据,因为这意味着我们要存储数据。
  • !!!END!!!:一旦我们看到这个消息,我们可以把所有这些数据放在一起并存储它,基于我们在缓存中的计数。
  • !!!GET!!!:我们将尝试获取存储在服务器提供给我们的编号位置的文件。
  • !!!DELETE!!!:如果消息的长度和这个字符串一样长,这意味着我们想要从缓存中删除所有内容。 否则,我们将尝试删除消息后面指定位置的数据。

  • send.js文件中添加以下代码:

import net from 'net'
import https from 'https'
import pipeName from './helpers.js'

const socket = new net.Socket().connect(pipeName());
socket.on('data', (data) => {
    if( data.toString('utf8') === 'WHOIS' ) {
        return socket.write('send');
    }
    const all = [];
    https.get(data.toString('utf8'), (res) => {
        res.on('data', (data) => {
            all.push(data.toString('utf8'));
        });
        res.on('end', () => {
            socket.write('!!!BEGIN!!!');
            socket.write(all.join(''));
            socket.write('!!!END!!!');
        });
    }).on('error', (err) => {
        socket.write('!!!FALSE!!!');
    });
    console.log('we received data from the main application',  
     data.toString('utf8'));
});

对于我们拥有的每个子模块,我们处理可能遇到的特定命令。 如send子模块所示,除了WHOIS命令(它告诉主应用是谁连接到它的)之外,我们在网络上处理其他任何东西。 我们尝试从指定的地址获取文件并将其写回主应用,以便将其存储在缓存中。

我们还添加了自己的协议来发送数据。 虽然这个系统不是万无一失的,我们应该添加某种类型的锁(比如布尔值,这样我们就不会在完全发送当前数据之前尝试接收更多的数据),但它确实展示了我们如何在整个系统中发送数据。 在第 7 章Streams - Understanding Streams and Non-Blocking I/O中,我们将研究一个类似的概念,但我们将利用流,这样我们就不会在每个线程中使用太多的内存。

正如我们所看到的,我们只导入了https模块。 这意味着我们只允许向通过 HTTPS 服务的地址发出请求。 如果我们想支持 HTTP,我们必须导入http模块,然后检查用户输入的地址。 在我们的例子中,我们使它尽可能的简单。

当我们想要发送数据时,我们发送!!!BEGIN!!!消息来让接收者知道我们将要发送的数据不能放入单个帧中。 然后,我们以!!!END!!!信息结束我们的信息。

如果我们无法读取我们试图获取的端点或连接超时(这两种情况都会陷入错误状态),我们将发送一个!!!FALSE!!!消息,让接收者知道我们无法完全传输数据。

几乎所有的数据传输系统都在使用包装数据的概念。 如果没有帧,我们将不得不发送一个报头,说明数据传输的大小。 然而,这意味着在发送内容之前,我们需要知道内容的大小。 帧为我们提供了不发送消息长度的选项,因此我们可以处理无限大的消息。

Framing or even boxing the data is done everywhere. If we were to look at how packets are created, for example, the concept still applies. Understanding this concept is key to understanding lower levels of the communication stack. Another concept that is good to know about is that not all of this data is sent at once. It is sent in pieces. The amount that can be sent at one time is usually set at the operating system level. One of the only properties that we can set is the highWaterMark property on streams. This property allows us to say how much data we will hold in memory before we stop reading/writing.

缓存应用的行为类似于 send 子模块,只是它响应更多的命令。 如果我们得到一个get命令,我们可以尝试从缓存中获取该项并将其发送回主模块; 否则,我们只发回null。 如果我们得到一个delete命令,在没有其他参数的情况下,我们将删除整个缓存; 否则,就删除该特定位置上的项。 最后,如果我们得到开始或结束包装器,我们将处理数据并缓存它。

目前,我们无限地增加了缓存。 我们可以很容易地添加一个特定的时间阈值的概念,允许数据在缓存(时间生活TTL)或仅持有一定数量的记录,通常利用一个最近最少使用(LRU)破坏系统。 我们将在第 9 章实际示例-构建静态服务器中了解如何实现缓存策略。 请注意,这些概念在缓存和缓存策略中非常普遍。

返回到代码中,创建main.js并添加以下代码:

  1. 为状态变量创建占位符。 这些对应于我们的消息可能处于的各种状态以及通过套接字传递的数据:
// import required modules and methods
const table = new Map();
let currData = [];
// These three items correspond to the buffers for: !!!FALSE!!!, 
// !!!BEGIN!!!, and !!!END!!! respectively
const failure, begin, end;
const methodTable = new WeakMap();
  1. 创建方法来处理通过缓存传入的数据:
const cacheHandler = function(data) {
    if( data.includes(begin) || currData.length ) {
        currData.push(data.toString('utf8'));
    }
    if( data.includes(end) ) {
        currData.push(data.toString('utf8'));
        const final = currData.join('');
        console.log(final.substring(begin.byteLength, 
         final.byteLength - end.byteLength));
        currData = [];
    }
}
  1. 接下来,添加将处理来自我们的send工作人员的消息的方法:
const sendHandler = function(data) {
    if( data.equals(failure) ) { //failure }
    if( data.includes(begin) ) { 
     currData.push(data.toString('utf8')); }
    if( currData.length ) { currData.push(data.toString('utf8')); }
    if( data.includes(end) ) { 
        table.get('cache').write(currData.join(''));
        currData = [];
    }
}
  1. 创建最后两个 helper 方法。 它们将测试我们必须知道的 worker 的数量,当我们准备开始时,另一个将为每个 worker 套接字添加方法处理程序:
const testConnections = function() {
    return table.size === 2;
}
const setupHandler = function() {
    table.forEach((value, key) => {
        value.on('data', methodTable.get(value.bind(value));
    });
}
  1. 最后一个 large 方法将处理我们通过命令行接收到的所有消息:
const startCLIMode = function() {
    process.stdin.on('data', function(data) {
        const d = data.toString('utf8');
        const instruction = d.trim().split(/\s/ig);
        switch(instruction[0]) {
            case 'delete': {
                table.get('cache').write(`!!!DELETE!!!${instruction[1] || ''}`);
                break; }
            case 'get': {
                if( typeof instruction[1] === 'undefined' ) {
                    return console.log('get needs a number 
                     associated with it!');
                }
                table.get('cache').write(`!!!GET!!!${instruction[1]}`);
                break; }
            case 'grab': {
                table.get('send').write(instruction[1]);
                break; }
            case 'stop': {
                table.forEach((value, key) => value.end());
                process.exit();
                break; }
    }});
}
  1. 最后,创建服务器并启动工作器:
const server = net.createServer().listen(pipeName());
server.on('connection', (socket) => {
    socket.once('data', (data) => {
        const type = data.toString('utf8');
        table.set(type, socket);
        if( testConnections() ) {
            setupHandlers();
            startCLIMode();
        }
    });
    socket.once('close', () => {
        table.delete(type);
    });
    socket.write('WHOIS');
});

const cache = new Worker('./cache.js');
const send = new Worker('./send.js');

主文件的某些部分已被删除,以缩短本书的代码量。 完整的示例可以在本书的 GitHub 存储库中找到。

在这里,我们有一组助手,它们将处理来自缓存和发送子系统的消息。 我们还将套接字映射到处理程序。 使用WeakMap意味着如果这些子系统崩溃或以某种方式被删除,我们不需要清理。 我们还将子系统的名称映射到套接字,这样我们就可以轻松地将消息发送到正确的子系统。 最后,我们创建一个服务器并处理传入的连接。 在本例中,我们只想检查两个子系统。 一旦我们看到两个,我们就开始我们的程序。

在我们包装消息的方式上有一些缺陷,并且测试连接的数量来看看我们是否准备好了,这也不是处理程序的最佳方法。 然而,这确实允许我们创建一个稍微复杂的应用,以便我们可以快速测试这里看到的想法。 有了这个应用,我们现在能够缓存来自远程资源的各种文件,并在需要时获取它们。 这个系统类似于一些静态服务器的工作方式。

通过查看前面的应用,很容易看到我们如何利用本地连接来创建一个仅使用核心 Node.js 系统的消息传递系统。 同样有趣的是,我们可以将管道名中的listen方法参数替换为端口号,这样我们就可以将这个应用从使用命名管道/Unix 域套接字转变为使用 TCP 套接字。

在 Node.js 中有这些工作线程之前,我们必须用进程来分离所有东西。 一开始,我们只有分叉系统。 当我们开始创建更多进程时,这使得一些系统变得相当复杂。 为了帮助我们理解这个概念,我们创建了cluster模块。 使用cluster模块,在主/从架构中更容易管理进程。

理解集群模块

虽然cluster模块可能不会像过去那样频繁使用,因为我们在 Node.js 中有工作线程,但有一个概念仍然使它强大。 我们能够在应用中的各个工作线程之间共享服务器连接。 我们的主进程将使用一种策略,使我们只向一个从进程发送请求。 这允许我们处理相当多的同时运行在相同地址和端口上的连接。

有了这个概念,让我们利用cluster模块来实现前面的程序。 现在,我们将确保发送和缓存子系统与主进程绑定。 我们的子进程将被绑定到处理来自服务器的请求。 要记住的一件事是,如果父进程死亡,我们的子进程也将死亡。 如果我们不想要这种行为,当我们在主进程中调用 fork 时,我们可以传递detached : true选项。 这将允许工作线程仍然运行。 当我们使用cluster模块时,这通常不是我们想要的行为,但是知道它是可用的是好的。

We have split up the following program into more manageable chunks. To see the full program, head over to the code repository for this chapter.

现在,我们应该能够编写一个类似于 IPC 程序的程序。 让我们来看看:

  1. 首先,我们将导入在cluster模式下实现前面示例所需的所有 Node 模块:
import cluster from 'cluster';
import https from 'https';
import http from 'http';
import { URL } from 'url';
  1. 接下来,我们设置可以跨进程使用的常量:
const numWorkers = 2;
const CACHE = 0;
const SEND = 1;
const server = '127.0.0.1';
const port = 3000;
  1. 之后,我们添加一个if/else检查,看看我们是主进程还是从进程。 这两种类型的进程都使用同一个文件,所以我们需要一种方法来区分这两种进程:
if( cluster.isMaster ) {
    // do master work
} else {
    // handle incoming connections
}
  1. 现在,写主代码。 这将进入if/else语句的第一个块。 我们的主节点需要旋转从节点,以及初始化我们的缓存:
let count = 1; //where our current record is at. We start at 1
const cache = new Map();
for(let i = 0; i < numWorkers; i++ ) {
    const worker = cluster.fork();
    worker.on('message', (msg) => {
        // handle incoming cache request
    });
}
  1. 添加一些将处理每个请求的代码,就像我们在前面的示例中所做的那样。 记住,如果我们停止我们的主进程,它将摧毁所有的从进程。 如果我们接收到STOP命令,我们将直接杀死主进程:
// inside of the worker message handler
switch(msg.cmd) {
    case 'STOP': {
        process.exit();
        break;
    }
    case 'DELETE': {
        if( msg.opt != 'all' ) {
            cache.delete(parseInt(msg.opt);
        } else {
            cache.clear();
        }
        worker.send({cmd : 'GOOD' });
        break;
    }
    case 'GET': {
        worker.send(cache.get(parseInt(msg.opt));
        break;
    }
    case 'GRAB': {
        // grab the information
        break;
    }
}
  1. 编写GRABcase 语句。 为此,利用https模块向资源发出请求:
// inside the GRAB case statement
const buf = [];
https.get(msg.opt, (res) => {
    res.on('data', (data) => {
        buf.push(data.toString('utf8'));
    });
    res.on('end', () => {
        const final = buf.join('');
        cache.set(count, final);
        count += 1;
        worker.send({cmd : 'GOOD' });
    });
});

现在,我们将编写从属代码。 所有这些都将在else区块进行。 记住,我们可以在从服务器之间共享相同的服务器位置和端口。 我们还将通过传递给我们的 URL 的搜索参数处理所有传入的请求。 这就是我们从url模块导入URL类的原因。 让我们开始:

  1. 通过启动HTTP服务器启动从代码。 记住,它们将共享相同的位置和端口:
// inside of the else block
http.Server((req, res) => {
    const search = new URL(`${location}${req.url}`).searchParams;
    const command = search.get('command');
    const params = search.get('options');
    // handle the command
    handleCommand(res, command, params);
}).listen(port);
  1. 现在,我们可以处理传递给我们的命令。 这将类似于我们前面的例子,除了我们将通过Inter-Process Communication(IPC)与主进程通信,并通过 HTTP/2 服务器处理请求。 此处只显示get命令; 其余的可以在本章的 GitHub 库中找到:
const handleCommand = function(res, command, params=null) {
    switch(command) {
        case 'get': {
            process.send({cmd: 'GET', opt : params});
            process.once('message', (msg) => {
                res.writeHead(200, { 'Content-Type' : 'text/plain' });
                res.end(msg);
            });
            break;
        }
    }
}

在这里,我们可以看到两个工作器都创建了一个HTTP服务器。 当它们都创建单独的对象时,它们共享底层端口。 这是完全隐藏的我们,但这是通过cluster模块完成的。 如果我们尝试在自己的版本中使用child_processfork 方法做类似的事情,我们将得到一个错误声明EADDRINUSE

如果我们请求以 HTML 格式存储的数据,我们会看到它返回为纯文本。 这与writeHead方法有关。 我们告诉浏览器我们正在写text/plain。 浏览器获取该信息并利用它来查看需要如何解析数据。 因为它被告知数据是普通的,所以它只会在屏幕上显示它。 如果我们在获取 HTML 数据时将其更改为text/html,它将解析并尝试渲染它。

有了这两种方法,我们就能够编写出能够充分利用系统中所有核心的程序,同时还能够协同工作。 第一个架构为我们提供了一个很好的解耦系统,这也是大多数应用的编写方式,但是cluster模块为我们提供了一种很好的处理服务器的方式。 通过混合这两种方法,我们可以创建一个高吞吐量的服务器。 虽然在 Node.js 中构建这些客户机/服务器应用很容易,但有一些事情需要注意。

新开发人员的常见陷阱

虽然利用 Unix 域套接字/Windows 命名管道很好,但这两个系统之间有一些差异。 Node.js 试图隐藏这些细节,以便我们能够专注于我们想要编写的应用,但它们仍然会显示出来。 导致新开发者失败的两个最常见原因如下:

  • 当应用退出时,Windows 命名的管道将自动销毁。 Unix 域套接字不会。 这意味着当我们退出应用时,我们应该尝试使用fs模块,并通过unlinkunlinkSync方法解除文件的链接。 我们还应该在启动时检查它是否存在,以防我们没有优雅地退出。

  • Windows 的数据框架可以比 Unix 域套接字更大。 这意味着应用可以在 Windows 上运行,但在 Unix 系统上就会失败。 这就是我们创建框架系统的原因。 最好记住这一点,特别是当我们可能想要使用外部库来处理构建 IPC 系统的部分内容时。 这些系统中有些没有记住这一点,因此很容易出现 bug。

Node.js 的目标是完全跨操作系统兼容,但当涉及跨系统实际操作时,这些系统总是有一些轻微的怪癖。 如果我们想要确保它能工作,就像我们不能保证终端用户会使用什么浏览器一样,那么我们需要在我们所有的系统上测试它。

虽然开发跨一台计算机的服务器应用很常见,但我们仍然需要将所有这些应用连接起来。 当我们不再能够使用一台计算机时,我们将需要通过网络进行讨论。 接下来我们将看看这些协议。

利用网络

虽然构建可以在同一台机器上相互对话的应用可能很酷,但最终,我们需要与外部系统对话。 在我们的例子中,大多数系统都是浏览器,但也可能是其他服务器。 由于我们不能在这些通道上使用命名管道/Unix 域套接字,我们需要使用网络的各种协议。

从技术上讲,通过使用共享驱动器/文件系统共享,我们仍然可以跨服务器使用前面两个概念,但这不是一个好主意。 我们已经展示了可以将listen方法从指向文件改为指向端口。 在最坏的情况下,我们可以使用共享文件系统,但这远远不是最优的,应该将其转换为使用我们将在这里介绍的协议之一。

的协议,我们将把重点放在这两个低级协议被称为传输控制协议(TCP),*用户数据报协议【显示】(UDP)。 我们还将了解 web 的更高级别协议:Hyper Text Transfer protocolversion 2*(HTTP/2)。 通过这些协议,我们将能够创建可以通过网络访问的高可用性应用。****

**# TCP / UDP

TCP 和 UDP 是我们在 Node.js 中可以访问的两个低级网络协议。 它们都允许我们发送和接收消息,但它们在几个关键领域有所不同。 首先,TCP 需要有一个连接的接收方和发送方。 正因为如此,我们不能只在一个频道上广播而不关心是否有人在听。

其次,在 TCP 需要握手过程的基础上,它也为我们提供了可靠的传输。 这意味着我们知道当我们发送数据时,它应该到达另一端(显然,有可能会失败,但我们不打算看那个)。 最后,TCP 保证了交付的顺序。 如果我们通过一个通道将数据发送给一个接收器,它将按照我们发送数据的顺序获取数据。 由于这些原因,我们在需要保证交货和订货时就使用 TCP。

TCP actually doesn't necessarily need to send the data in order. Everything is sent in packets. They can actually go to different servers and the routing logic can mean that later packets arrive at the receiver earlier than later ones. However, our receiver's network card reorders them for us so that it looks like we are getting them sequentially. There are many other cool aspects that go into TCP, including the transmission of data, that are outside the scope of this book, but anyone can look up networking and look at more of these concepts and how they are implemented.

话虽如此,TCP 似乎是我们一直想要使用的东西。 我们为什么不用能保证交货的东西呢? 此外,如果我们可以循环遍历所有当前连接并将数据发送给每个人,那么我们就不需要广播。 然而,由于所有这些保证,这使得 TCP 更重、更慢。 这对我们需要尽快发送数据的系统来说并不好。 对于这种类型的数据传输,我们可以利用 UDP。 UDP 给了我们一种无状态传输。 无状态传输意味着我们可以在一个通道上发送数据,它会将数据发送出去,然后忘记它。 我们不需要连接到一个地址; 相反,我们可以只发送数据(只要没有其他人绑定到该地址和端口)。 我们甚至可以建立一个多播系统,任何人都可以监听这个地址,这样就可以接收到数据。

以下是需要这类传播的一些地区:

  • 向证券交易所发出买入/卖出指令。 由于数据移动得很快,我们只关心最新的信息。 因此,如果我们没有收到一些买入/卖出订单,这真的没有关系。

  • 视频游戏的玩家位置数据。 我们只能这么快地更新游戏。 如果我们已经知道玩家的移动方向和速度,我们还可以插入或推断玩家在屏幕上的位置。 正因为如此,我们可以以任何速度接收一个玩家的位置,并计算出他们应该在哪里(这有时被称为服务器的滴答率)。

  • 电信数据不一定在乎我们是否发送了所有的数据,只要我们发送了大部分数据。 我们不需要保证传输完整的视频/音频信号,因为我们仍然可以用大部分数据提供很棒的图像。

这些只是 UDP 派上用场的几个领域。 了解了这两个系统后,我们将通过构建一个高度简化和不切实际的库存应用来了解它们。 行为如下:

  1. 服务器将发布新的股票符号和可用的股票数量。 然后,它将通过 UDP 将一个已知端口上的信息发送给所有人。
  2. 服务器将存储与客户机位置相关的所有信息。 这样,客户就无法操纵他们可能持有的股票数量。
  3. 客户端将向服务器发送一个买入或卖出订单。 服务器将判断它是否可以处理请求。 所有这些流量都将通过 TCP 传输,因为我们需要保证我们知道服务器收到了我们的消息。
  4. 服务器将响应一个错误或一条成功消息,告诉客户端他们的书已经更新。
  5. 服务器将爆炸,一个购买或出售股票发生在 UDP 通道。

这个应用看起来如下:

import dgram from 'dgram';
import { Socket } from 'net';
const multicastAddress = '239.192.0.0';
const sendMessageBadOutput = 'message needs to be formatted as follows: BUY|SELL <SYMBOL> <NUMBER>';
const recvClient = dgram.createSocket({type : 'udp4', reuseAddr: true }); //1.
const sendClient = new Socket().connect(3000, "127.0.0.1");
// receiving client code seen below
process.stdin.setEncoding('utf8');
process.stdin.on('data', (msg) => {
    const input = msg.split(' ');
    if( input.length !== 3 ) {
        console.log(sendMessageBadOutput);
        return;
    }
    const num = parseInt(input[2]);
    if( num.toString() === 'NaN' ) {
        console.log(sendMessageBadOutput);
        return;
    }
    sendClient.write(msg);
});
sendClient.on('data', (data) => {
    console.log(data.toString('utf8'));
});

除了我们正在使用的新模块:dgram模块外,前面的大部分程序应该都很熟悉。 这个模块允许我们发送数据,同时利用 UDP。

这里,我们正在创建一个使用 UDP4 (UDP over IPv4,或者我们通常知道的 IP 地址)的套接字。 我们还声明我们正在重用地址和端口。 我们这样做是为了在本地进行测试。 我们不希望在其他情况下出现这种情况:

recvClient.on('connect', () => {
    console.log('client is connected to the server');
});
recvClient.on('message', (msg) => {
    console.log('client received message', msg.toString('utf8'));
});
recvClient.bind(3000, () => {
    recvClient.addMembership(multicastAddress);
});

我们绑定到端口3000,因为服务器将在那里发送数据。 然后,我们声明要将自己添加到多播地址中。 为了使多播工作,服务器需要通过多播地址发送数据。 这些地址通常是操作系统设置的特定地址。 每个操作系统都可以决定使用什么地址,但是我们选择的地址应该适用于任何操作系统。

一旦我们收到消息,就把它打印出来。 同样,这看起来很熟悉。 Node.js 是基于事件和流的,为了一致性,它们通常被命名为相同的东西。

这个程序的其他部分处理用户输入,然后通过我们创建新套接字时打开的 TCP 通道发送数据(这应该与之前的 IPC 程序类似,除了传递端口和 IP 地址)。

这个应用的服务器稍微复杂一些,因为它包含股票应用的所有逻辑。 我们将把它分解成几个步骤:

  1. 创建一个名为main.js的文件,并将dgramnet模块导入其中:
import dgram from 'dgram';
import net from 'net';
  1. 为我们的多播地址添加一些常量,为错误输入添加错误消息,为股票报价器和客户添加Maps:
const multicastAddress = '239.192.0.0';
const badListingNumMessage = 'to list a new ticker the following format needs to be followed <SYMBOL>
<NUMBER>';
const symbolTable = new Map();
const clientTable = new Map();
  1. 接下来,我们创建两个服务器。 第一个用于侦听 UDP 消息,第二个用于接收 TCP 消息。 我们将利用 TCP 服务器来处理客户端请求。 TCP 是可靠的,而 UDP 不是:
const server = dgram.createSocket({type : 'udp4', reuseAddr : true}).bind(3000);
const recvServer = net.createServer().listen(3000, '127.0.0.1');
  1. 然后,我们需要在 TCP 服务器上为任何连接设置一个侦听器。 一旦我们有了客户端连接,我们将用一个临时表设置它们,这样我们就可以存储它们的投资组合:
recvServer.on('connection', (socket) => {
    const temp = new Map();
    clientTable.set(socket, temp);
});
  1. 现在,为客户机设置一个数据侦听器。 当我们接收到数据时,我们将按照以下格式解析消息SELL/BUY <Ticker> <Number>:
// inside of the connection callback for recvServer
socket.on('data', (msg) => {
    const input = msg.toString('utf8').split(' ');
    const buyOrSell = input[0];
    const tickerSymbol = input[1];
    const num = parseInt(input[2]);
});
  1. 基于此解析,我们检查客户端是否可以执行该操作。 如果他们可以,我们将改变他们的投资组合,并向他们发出信息,说明改变是成功的:
// inside the socket 'data' handler
const numHeld = symbolTable.get(input[1]);
if( buyOrSell === "BUY" && (num <= 0 || numHeld - num <= 0) ) {
    socket.write("ERROR!");
    return;
} 
const clientBook = clientTable.get(socket);
const clientAmount = clientBook.get(tickerSymbol);
if( buyOrSell === "SELL" && clientAmount - num < 0 ) {
    socket.write("ERROR!");
    return;
}
if( buyOrSell === "BUY" ) {
    clientBook.set(tickerSymbol, clientAmount + num);
    symbolTable.set(tickerSymbol, numHeld - num);
} else if( buyOrSell === "SELL" ) {
    clientBook.set(tickerSymbol, clientAmount - num);
    symbolTable.set(tickerSymbol, numHeld + num);
}
socket.write(`successfully processed request. You now hold ${clientBook.get(tickerSymbol)}` of ${tickerSymbol}`);
  1. 一旦我们告诉客户端我们已经处理了他们的请求,我们可以通过 UDP 服务器写入到所有的客户端:
// after the socket.write from above
const msg = Buffer.from(`${tickerSymbol} ${symbolTable.get(tickerSymbol)}`);
server.send(msg, 0, msg.byteLength, 3000, multicastAddress);
  1. 最后,我们需要通过标准输入处理来自服务器的新股票报价器。 一旦我们处理了请求,我们将数据发送到 UDP 服务器上,以便每个客户端都知道新的股票:
process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
    const input = data.split(' ');
    const num = parseInt(input[1]);
    symbolTable.set(input[0], num);
    for(const client of clientTable) {
        client[1].set(input[0], 0);
    }

    server.send(Buffer.from(data), 0, data.length, 3000, multicastAddress);
});

为了清晰起见,几乎所有的错误逻辑都被删除了,但是您可以在本书的 GitHub 存储库中找到它。 如前面的示例所示,利用所有接口将数据发送到其他点(无论是应用的其他部分还是侦听数据的远程客户机)是非常简单的。 它们都使用几乎完全相同的接口,只是在实现细节上略有不同。 只要记住,如果需要交付保证,就应该使用 TCP; 否则,UDP 是一个不错的选择。

接下来,我们将看看 HTTP/2 标准,以及与netdgramhttp/https模块相比,Node.js 中的服务器系统有什么不同。

HTTP / 2

虽然该技术于 2015 年推出,但采用速度很慢。 HTTP/2 建立在 HTTP/1.1 协议之上,允许在以前的系统中产生问题的各种特性。 这使我们能够使用一个 TCP 连接来接收不同的请求。 这在 HTTP/1.1 中是不可能的,它导致了一个叫做行头阻塞的问题。 这意味着我们实际上只能处理这么多 TCP 连接,如果我们有一个长时间运行的 TCP 连接,它可能会阻塞它之后的所有请求。

HTTP/2 也让我们能够推送服务器端资源。 这意味着,如果服务器知道浏览器需要某个资源(比如 CSS 文件),它可以在需要它之前将它推到服务器。 最后,HTTP/2 为我们提供了内置的流媒体功能。 这意味着我们能够使用一个连接并将数据作为流向下发送,而不需要一次发送所有数据。

HTTP/2 还为我们提供了其他好处,但这些是主要的。 虽然httphttps模块可能还会被使用一段时间,但 Node.js 中的http2模块应该被用于任何新的应用。

Node.js 中的http2模块与httphttps模块有几个不同之处。 虽然它不遵循其他 IPC/网络模块提供的标准,但它确实给了我们一些通过 HTTP/2 发送数据的好方法。 其中一种方法允许我们直接从文件系统流文件,而不需要为文件创建一个管道并将其发送给发送者。 下面的代码可以看到这些差异的一个例子:

import http2 from 'http2';
import fs from 'fs';
const server = http2.createSecureServer({
    key : fs.readFileSync('server.key.pem'),
    cert : fs.readFileSync('server.crt.pem')
});
server.on('error', (err) => console.error(err));
server.on('stream', (stream, headers) => {
    stream.respond({
        'content-type': 'text/plain',
        ':status' : 200
    });
    stream.end('Hello from Http2 server');
});
server.listen(8081, '127.0.0.1');

首先,请注意服务器需要一个私钥和一个公共证书。 这些是用来确保建立的连接是安全的,这意味着没有人可以看到正在发送的内容。 为了能够做到这一点,我们需要一个像openssl这样的工具来创建这些密钥和证书。 在 Windows 10 和其他 Unix 操作系统中,我们可以免费获得这些功能。 否则,需要下载 Cygwin(http://www.cygwin.com/)。 使用openssl,我们可以运行以下命令:

> openssl req -x509 -newkey rsa:4096 -keyout server.key.pem -out server.crt.pem -days 365

此命令生成服务器和客户端安全通信所需的私钥和公共证书。 我们不会在这里详细讨论如何实现这一点,但是关于如何使用 SSL/TLS 实现这一点的信息可以在:https://www.cloudflare.com/learning/ssl/transport-layer-security-tls/上找到。

生成证书和密钥后,我们可以读入它们,以便服务器可以开始运行。 我们还将注意到,我们响应的不是消息事件或请求事件,而是流事件。 HTTP/2 利用流而不是试图一次发送所有数据。 虽然 Node.js 为我们将请求和响应封装在流中,但这不是操作系统层可能的处理方式。 HTTP/2 立即利用了流。 这就是为什么事件被称为流的原因。

接下来,我们不调用writeHead方法,而只是响应流。 当我们想要发送信息时,我们使用respond方法并以这种方式发送头部。 我们还会注意到一些头文件以冒号作为前缀。 这是特定于http2模块的,如果在发送特定报头时发现问题,在前面加冒号可以解决问题。

除了我们在这里讨论的内容,这看起来应该很像我们用 Node.js 编写的普通 HTTP(s)服务器。 还有一些其他的好处,我们得到了与http2模块,然而,其中之一是响应一个文件,而不是必须读取该文件并以那种方式发送它。 这可以在以下代码中看到:

import http2 from 'http2';
import fs from 'fs';
import path from 'path';

const basePath = process.env.npm_package_config_static; //1.
const supportedTypes = new Set(['.ico', '.html', '.css', '.js']);
const server = http2.createSecureServer({
    key : fs.readFileSync(process.env.npm_package_config_key),
    cert : fs.readFileSync(process.env.npm_package_config_cert),
    allowHTTP1 : true //2.
});
server.on('error', (err) => console.error(err));
server.on('stream', (stream, header) => {
    const fileLoc = header[':path'];
    const extension = path.extname(fileLoc); //3.
    if(!supportedTypes.has(extension)) {
        stream.respond({
            ':status' : 400,
            'content-type' : 'application/json'
        });
        stream.end(JSON.stringify({
            error : 'unsupported data type!',
            extension
        }));
        return;
    }
    stream.respondWithFile( //4.
        path.join(process.cwd(), basePath, fileLoc),
        {
            ':status' : 200,
            'content-type' :
                extension === ".html" ?
                'text/html' :
                extension === ".css" ?
                'text/css' :
                'text/javascript'
        },
        {
            onError : (err) => { //5.
                if( err.code === 'ENOENT') {
                    stream.respond({ ':status' : 404 });
                } else {
                    stream.respond({ ':status' : 500 });
                }
                stream.end();
            }
        }
    )
});
server.listen(80, '127.0.0.1');

项目编号是关键的兴趣点,它们的工作如下:

  1. 我们正在从package.json文件中读取信息,就像我们在前一章中所做的那样。 我们还通过npm run <script>命令运行这个。 请参阅前一章,了解如何做到这一点,以及如何在程序中使用package.json文件中的配置数据。
  2. 我们已经为服务器设置了一个特定的配置选项。 如果连接到我们的客户端不能使用 HTTP/2,那么我们将自动将所有内容转换回协商的协议,例如 HTTP/1.1。
  3. 我们从 URL 获取扩展名。 通过这种方式,我们可以看到是否支持该文件类型并发送适当的文件; 否则,我们将返回一个 400 错误消息,并声明这是一个错误请求。
  4. 这个方法允许我们传入一个路径。 然后,核心系统将为我们处理发送文件。 我们需要做的就是确保正确设置内容类型,以便浏览器能够为我们解释数据。
  5. 如果在任何地方出现错误,比如文件不存在,我们将用正确的状态进行响应,比如 404 或 500 错误。

虽然我们在这里展示的只是http2模块的一小部分,但这展示了http2模块的不同之处,以及我们如何快速建立一个模块。 如有必要,请参阅https://Node.js.org/dist/latest-v12.x/docs/api/http2.html,了解http2模块与http模块的不同之处,以及它所附带的所有功能。 现在,我们来看看 web 的未来状态,看看 Node.js 中的 HTTP/3。

快速一瞥 HTTP/3

虽然我们讨论的是进程、线程和其他计算机之间通信的当前状态,但有一种新的方式可以传递信息。 新的标准被称为 HTTP/3,它与前两次迭代有显著的不同。

QUIC 协议

Quick UDP Internet Connections(QUIC)由谷歌于 2012 年推出。 该协议类似于 TCP、Transport Layer Security(TLS)和 HTTP/2 协议,但都是通过 UDP 传输的。 这意味着 TCP 内建的大量开销已经被一种发送数据的新方法所取代。 最重要的是,由于 TLS 内置在协议中,这意味着向已经定义的协议添加安全性的开销已经被消除了。

QUIC 目前被谷歌用于 YouTube 等内容。 虽然 QUIC 从未获得广泛的吸引力,但它帮助产生了创建 HTTP/3 标准委员会的小组,并帮助指导委员会利用 UDP 作为协议的基础层。 它还展示了如何将安全性内置到协议中,并引导 HTTP/3 将其内置到协议中。

Other companies have started to implement the QUIC protocol while HTTP/3 is being developed. One notable inclusion to this list is Cloudflare. Their blogpost on implementing QUIC can be found here: https://blog.cloudflare.com/the-road-to-quic/.

虽然 HTTP/3 还没有添加到 Node.js 中,但有一些包实现了 QUIC 协议。

看看 node-quic

虽然 QUIC 现在不是最容易使用的,而且唯一的官方实现是在 Chromium 源代码中编写的,但已经有其他实现允许我们使用这个协议。 node-quic模块已经被弃用,取而代之的是试图直接内建到 Node.js 中的 QUIC 实现,但我们仍然可以使用它来看看将来我们可能如何利用 QUIC 甚至 HTTP/3。

首先,我们需要运行npm install node-quic命令安装模块。 这样,我们就能够编写一个简单的客户机-服务器应用。 客户端应该如下所示:

import quic from 'node-quic'

const port = 3000;
const address = '127.0.0.1';
process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
    quic.send(port, address, data.trim())
        .onData((data) => {
            console.log('we received the following back: ', data);
        });
});

我们会注意到发送数据的方式类似于我们在 UDP 系统中这样做; 也就是说,我们可以发送数据,而不需要实际绑定到端口和地址。 除此之外,系统的运行方式类似于使用httphttp2模块编写其他应用时的运行方式。 这里需要注意的一件有趣的事情是,当我们从quic流接收数据时,数据会自动转换为字符串。

前一个客户端的服务器如下所示:

import quic from 'node-quic'

const port = 3000;
const address = '127.0.0.1';
quic.listen(port, address)
    .then(() => {})
    .onError((err) => console.error(err))
    .onData((data, stream, buffer) => {
        console.log('we received data:', data);
        if( data === 'quit' ) {
            console.log('we are going to stop listening for data');
            quic.stopListening();
        } else {
            stream.write("Thank you for the data!");
        }
    });

同样,它看起来应该与我们编写的其他应用很相似。 这里的一个主要区别是,这个模块是在编写时考虑到承诺的。 除此之外,我们接收的数据是一个字符串,所以如果我们通过运行stopListening方法接收到quit,我们将关闭自己。 否则,我们就写入想要发送到 steam 的数据,类似于我们使用 HTTP/2 协议所做的。

To stay on top of the implementation status for HTTP/3, it is recommended that you check out the following link and check it periodically: https://quicwg.org/.

正如我们所看到的,在这个模块中使用 QUIC 协议非常简单。 这对内部应用也很有用。 请注意,无论是 QUC 协议还是 HTTP/3 标准都还没有完全完成,而且可能还需要几年的时间。 这并不是说您不应该使用它们——只是说,在标准不断变化的同时,事情可能会发生相当快的变化。

总结

在不同的系统(线程、进程甚至其他计算机)之间发送数据,是我们作为开发人员所要做的。 我们可以使用许多工具来做到这一点,我们已经研究了其中的大多数。 只要记住,虽然一个选项可以使应用看起来简单,但这并不总是意味着它是最好的选择。 当涉及到系统分解时,我们通常希望将特定的任务分配给一个单元,并使用某种形式的 IPC(如命名管道)进行通信。 如果我们需要将任务转移到另一台计算机,我们总是可以将它切换到 TCP。

有了这些 IPC 和 web 协议,我们将能够轻松处理 Node.js 中的大多数问题,并在 web 应用中同时编写客户端和服务器端代码。 然而,Node.js 不仅仅是为 web 应用而构建的。 我们几乎可以做任何其他语言可以做的事情,甚至拥有其他语言拥有的大部分工具。 本章应该有助于澄清并帮助巩固如何将 Node.js 构建到已经开发好的应用生态系统中。

考虑到所有这些,我们将研究流,以及如何在 Node.js 中实现我们自己的流。**