微服务
基础介绍(多个NestJS 程序之间的调用
介绍一下 多个Nest程序之间的调用方式 和实现 为了演示 功能,我们需要调整一下 程序,把它变成一个 monorepo 工程 具体到底如何做 基于other_tech 我们新 拉一个分支 命名为 monorepo , 具体的集成方法 请参考为的 另一个 github参考 哪里有文章和例子 https://github.com/BM-laoli/morerepo-test (有可能learn 这种方式已经过时,请参考 )https://juejin.cn/post/6964328103447363614#comment 来做pnpm下的 monorepo
Pnpm下的monorepo工程改造
参考文章 https://juejin.cn/post/6964328103447363614#comment
注意 以下内容 ,请不要直接拿去用,我仅仅做了演示为了图方便,在nest中如果你要实施 monorepo 请看 nest官方文档,官方文档提供了 一个适用 与Nest的方案 https://docs.nestjs.com/cli/monorepo. 当然你如果 想一意孤行 ,那么 哈哈哈 请参考下面的文档
- 新建文件夹 (子项目)
packages/m1 这个文件夹下📁 下就有一个新的Nestjs 项目了
├── src
│ ├── app.controller.ts
│ ├── app.module.ts
│ ├── app.service.ts
│ └── main.ts
├── package.json
├── nest-cli.json
├── tsconfig.build.json
├── tsconfig.json
└── yarn.lock
注意改一下去掉 private,如果你需要 请改一下包名(大部分情况下 你并不需要去改 因为我们是两个 独立的 service, 除非你要做 lib,若做lib Nest官方也有详细说明
{
"name": "nest-m1",
"version": "1.0.0",
"description": "Nest TypeScript starter repository",
"license": "MIT",
// 去掉 "private": true,
}
- pnpm 初始化
# 初始化
$ pnpm init -y
新建一个 pnpm-workspace.yaml
packages:
# all packages in subdirs of packages/ and components/
- 'packages/**'
- 依赖安装
# root 目录(我们原来从 另一个branch 过来的 一样install就好了,既得install 前 把 node_mmodules 先干掉 )
$ pnpm install -w # (意思是全局依赖)
# package/m1 目录
$ pnpm install
然后 按照相同 的逻辑 我们再去 cv 文件,新建一个 client 文件夹 这样就算搞定了!
Install
不管是 service / client 都需要使用这个依赖,注意这里说的 service 和client 是 指 微服务 和 请求他的其他服务(相对的 service | client
# $ npm i --save @nestjs/microservices
$ pnpm add @nestjs/microservices -w
Service 端
在 main 引入
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.TCP,
// 默认情况Nest 使用TCP,
// options: {
// host
// port
// retryAttempts 重试消息次数 默认 =0
// retryDelay 重试之间的间隔(ms)
// },
},
);
在 controller 中写些逻辑
需要特别强调的是 在Nest中 微服务通过模式识别消息和事件
- 请求-响应
- 基于事件
import {
Controller,
Get,
Inject,
RequestTimeoutException,
Scope,
} from '@nestjs/common';
import {
CONTEXT,
Ctx,
EventPattern,
MessagePattern,
NatsContext,
Payload,
RequestContext,
} from '@nestjs/microservices';
import { from, fromEvent, Observable } from 'rxjs';
import { AppService } from './app.service';
@Controller({
scope: Scope.REQUEST,
})
export class AppController {
constructor(
private readonly appService: AppService,
@Inject(CONTEXT) private ctx: RequestContext, // scope 的时候
) {}
// 在Nest中有两种 微服务的模式来识别消息和事件
// 下面就是 一个 (请求-响应)
// 注意哈 这个decorator 只在 controller 中用
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
// 异步/Observable 也是支持的
@MessagePattern({ cmd: 'sumSync' })
accumulateSync(data: number[]): Promise<number> {
return Promise.resolve((data || []).reduce((a, b) => a + b));
}
@MessagePattern({ cmd: 'sumObservable' })
accumulateObservable(data: number[]): Observable<number> {
return from([1, 2, 3, 4]);
}
// 以上是基于 请求+响应 的 下面咱们 来观察一下 基于 事件的
@EventPattern('user_created')
async cuser(data: any) {
console.log(this.ctx.pattern);
return 1;
}
// 我们可以为一个 事件注册多个处理程序 他们会依次触发
@EventPattern('user_created')
async cuser2(data: any) {
return 2;
}
// 如果你需要一些 请求的详细信息 可以传递一个context
@MessagePattern('time.use.*') // 通配符
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
return new Date().toLocaleDateString();
}
}
Client 端
关于client 的集成相对简单
注入+使用就好了
@Module({
imports: [
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.TCP,
},
]),
],
controllers: [AppController, AppController2],
providers: [AppService],
})
使用
@Controller('m1')
export class AppController {
constructor(
private readonly appService: AppService,
@Inject('M1_SERVICE') private M1_client: ClientProxy,
) {}
@Get()
// t1(@Res() res: Response) {
t1() {
// this.M1_client.
// return this.accumulate();
// this.accumulate().subscribe((it) => {
// res.status(HttpStatus.OK).send(`${it}--`);
// });
// return this.accumulateSync();
// return this.accumulateObservable();
// return this.publish();
return this.publish2();
}
// 请求响应模式
accumulate() {
const pattern = { cmd: 'sum' };
const payload = [1, 2, 3];
// 注意这个默认都是返回一个 "冷Observable" 至于什么是热什么是冷 请你去rxjs官方一探究竟
return this.M1_client.send<number[]>(pattern, payload);
}
async accumulateSync() {
const pattern = { cmd: 'sumSync' };
const payload = [1, 2, 3, 4];
const value = await lastValueFrom(
this.M1_client.send<number[]>(pattern, payload),
);
return value;
}
accumulateObservable() {
const pattern = { cmd: 'sumObservable' };
const payload = [1, 2, 3, 4, 5];
// 模拟操作 可以使用 pipe(timeout(5000)) 这个操作符
return this.M1_client.send<number[]>(pattern, payload);
}
// 发送事件 注意啊这个返回的是 一个热的Observable,至于什么是热什么是冷 请你去rxjs官方一探究竟
async publish() {
this.M1_client.emit<number>('user_created', 666);
}
async publish2() {
this.M1_client.emit<number>('time.use.*', 2123);
}
}
你也许需要注意的一个点是 Scope 的情况
在 Nest 中,几乎所有内容都在传入请求之间共享,这可能会发生一些意想不到的问题。我们有一个数据库的连接池,具有全局状态的单例服务等。请记住,Node.js 不遵循请求/响应多线程无状态模型,其中每个请求都由单独的线程处理。因此,使用单一实例对于我们的应用程序是完全安全的。 但多数情况下 我们会有一些缓存策略 导致程序表现出意想不到的行为 故我们把模式改了,关键代码就是 scope 这个参数
@Controller({
path: 'm2',
scope: Scope.REQUEST,
})
export class AppController2 {
constructor(
@Inject('M1_SERVICE') private M1_client: ClientProxy,
@Inject(CONTEXT) private ctx: RequestContext, // scope 的时候
) {}
@Get()
t1() {
// this.M1_client.
}
// 请求响应模式
accumulate() {
const pattern = { cmd: 'sum' };
const payload = [1, 2, 3];
// 注意这个默认都是返回一个 "冷Observable"
this.ctx.pattern;
return this.M1_client.send<number[]>(pattern, payload);
}
}
集成其它的工具🔧
接下来的内容 主要增强 上述的 微服务能力;可以看出来,上述的设计比较简单,没有考虑到 多机器 多名称的情况,而且我们 也没有配置 port 如果是 k8s 等多机器部署,那么服务发现该如何处理,服务发现 等等问题..... 都没有讲到, 下面的工具 旨在完善 ,微服务间存在的问题
Redis
Transport.REDIS并利用了 Redis 的发布/订阅功能, 设置了一套发布/订阅的消息传递范例,
当然特性(优点与不足) 要讲一下
- 无法明确最终那个订阅者会收到消息(无法保证消息或事件将由至少一个服务处理)
- 每个微服务可以订阅任意数量的通道,一次性可订阅多个通道
- 通过通道交换的消息是 即发即弃, 如果没有人用 那会被删除掉
- 一条消息 可以被多个 订阅者订阅和接收
基础的用法和前文提到的 是一模一样的,不过要注意 不同的类型(我是指 ts 定义的类型) 提供的功能不一样
~ 详细见 MicroserviceOptions .d.ts 定义
/**
* @publicApi
*/
export interface RedisOptions {
transport?: Transport.REDIS;
options?: {
host?: string;
port?: number;
retryAttempts?: number;
retryDelay?: number;
serializer?: Serializer;
deserializer?: Deserializer;
} & IORedisOptions;
}
为了演示多个 microservices 之间的同时订阅,我们把 M1(在上节中它作为一个microservices ) , 再cv一份 重命名为 m2文件夹, 然后run
安装必要的依赖
npm i --save ioredis
pnpm add ioredis -w
修改 service 并且加一个 订阅器
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
},
},
);
++++
// redis
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RedisContext) {
console.log(`Channel: ${context.getChannel()}`); // notifications
console.log(222);
}
修改 client 换一个 事件发布
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
},
},
]),
++++
// redis
async publishToRedis() {
this.M1_client.emit<number>('notifications', 2123);
}
最后说一下它的应用场景
在微服务架构中 Redis通常用作消息队列,适合处理高并发的消息传递; 但是Redis不是专门设计用来处理消息传递的,在需要保证消息顺序的前提下 redis 也许不是最好的选择。 下面的几种场景可以使用
应用场景 | des |
---|---|
异步任务处理 | 将需要异步执行的任务放入Redis消息队列中,处理任务的工作进程从队列中读取任务,完成后将结果写回Redis或其他存储系统中。这种方式可以避免任务处理过程中阻塞主线程,提高系统的并发处理能力。 |
实时消息推送 | 通过Redis消息队列将实时消息推送给客户端,例如在线聊天、股票行情等。消息发布者将消息写入Redis消息队列中,订阅者通过订阅Redis消息频道或使用Redis的PUBLISH命令来接收消息。 |
日志处理 | 将应用产生的日志信息写入Redis消息队列中,使用工作进程从队列中读取日志并进行处理,例如过滤、分析、存储等。这种方式可以避免日志处理过程中阻塞主线程,提高系统的性能和可靠性。 |
MQTT
MQTT 是一种开源的轻量级消息传递协 . 基于 MQTT 构建的通信系统由 发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计。
总体来说 它成本低 而且效果好 延迟低,非常适合用在 物联网, 传感器 和服务器的通信
首先我们需要构建这样的一套基础设施 ,我们使用docker 去构建 ,再次强调因为MQTT是一个协议,所以我们需要搭建一个支持MQTT协议的服务器,使服务端和客户端能够通过这个MQTT服务器(broker)进行消息转发、通信
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器,它是目前比较好 MQTT 服务产品,多了不说了 自己去search吧
$ docker pull emqx
$ docker run -d --name emqx -p 1883:1883 -p 18083:18083 emqx/emqx
# 完事之后 管理端 在 :18083 admin/public 中登录管理
# 在工程 root 目录下安装 mqtt 依赖
$ pnpm add mqtt -w
接下来 开始集成, 把service 和 client的协议改了
~ service
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
},
);
~ client
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
},
]),
然后我们 多加一些发布和订阅的代码( 实际上和 之前的那些类型,代码都是一样的版式 )
~ service
// MQTT
// @MessagePattern('sensors/+/temperature/+') 支持通配符
@MessagePattern('notificationsMQTT')
getNotificationsMQTT(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`); // notifications
console.log(222);
// 如果你需要访问 原始的mqtt数据包 请使用
console.log(context.getPacket());
}
// MQTT2
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
const {
properties: { userProperties },
// 如果你这里 是一个undefined 记得去给你的 配置 加一个 协议版本 protocolVersion: 5,
// issues https://github.com/nestjs/nest/issues/10016
} = context.getPacket();
console.log('2');
return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
~ Client
// MQTT
publishToMQTT() {
// this.M1_client.emit<number>('notificationsMQTT', {
// name: 6666,
// });
// 配置 消息选项
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
.setProperties({ userProperties })
.setQoS(1)
.build();
console.log(record);
return this.M1_client.send('replace-emoji', record);
}
NATS
这个东西 也是一个消息传递系统,目前最新版本 使用Go开发 性能非常的NB,它是一个分布式的消息中间件
同样的代理 它也是有直接的消息协议,请先安装 这个系统
$ docker pull nats
$ docker run -d --name nats-main -p 4222:4222 -p 8222:8222 nats
# 安装ndoejs 依赖
$ pnpm add nats -w
特别要说明的: Nest Transport.NATS 不使用 NATS 内置的 请求-响应 传输机制,无论响应的Service在什么地方,它都会动态的返回 到 client程序.
但对于 基于事件 的传输模式,Nest 使用NATS 内置的机制,如果发布一个基于主题的message时,订阅这个主题的 都会收到此消息( 这种一对多 被称为 扇出)
然后 NATS 提供了一个内置的 分布式队列 ,如果要使用情况 下面的代码
~ Service
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
},
);
~ Client
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
},
]),
然后就是多加几个 订阅 和publish 方法
~ Service
// NTAS
@MessagePattern('notificationsNATS')
notificationsNATS(@Payload() data: number[], @Ctx() context: NatsContext) {
console.log(`Subject: ${context.getSubject()}`);
}
// NATS 可以设置 标头
@MessagePattern('replace-emoji-NATS')
replaceEmojiNATS(
@Payload() data: string,
@Ctx() context: NatsContext,
): string {
const headers = context.getHeaders();
// 这个返回时一个是get 不要相信官方给写的 headers['x-version']
return headers.get('x-version') === '1.0.0' ? '🐱' : '🐈';
}
~ Client
import * as nats from 'nats';
// NAST
publishToNAST() {
// this.M1_client.emit<number>('notificationsNATS', {
// name: 6666,
// });
// 配置 消息选项
const headers = nats.headers();
headers.set('x-version', '1.0.0');
const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build();
this.M1_client.send('replace-emoji-NATS', record).subscribe((it) => {
console.log(it);
});
}
RabbitMQ
RabbitMQ是一个开源的消息代理,它支持多种消息协议。
还是一样 我们先安装它
$ docker pull rabbitmq:3.11-management
$ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
# 安装nodejs 依赖
$ pnpm install amqplib amqp-connection-manager -w
:15672 是它的管理prot 默认用户名是 guest , guest
大部分使用方法和方法和 前面的消息系统使用方式是 一样的,RabbitMQ配置上有 比较大的区别
选项 | 作用 |
---|---|
urls | 连接网址 |
queue | 服务器将侦听的队列名称 |
prefetchCount | 设置通道的预取计数 |
isGlobalPrefetchCount | 启用每通道预取 |
noAck | 如果启用手动确认模式false |
queueOptions | 其他队列选项(在此处阅读更多内容)) |
socketOptions | 其他套接字选项(在此处阅读更多内容)) |
headers | 要与每条消息一起发送的标头 |
我们先把 连接方式改了
~ Service
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
// 在RabbitMQ中,队列可以被声明为持久化(durable)或非持久化(durable: false)。当队列被声明为持久化时,
// 它将在RabbitMQ服务器重启后仍然存在。但是,如果队列被声明为非持久化,那么它将在服务器重启后消失。
// 因此,如果您希望您的队列在服务器重启后仍然存在,您应该将其声明为持久化。
},
},
},
);
~ Client
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
},
},
},
]),
然后我们简单的展示它的使用
~ Service
// RMQ
@MessagePattern('notificationsRMQ')
getNotificationsRMQ(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
console.log(`Pattern: ${context.getMessage()}`); // 获取原始数据
// 要检索对 RabbitMQ 通道的引用 请参考
console.log(context.getChannelRef());
console.log(data);
// 如果 我们设置了 noAck: false,需要 手动的check一下
context.getChannelRef().ack(context.getMessage());
return '666';
}
// RMQ 同样的也支持 设置头信息等操作
@MessagePattern('replace-emoji-RMQ')
replaceEmojiRMQ(@Payload() data: string, @Ctx() context: RmqContext): string {
const {
properties: { headers },
} = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
~ Client
// RMQ
publishToRMQ() {
this.M1_client.send('notificationsRMQ', {
name: 'RMQ',
}).subscribe((it) => {
console.log(it);
});
// const message = ':cat:';
// const record = new RmqRecordBuilder(message)
// .setOptions({
// headers: {
// ['x-version']: '1.0.0',
// },
// priority: 3,
// })
// .build();
// this.M1_client.send('replace-emoji-RMQ', record).subscribe((it) => {
// console.log(it);
// });
}
Kafka
警告 此小节内容完全超纲 200%,如果你要尝试 ,请做好心理准备,和前期的知识准备之后 再深入学习。 Kafka 是一个开源的分布式流处理平台,它能够实现实时的事件驱动应用程序开发。具有高吞吐量、低延迟的等特点 与Apache Storm和Spark很好地集成在一起,用于实时流数据分析。
下面的它的关键的三个功能
- 发布和订阅记录流,类似于消息队列或企业邮件系统。
- 以容错持久的方式存储记录流。
- 在记录流发生时对其进行处理。
先在 docker 上构建一个 kafka 基础设施(这里是踩坑的一个地方 Docker 到底如何部署 Kafka)
docker-compose.yml
version: "3.0"
services:
zookeeper:
image: zookeeper:3.5.5
restart: always
container_name: zookeeper
ports:
- "2181:2181"
expose:
- "2181"
environment:
- ZOO_MY_ID=1
kafka:
image: wurstmeister/kafka:2.12-2.2.1
restart: always
container_name: kafka
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT://kafka:9090
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES=2000000
ports:
- "9090:9090"
depends_on:
- zookeeper
run
$ docker-compose up -d
Creating network "kafka_default" with the default driver
Creating kafka_zookeeper_1 ... done
Creating kafka_kafka_1 ... done
# Nodejs 依赖
$ pnpm add kafkajs -w
理论和概念
在正式开始前 需要有几点说明
不了解 Kafka的一定一定要入门 Kafka 要不然 接下来你就非常的蒙圈!
Inject 的时候 使用的是 ClientKafka 而不是 ClientProxy
Kafka 使用 两个主题 来处理 “请求响应”模式; ClientKafka.send() 这个方法 通过将 “correlation id”、 “reply topic” 和 “ reply partition ” 与请求消息相关联来 发送带有 返回地址的消息。 这需要实例在发送消息之前 ,必须订阅回复主题并分配到至少一个分区。有点晕是吧?说人话就是 必须要 有订阅动作
每个 程序至少 有一个 topic 和 partition,要不然会报错 ;
- Incoming & Outgoing (传入/传出),
Nest 以对象的形式接收传入的 Kafka 消息,其中包含 、 和具有 类型值的属性。然后,Nest 通过将缓冲区转换为字符串来解析这些值。
在发布事件或发送消息时,Nest将在序列化过程之后发送传出的Kafka消息。这发生在传递给ClientKafka的emit()和send()方法的参数上,或从@MessagePattern方法的返回值上。该序列化通过使用JSON.stringify()或toString()原型方法来“字符串化”不是字符串或缓冲区的对象。
4, 发消息的时候 要带有key和 value ,这对于满足共同分区要求非常重要
return {
headers: {
realm
},
key: heroId,
value: items
}
- 命名约定
Kafka 微服务组件在 和 选项上附加了各自角色的描述,以防止 Nest 微服务客户端和服务器组件之间的冲突,
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-server
},
}
});
// 客户端也需要设置
- 关于失败的时候重试
请使用 throw new KafkaRetriableException('...'); 它会进行重试
- 提交偏移量
什么是 偏移量 ?
在 Kafka 中,偏移量(offset)是指分区中每条消息的唯一标识符。它表示了消息在分区中的位置。消费者可以使用偏移量来跟踪它已经消费了哪些消息。如果消费者崩溃或重新启动,它可以从上次提交的偏移量处继续读取数据。
在使用 Kafka 时,提交偏移量至关重要。默认情况下,消息将在特定时间后自动提交。有关更多信息,请访问 KafkaJS 文档。Nest 提供了一种手动提交偏移量的方法,其工作方式类似于 KafkaJS 实现.
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false
}
}
});
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
// business logic
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }])
}
代码
了解完基础理论之后 我们来看相关的代码
调整协议 (我们新开了一个文件 不能使用原来的,原因请看第二点·没有人订阅程序是会抛出error的· )
~ Service
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['192.168.101.3:9090'],
},
consumer: {
groupId: 'hero-consumer',
},
},
},
);
await app.listen();
~ Client
ClientsModule.register([
{
name: 'M1_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['192.168.101.3:9090'],
},
consumer: {
groupId: 'hero-consumer',
},
},
},
]),
其他相关代码例子
~ KafkaService
import * as util from 'util';
import { KafkaMessage } from 'kafkajs';
interface Dragon {
id: number;
name: string;
}
type KillDragonMessage = Omit<KafkaMessage, 'value'> & {
value: Pick<Dragon, 'id'>;
};
// Simplified example: move method implementations to the service ofc
@Controller()
export class KafkaController {
private readonly logger = new Logger(KafkaController.name);
@MessagePattern('hero.kill.dragon')
onKillDragon(
@Payload() message: KillDragonMessage,
@Ctx() context: KafkaContext,
) {
const dragons = [
{ id: 1, name: 'Smaug' },
{ id: 2, name: 'Ancalagon The Black' },
{ id: 3, name: 'Glaurung' },
];
this.logger.log(`[hero.kill.dragon] message = ${util.inspect(message)}`);
// 简单的使用 直接返回 结果
// return dragons;
// 获取 原始信息
// context.getTopic()
// const originalMessage = context.getMessage();
// const partition = context.getPartition();
// const { headers, timestamp } = originalMessage;
// 下面的返回 可以满足 “共同分区要求” key value
return {
headers: {
realm: 'Nest',
},
key: message.value,
value: dragons,
};
}
}
~ KafkaController
import * as util from 'util';
interface Dragon {
id: number;
name: string;
}
@Injectable()
export class KafkaService implements OnModuleInit {
private readonly logger = new Logger(KafkaService.name);
// Dragon IDs from 1 to 3 are available
private readonly minDragons = 1;
private readonly maxDragons = 3;
constructor(@Inject('M1_SERVICE') private readonly client: ClientKafka) {}
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
// await this.client.connect();
}
private chooseDragonId(): number {
return Math.floor(
Math.random() * (this.maxDragons - this.minDragons + 1) + this.minDragons,
);
}
// 简单的使用
// async killDragon() {
// this.logger.log('Killing the dragon...');
// this.logger.log('Success! Sending message to Kafka...');
// const dragon: Pick<Dragon, 'id'> = { id: this.chooseDragonId() };
// const killedDragon = await lastValueFrom(
// this.client.send<Dragon>('hero.kill.dragon', dragon),
// );
// this.logger.log(`Consumer response: ${util.inspect(killedDragon)}`);
// return `${killedDragon} is dead! The kingdom is saved!`;
// }
// 测试 满足同时分区
async killDragon() {
const value = {
id: 0,
name: 'doc',
heroId: 1,
dragonId: 3,
};
const dragon: Pick<Dragon, 'id'> = value;
const killedDragon = await lastValueFrom(
this.client.send<Dragon>('hero.kill.dragon', dragon),
);
this.logger.log(`Consumer response: ${util.inspect(killedDragon)}`);
return `${killedDragon} is dead! The kingdom is saved!`;
}
}
gRPC
关于gRPC 我相信学习/接触过 Go的同学 颇有感触哈, 简而言之 gRPC 是一个现代、开源、高性能的 RPC 框架,可以在任何环境中运行。注意它上一个 RPC( 远程过程调用 )
如果你想了解更多 ,请看我的另一片golang 文章 link
- 安装 依赖
pnpm add @grpc/grpc-js @grpc/proto-loader -w
- 我们要使用到 .proto 文件 所以要设置 ts的编译项
// nest-cli.json
"compilerOptions": {
"assets": ["**/*.proto"]
}
// tsconfig.json
"assets": ["**/*.proto"],
"watchAssets": true
- 引入和启动 程序
~ Service
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.GRPC,
options: {
package: 'hero', // ['hero', 'hero2']
protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
},
},
);
await app.listen();
~ Client
imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero', // ['hero', 'hero2']
protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
},
},
]),
],
- 部分简单的代码
~ Service
export interface Hero {
id: number;
name: string;
}
export interface HeroById {
id: number;
}
interface HeroService {
findOne(data: HeroById): Observable<Hero>;
findMany(upstream: Observable<HeroById>): Observable<Hero>;
}
@Controller()
export class GRPCController {
private readonly items: Hero[] = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
// 简单的例子
// 对应的就是 HeroService 里的FindOne 方法
// 这两个参数 都是可选项 如果 findOne 和FindOne 一样 (findOne
// 会自动转大驼峰 ,那么可以忽略 FindOne 参数
// 如果 HeroService 和 Class HeroService 一一致 那么这个 HeroService
// 参数也可以省去,但是不建议
@GrpcMethod('HeroService', 'FindOne')
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>,
): Hero {
return this.items.find(({ id }) => id === data.id);
}
// 我们能不能实现 流 ?
// gRPC 本身支持长期实时连接,通常称为 .流对于聊天、观察或块数据传输等情况很有用
// Nest 有两种 方式实现
// RxJS + handler
@GrpcStreamMethod('HeroService')
findMany(data$: Observable<HeroById>): Observable<Hero> {
const hero$ = new Subject<Hero>();
const onNext = (heroById: HeroById) => {
const item = this.items.find(({ id }) => id === heroById.id);
hero$.next(item);
};
const onComplete = () => hero$.complete();
data$.subscribe({
next: onNext,
complete: onComplete,
});
return hero$.asObservable();
}
}
~ Client
export interface Hero {
id: number;
name: string;
}
export interface HeroById {
id: number;
}
interface HeroService {
findOne(data: HeroById): Observable<Hero>;
findMany(upstream: Observable<HeroById>): Observable<Hero>;
}
@Controller('hero')
export class GRPCController implements OnModuleInit {
private readonly items: Hero[] = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
private heroService: HeroService;
constructor(@Inject('HERO_PACKAGE') private readonly client: ClientGrpc) {}
onModuleInit() {
this.heroService = this.client.getService<HeroService>('HeroService');
}
@Get()
getHero(): Observable<any> {
// 对应的就是 HeroesService 里的FindOne 方法
// 注意 gRPC 客户端不会发送名称中包含下划线的字段 (要发请设置
//options.loader.keepcase = true
return this.heroService.findOne({ id: 1 });
}
@Get('stream')
getMany() {
const ids$ = new ReplaySubject<HeroById>();
ids$.next({ id: 1 });
ids$.next({ id: 2 });
ids$.complete();
const stream = this.heroService.findMany(ids$.asObservable());
return stream.pipe(toArray());
}
}
关于其它的操作
有关Error的操作
大部分的情况下 你只需呀记住一点 微服务应该抛出 RpcException 就差不多了,当然你可以在自定一些需要的 自定义的 Filter, 比如下面的样子
// 完全自己写 Filter
@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
// 注意 这个catch 必须返回一个 能够被 catch 的 Observable
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
return throwError(() => exception.getError());
}
}
// 仅仅是 扩展Nest自带的 Filter
@Catch()
export class AllExceptionsFilter extends BaseRpcExceptionFilter {
catch(exception: any, host: ArgumentsHost) {
return super.catch(exception, host);
}
}
// 使用的话 也非常的简单 直接+ 就好了, 与其它的 Filter 一样也可以用于全局/controller 级别,但是如果是 混合应用 那么全局的将不会生效
// 混合应用 https://docs.nestjs.com/faq/hybrid-application
@UseFilters(new ExceptionFilter())
@GrpcMethod('HeroService', 'FindOne')
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>,
): Hero {
return this.items.find(({ id }) => id === data.id);
}
Pip
大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException , Pip 的所有概念 都和 原来的Nest 既有的保持一致 没有什么区别
@UsePipes(new ValidationPipe())
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
Guard
大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException ,其他的和 Nest 既有的Guard概念和用法保持一致
@UseGuards(AuthGuard)
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
Interceptors
大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException。其他的和 Nest 既有的 Interceptors 概念和用法保持一致
@UseInterceptors(new TransformInterceptor())
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
自定义 Transport(以及高级操作)
经过了 上面的诸多学习 我们看到了非常多的 由Nest提供开箱即用的 Transport,如果你有自定义的需求。请直接看这两篇优秀的文章(我就不翻译了)
这些文章的作者 都是John 他 Nest 的核心作者之一
- 这片文章,详细的说明了 Nest 底层的一些实现, 以及如何处理 自定义的序列化和反序列化
- 这篇是完全的自定义 Transport