Skip to content

微服务

基础介绍(多个NestJS 程序之间的调用

介绍一下 多个Nest程序之间的调用方式 和实现 为了演示 功能,我们需要调整一下 程序,把它变成一个 monorepo 工程 具体到底如何做 基于other_tech 我们新 拉一个分支 命名为 monorepo , 具体的集成方法 请参考为的 另一个 github参考 哪里有文章和例子 https://github.com/BM-laoli/morerepo-test (有可能learn 这种方式已经过时,请参考 )https://juejin.cn/post/6964328103447363614#comment 来做pnpm下的 monorepo

Pnpm下的monorepo工程改造

参考文章 https://juejin.cn/post/6964328103447363614#comment

注意 以下内容 ,请不要直接拿去用,我仅仅做了演示为了图方便,在nest中如果你要实施 monorepo 请看 nest官方文档,官方文档提供了 一个适用 与Nest的方案 https://docs.nestjs.com/cli/monorepo. 当然你如果 想一意孤行 ,那么 哈哈哈 请参考下面的文档

  1. 新建文件夹 (子项目)

packages/m1 这个文件夹下📁 下就有一个新的Nestjs 项目了

shell
├── src
│   ├── app.controller.ts
│   ├── app.module.ts
│   ├── app.service.ts
│   └── main.ts
├── package.json
├── nest-cli.json
├── tsconfig.build.json
├── tsconfig.json
└── yarn.lock

注意改一下去掉 private,如果你需要 请改一下包名(大部分情况下 你并不需要去改 因为我们是两个 独立的 service, 除非你要做 lib,若做lib Nest官方也有详细说明

json
{
  "name": "nest-m1",
  "version": "1.0.0",
  "description": "Nest TypeScript starter repository",
  "license": "MIT",
  // 去掉 "private": true,
}
  1. pnpm 初始化
shell
# 初始化
$ pnpm init -y

新建一个 pnpm-workspace.yaml

yaml
packages:
  # all packages in subdirs of packages/ and components/
  - 'packages/**'
  1. 依赖安装
shell
# root 目录(我们原来从 另一个branch 过来的 一样install就好了,既得install 前 把 node_mmodules 先干掉 )
$ pnpm install -w  # (意思是全局依赖)

# package/m1 目录
$ pnpm install

然后 按照相同 的逻辑 我们再去 cv 文件,新建一个 client 文件夹 这样就算搞定了!

Install

不管是 service / client 都需要使用这个依赖,注意这里说的 service 和client 是 指 微服务 和 请求他的其他服务(相对的 service | client

shell
# $  npm i --save @nestjs/microservices
$ pnpm add @nestjs/microservices -w

Service 端

在 main 引入

ts
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      // 默认情况Nest 使用TCP,
      // options: {
      // host
      // port
      // retryAttempts 重试消息次数 默认 =0
      // retryDelay 重试之间的间隔(ms)
      // },
    },
  );

在 controller 中写些逻辑

需要特别强调的是 在Nest中 微服务通过模式识别消息和事件

  • 请求-响应
  • 基于事件
ts
import {
  Controller,
  Get,
  Inject,
  RequestTimeoutException,
  Scope,
} from '@nestjs/common';
import {
  CONTEXT,
  Ctx,
  EventPattern,
  MessagePattern,
  NatsContext,
  Payload,
  RequestContext,
} from '@nestjs/microservices';
import { from, fromEvent, Observable } from 'rxjs';
import { AppService } from './app.service';

@Controller({
  scope: Scope.REQUEST,
})
export class AppController {
  constructor(
    private readonly appService: AppService,
    @Inject(CONTEXT) private ctx: RequestContext, // scope 的时候
  ) {}

  // 在Nest中有两种 微服务的模式来识别消息和事件

  // 下面就是 一个 (请求-响应)
  // 注意哈 这个decorator 只在 controller 中用
  @MessagePattern({ cmd: 'sum' })
  accumulate(data: number[]): number {
    return (data || []).reduce((a, b) => a + b);
  }

  // 异步/Observable  也是支持的
  @MessagePattern({ cmd: 'sumSync' })
  accumulateSync(data: number[]): Promise<number> {
    return Promise.resolve((data || []).reduce((a, b) => a + b));
  }

  @MessagePattern({ cmd: 'sumObservable' })
  accumulateObservable(data: number[]): Observable<number> {
    return from([1, 2, 3, 4]);
  }

  // 以上是基于 请求+响应 的 下面咱们 来观察一下 基于 事件的
  @EventPattern('user_created')
  async cuser(data: any) {
    console.log(this.ctx.pattern);
    return 1;
  }
  // 我们可以为一个 事件注册多个处理程序 他们会依次触发
  @EventPattern('user_created')
  async cuser2(data: any) {
    return 2;
  }
  // 如果你需要一些 请求的详细信息 可以传递一个context
  @MessagePattern('time.use.*') // 通配符
  getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
    return new Date().toLocaleDateString();
  }
}

Client 端

关于client 的集成相对简单

注入+使用就好了

ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.TCP,
      },
    ]),
  ],
  controllers: [AppController, AppController2],
  providers: [AppService],
})

使用

ts

@Controller('m1')
export class AppController {
  constructor(
    private readonly appService: AppService,
    @Inject('M1_SERVICE') private M1_client: ClientProxy,
  ) {}

  @Get()
  // t1(@Res() res: Response) {
  t1() {
    // this.M1_client.
    // return this.accumulate();
    // this.accumulate().subscribe((it) => {
    //   res.status(HttpStatus.OK).send(`${it}--`);
    // });
    // return this.accumulateSync();
    // return this.accumulateObservable();
    // return this.publish();
    return this.publish2();
  }

  // 请求响应模式
  accumulate() {
    const pattern = { cmd: 'sum' };
    const payload = [1, 2, 3];
    // 注意这个默认都是返回一个 "冷Observable" 至于什么是热什么是冷 请你去rxjs官方一探究竟
    return this.M1_client.send<number[]>(pattern, payload);
  }

  async accumulateSync() {
    const pattern = { cmd: 'sumSync' };
    const payload = [1, 2, 3, 4];

    const value = await lastValueFrom(
      this.M1_client.send<number[]>(pattern, payload),
    );

    return value;
  }

  accumulateObservable() {
    const pattern = { cmd: 'sumObservable' };
    const payload = [1, 2, 3, 4, 5];

    // 模拟操作 可以使用 pipe(timeout(5000)) 这个操作符
    return this.M1_client.send<number[]>(pattern, payload);
  }

  // 发送事件 注意啊这个返回的是 一个热的Observable,至于什么是热什么是冷 请你去rxjs官方一探究竟
  async publish() {
    this.M1_client.emit<number>('user_created', 666);
  }

  async publish2() {
    this.M1_client.emit<number>('time.use.*', 2123);
  }
}

你也许需要注意的一个点是 Scope 的情况

在 Nest 中,几乎所有内容都在传入请求之间共享,这可能会发生一些意想不到的问题。我们有一个数据库的连接池,具有全局状态的单例服务等。请记住,Node.js 不遵循请求/响应多线程无状态模型,其中每个请求都由单独的线程处理。因此,使用单一实例对于我们的应用程序是完全安全的。 但多数情况下 我们会有一些缓存策略 导致程序表现出意想不到的行为 故我们把模式改了,关键代码就是 scope 这个参数

ts

@Controller({
  path: 'm2',
  scope: Scope.REQUEST,
})
export class AppController2 {
  constructor(
    @Inject('M1_SERVICE') private M1_client: ClientProxy,
    @Inject(CONTEXT) private ctx: RequestContext, // scope 的时候
  ) {}

  @Get()
  t1() {
    // this.M1_client.
  }
  // 请求响应模式
  accumulate() {
    const pattern = { cmd: 'sum' };
    const payload = [1, 2, 3];
    // 注意这个默认都是返回一个 "冷Observable"
    this.ctx.pattern;
    return this.M1_client.send<number[]>(pattern, payload);
  }
}

集成其它的工具🔧

接下来的内容 主要增强 上述的 微服务能力;可以看出来,上述的设计比较简单,没有考虑到 多机器 多名称的情况,而且我们 也没有配置 port 如果是 k8s 等多机器部署,那么服务发现该如何处理,服务发现 等等问题..... 都没有讲到, 下面的工具 旨在完善 ,微服务间存在的问题

Redis

Transport.REDIS并利用了 Redis 的发布/订阅功能, 设置了一套发布/订阅的消息传递范例,

当然特性(优点与不足) 要讲一下

  1. 无法明确最终那个订阅者会收到消息(无法保证消息或事件将由至少一个服务处理)
  2. 每个微服务可以订阅任意数量的通道,一次性可订阅多个通道
  3. 通过通道交换的消息是 即发即弃, 如果没有人用 那会被删除掉
  4. 一条消息 可以被多个 订阅者订阅和接收

img

基础的用法和前文提到的 是一模一样的,不过要注意 不同的类型(我是指 ts 定义的类型) 提供的功能不一样

ts
~ 详细见 MicroserviceOptions .d.ts 定义
/**
 * @publicApi
 */
export interface RedisOptions {
    transport?: Transport.REDIS;
    options?: {
        host?: string;
        port?: number;
        retryAttempts?: number;
        retryDelay?: number;
        serializer?: Serializer;
        deserializer?: Deserializer;
    } & IORedisOptions;
}

为了演示多个 microservices 之间的同时订阅,我们把 M1(在上节中它作为一个microservices ) , 再cv一份 重命名为 m2文件夹, 然后run

安装必要的依赖

shell
npm i --save ioredis
pnpm add ioredis -w

修改 service 并且加一个 订阅器

ts
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.REDIS,
      options: {
        host: 'localhost',
        port: 6379,
      },
    },
  );

++++
  // redis
  @MessagePattern('notifications')
  getNotifications(@Payload() data: number[], @Ctx() context: RedisContext) {
    console.log(`Channel: ${context.getChannel()}`); // notifications
    console.log(222);
  }

修改 client 换一个 事件发布

ts
  ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.REDIS,
        options: {
          host: 'localhost',
          port: 6379,
        },
      },
    ]),
  ++++
  // redis
  async publishToRedis() {
    this.M1_client.emit<number>('notifications', 2123);
  }

最后说一下它的应用场景

在微服务架构中 Redis通常用作消息队列,适合处理高并发的消息传递; 但是Redis不是专门设计用来处理消息传递的,在需要保证消息顺序的前提下 redis 也许不是最好的选择。 下面的几种场景可以使用

应用场景des
异步任务处理将需要异步执行的任务放入Redis消息队列中,处理任务的工作进程从队列中读取任务,完成后将结果写回Redis或其他存储系统中。这种方式可以避免任务处理过程中阻塞主线程,提高系统的并发处理能力。
实时消息推送通过Redis消息队列将实时消息推送给客户端,例如在线聊天、股票行情等。消息发布者将消息写入Redis消息队列中,订阅者通过订阅Redis消息频道或使用Redis的PUBLISH命令来接收消息。
日志处理将应用产生的日志信息写入Redis消息队列中,使用工作进程从队列中读取日志并进行处理,例如过滤、分析、存储等。这种方式可以避免日志处理过程中阻塞主线程,提高系统的性能和可靠性。

MQTT

MQTT 是一种开源的轻量级消息传递协 . 基于 MQTT 构建的通信系统由 发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计。

总体来说 它成本低 而且效果好 延迟低,非常适合用在 物联网, 传感器 和服务器的通信

首先我们需要构建这样的一套基础设施 ,我们使用docker 去构建 ,再次强调因为MQTT是一个协议,所以我们需要搭建一个支持MQTT协议的服务器,使服务端和客户端能够通过这个MQTT服务器(broker)进行消息转发、通信

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器,它是目前比较好 MQTT 服务产品,多了不说了 自己去search吧

shell
$ docker pull emqx
$ docker run -d --name emqx -p 1883:1883 -p 18083:18083 emqx/emqx

# 完事之后 管理端 在 :18083  admin/public 中登录管理 

# 在工程 root 目录下安装 mqtt 依赖
$  pnpm add mqtt -w

接下来 开始集成, 把service 和 client的协议改了

ts
~ service
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      },
    },
  );

~ client
    ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
         
        },
      },
    ]),

然后我们 多加一些发布和订阅的代码( 实际上和 之前的那些类型,代码都是一样的版式 )

ts
~ service
  // MQTT
  // @MessagePattern('sensors/+/temperature/+') 支持通配符
  @MessagePattern('notificationsMQTT')
  getNotificationsMQTT(@Payload() data: number[], @Ctx() context: MqttContext) {
    console.log(`Topic: ${context.getTopic()}`); // notifications
    console.log(222);

    // 如果你需要访问 原始的mqtt数据包 请使用
    console.log(context.getPacket());
  }

  // MQTT2
  @MessagePattern('replace-emoji')
  replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
    const {
      properties: { userProperties },
      // 如果你这里 是一个undefined 记得去给你的 配置 加一个 协议版本  protocolVersion: 5,
      // issues  https://github.com/nestjs/nest/issues/10016
    } = context.getPacket();

    console.log('2');

    return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
  }

~ Client
// MQTT
  publishToMQTT() {
    // this.M1_client.emit<number>('notificationsMQTT', {
    //   name: 6666,
    // });

    // 配置 消息选项
    const userProperties = { 'x-version': '1.0.0' };
    const record = new MqttRecordBuilder(':cat:')
      .setProperties({ userProperties })
      .setQoS(1)
      .build();
    console.log(record);

    return this.M1_client.send('replace-emoji', record);
  }

NATS

这个东西 也是一个消息传递系统,目前最新版本 使用Go开发 性能非常的NB,它是一个分布式的消息中间件

同样的代理 它也是有直接的消息协议,请先安装 这个系统

shell
$ docker pull nats
$ docker run -d --name nats-main -p 4222:4222 -p 8222:8222 nats

# 安装ndoejs 依赖
$ pnpm add nats -w

特别要说明的: Nest Transport.NATS 不使用 NATS 内置的 请求-响应 传输机制,无论响应的Service在什么地方,它都会动态的返回 到 client程序.

但对于 基于事件 的传输模式,Nest 使用NATS 内置的机制,如果发布一个基于主题的message时,订阅这个主题的 都会收到此消息( 这种一对多 被称为 扇出)

然后 NATS 提供了一个内置的 分布式队列 ,如果要使用情况 下面的代码

ts
~ Service
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.NATS,
      options: {
        servers: ['nats://localhost:4222'],
      },
    },
  );

~ Client
  ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        },
      },
    ]),

然后就是多加几个 订阅 和publish 方法

ts
~ Service
  // NTAS
  @MessagePattern('notificationsNATS')
  notificationsNATS(@Payload() data: number[], @Ctx() context: NatsContext) {
    console.log(`Subject: ${context.getSubject()}`);
  }

  // NATS 可以设置 标头
  @MessagePattern('replace-emoji-NATS')
  replaceEmojiNATS(
    @Payload() data: string,
    @Ctx() context: NatsContext,
  ): string {
    const headers = context.getHeaders();
    // 这个返回时一个是get 不要相信官方给写的 headers['x-version']
    return headers.get('x-version') === '1.0.0' ? '🐱' : '🐈';
  }

~  Client
import * as nats from 'nats';

  // NAST
  publishToNAST() {
    // this.M1_client.emit<number>('notificationsNATS', {
    //   name: 6666,
    // });

    // 配置 消息选项
    const headers = nats.headers();
    headers.set('x-version', '1.0.0');
    const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build();

    this.M1_client.send('replace-emoji-NATS', record).subscribe((it) => {
      console.log(it);
    });
  }

RabbitMQ

RabbitMQ是一个开源的消息代理,它支持多种消息协议。

还是一样 我们先安装它

shell
$ docker pull rabbitmq:3.11-management
$ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management

# 安装nodejs 依赖
$ pnpm install amqplib amqp-connection-manager -w

:15672 是它的管理prot 默认用户名是 guest , guest

大部分使用方法和方法和 前面的消息系统使用方式是 一样的,RabbitMQ配置上有 比较大的区别

选项作用
urls连接网址
queue服务器将侦听的队列名称
prefetchCount设置通道的预取计数
isGlobalPrefetchCount启用每通道预取
noAck如果启用手动确认模式false
queueOptions其他队列选项(在此处阅读更多内容))
socketOptions其他套接字选项(在此处阅读更多内容))
headers要与每条消息一起发送的标头

我们先把 连接方式改了

ts
~ Service
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.RMQ,
      options: {
        urls: ['amqp://localhost:5672'],
        queue: 'cats_queue',
        queueOptions: {
          durable: false,
          // 在RabbitMQ中,队列可以被声明为持久化(durable)或非持久化(durable: false)。当队列被声明为持久化时,
          // 它将在RabbitMQ服务器重启后仍然存在。但是,如果队列被声明为非持久化,那么它将在服务器重启后消失。
          // 因此,如果您希望您的队列在服务器重启后仍然存在,您应该将其声明为持久化。
        },
      },
    },
  );

~ Client
    ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false,
          },
        },
      },
    ]),

然后我们简单的展示它的使用

ts
~ Service
 // RMQ
  @MessagePattern('notificationsRMQ')
  getNotificationsRMQ(@Payload() data: number[], @Ctx() context: RmqContext) {
    console.log(`Pattern: ${context.getPattern()}`);
    console.log(`Pattern: ${context.getMessage()}`); // 获取原始数据
    // 要检索对 RabbitMQ 通道的引用 请参考
    console.log(context.getChannelRef());
    console.log(data);

    // 如果 我们设置了 noAck: false,需要 手动的check一下
    context.getChannelRef().ack(context.getMessage());
    return '666';
  }

  // RMQ 同样的也支持 设置头信息等操作
  @MessagePattern('replace-emoji-RMQ')
  replaceEmojiRMQ(@Payload() data: string, @Ctx() context: RmqContext): string {
    const {
      properties: { headers },
    } = context.getMessage();
    return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
  }

~ Client
  // RMQ
  publishToRMQ() {
    this.M1_client.send('notificationsRMQ', {
      name: 'RMQ',
    }).subscribe((it) => {
      console.log(it);
    });
    // const message = ':cat:';
    // const record = new RmqRecordBuilder(message)
    //   .setOptions({
    //     headers: {
    //       ['x-version']: '1.0.0',
    //     },
    //     priority: 3,
    //   })
    //   .build();

    // this.M1_client.send('replace-emoji-RMQ', record).subscribe((it) => {
    //   console.log(it);
    // });
  }

Kafka

警告 此小节内容完全超纲 200%,如果你要尝试 ,请做好心理准备,和前期的知识准备之后 再深入学习。 Kafka 是一个开源的分布式流处理平台,它能够实现实时的事件驱动应用程序开发。具有高吞吐量、低延迟的等特点 与Apache Storm和Spark很好地集成在一起,用于实时流数据分析。

下面的它的关键的三个功能

  • 发布和订阅记录流,类似于消息队列或企业邮件系统。
  • 以容错持久的方式存储记录流。
  • 在记录流发生时对其进行处理。

先在 docker 上构建一个 kafka 基础设施(这里是踩坑的一个地方 Docker 到底如何部署 Kafka

docker-compose.yml

yml
version: "3.0"
services:
    zookeeper:
        image: zookeeper:3.5.5
        restart: always
        container_name: zookeeper
        ports:
            - "2181:2181"
        expose:
            - "2181"
        environment:
            - ZOO_MY_ID=1
    kafka:
        image: wurstmeister/kafka:2.12-2.2.1
        restart: always
        container_name: kafka
        environment:
            - KAFKA_BROKER_ID=1
            - KAFKA_LISTENERS=PLAINTEXT://kafka:9090
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
            - KAFKA_MESSAGE_MAX_BYTES=2000000
        ports:
            - "9090:9090"
        depends_on:
            - zookeeper

run

shell
$ docker-compose up -d
Creating network "kafka_default" with the default driver
Creating kafka_zookeeper_1 ... done
Creating kafka_kafka_1 ... done

# Nodejs 依赖
$ pnpm add kafkajs -w

理论和概念

在正式开始前 需要有几点说明

不了解 Kafka的一定一定要入门 Kafka 要不然 接下来你就非常的蒙圈!

  1. Inject 的时候 使用的是 ClientKafka 而不是 ClientProxy

  2. Kafka 使用 两个主题 来处理 “请求响应”模式; ClientKafka.send() 这个方法 通过将 “correlation id”、 “reply topic” 和 “ reply partition ” 与请求消息相关联来 发送带有 返回地址的消息。 这需要实例在发送消息之前 ,必须订阅回复主题并分配到至少一个分区。有点晕是吧?说人话就是 必须要 有订阅动作

每个 程序至少 有一个 topic 和 partition,要不然会报错 ;

  1. Incoming & Outgoing (传入/传出),

Nest 以对象的形式接收传入的 Kafka 消息,其中包含 、 和具有 类型值的属性。然后,Nest 通过将缓冲区转换为字符串来解析这些值。

在发布事件或发送消息时,Nest将在序列化过程之后发送传出的Kafka消息。这发生在传递给ClientKafka的emit()和send()方法的参数上,或从@MessagePattern方法的返回值上。该序列化通过使用JSON.stringify()或toString()原型方法来“字符串化”不是字符串或缓冲区的对象。

4, 发消息的时候 要带有key和 value ,这对于满足共同分区要求非常重要

ts
return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  1. 命名约定

Kafka 微服务组件在 和 选项上附加了各自角色的描述,以防止 Nest 微服务客户端和服务器组件之间的冲突,

ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
});
// 客户端也需要设置
  1. 关于失败的时候重试

请使用 throw new KafkaRetriableException('...'); 它会进行重试

  1. 提交偏移量

什么是 偏移量 ?

在 Kafka 中,偏移量(offset)是指分区中每条消息的唯一标识符。它表示了消息在分区中的位置。消费者可以使用偏移量来跟踪它已经消费了哪些消息。如果消费者崩溃或重新启动,它可以从上次提交的偏移量处继续读取数据。

在使用 Kafka 时,提交偏移量至关重要。默认情况下,消息将在特定时间后自动提交。有关更多信息,请访问 KafkaJS 文档。Nest 提供了一种手动提交偏移量的方法,其工作方式类似于 KafkaJS 实现.

ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});

@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // business logic
  
  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  await this.client.commitOffsets([{ topic, partition, offset }])
}

代码

了解完基础理论之后 我们来看相关的代码

调整协议 (我们新开了一个文件 不能使用原来的,原因请看第二点·没有人订阅程序是会抛出error的· )

ts
~ Service 
 const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'hero',
          brokers: ['192.168.101.3:9090'],
        },
        consumer: {
          groupId: 'hero-consumer',
        },
      },
    },
  );
  await app.listen();

~ Client
    ClientsModule.register([
      {
        name: 'M1_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['192.168.101.3:9090'],
          },
          consumer: {
            groupId: 'hero-consumer',
          },
        },
      },
    ]),

其他相关代码例子

ts
~ KafkaService
  import * as util from 'util';
  import { KafkaMessage } from 'kafkajs';

  interface Dragon {
    id: number;
    name: string;
  }

  type KillDragonMessage = Omit<KafkaMessage, 'value'> & {
    value: Pick<Dragon, 'id'>;
  };

  // Simplified example: move method implementations to the service ofc
  @Controller()
  export class KafkaController {
    private readonly logger = new Logger(KafkaController.name);

    @MessagePattern('hero.kill.dragon')
    onKillDragon(
      @Payload() message: KillDragonMessage,
      @Ctx() context: KafkaContext,
    ) {
      const dragons = [
        { id: 1, name: 'Smaug' },
        { id: 2, name: 'Ancalagon The Black' },
        { id: 3, name: 'Glaurung' },
      ];
      this.logger.log(`[hero.kill.dragon] message = ${util.inspect(message)}`);

      // 简单的使用 直接返回 结果
      // return dragons;

      // 获取 原始信息
      // context.getTopic()
      // const originalMessage = context.getMessage();
      // const partition = context.getPartition();
      // const { headers, timestamp } = originalMessage;

      // 下面的返回 可以满足 “共同分区要求” key value
      return {
        headers: {
          realm: 'Nest',
        },
        key: message.value,
        value: dragons,
      };
    }
  }

  ~ KafkaController
  import * as util from 'util';

  interface Dragon {
    id: number;
    name: string;
  }

  @Injectable()
  export class KafkaService implements OnModuleInit {
    private readonly logger = new Logger(KafkaService.name);

    // Dragon IDs from 1 to 3 are available
    private readonly minDragons = 1;
    private readonly maxDragons = 3;

    constructor(@Inject('M1_SERVICE') private readonly client: ClientKafka) {}

    async onModuleInit() {
      this.client.subscribeToResponseOf('hero.kill.dragon');

      // await this.client.connect();
    }

    private chooseDragonId(): number {
      return Math.floor(
        Math.random() * (this.maxDragons - this.minDragons + 1) + this.minDragons,
      );
    }

    // 简单的使用
    // async killDragon() {
    //   this.logger.log('Killing the dragon...');
    //   this.logger.log('Success! Sending message to Kafka...');

    //   const dragon: Pick<Dragon, 'id'> = { id: this.chooseDragonId() };
    //   const killedDragon = await lastValueFrom(
    //     this.client.send<Dragon>('hero.kill.dragon', dragon),
    //   );

    //   this.logger.log(`Consumer response: ${util.inspect(killedDragon)}`);

    //   return `${killedDragon} is dead! The kingdom is saved!`;
    // }

    // 测试 满足同时分区
    async killDragon() {
      const value = {
        id: 0,
        name: 'doc',
        heroId: 1,
        dragonId: 3,
      };

      const dragon: Pick<Dragon, 'id'> = value;
      const killedDragon = await lastValueFrom(
        this.client.send<Dragon>('hero.kill.dragon', dragon),
      );

      this.logger.log(`Consumer response: ${util.inspect(killedDragon)}`);

      return `${killedDragon} is dead! The kingdom is saved!`;
    }
  }

gRPC

关于gRPC 我相信学习/接触过 Go的同学 颇有感触哈, 简而言之 gRPC 是一个现代、开源、高性能的 RPC 框架,可以在任何环境中运行。注意它上一个 RPC( 远程过程调用 )

如果你想了解更多 ,请看我的另一片golang 文章 link

  1. 安装 依赖
shell
pnpm add @grpc/grpc-js @grpc/proto-loader -w
  1. 我们要使用到 .proto 文件 所以要设置 ts的编译项
ts
// nest-cli.json
"compilerOptions": {
    "assets": ["**/*.proto"]
  }

// tsconfig.json
"assets": ["**/*.proto"],
"watchAssets": true
  1. 引入和启动 程序
ts
~ Service
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.GRPC,
      options: {
        package: 'hero', // ['hero', 'hero2']
        protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
      },
    },
  );
  await app.listen();

~ Client
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_PACKAGE',
        transport: Transport.GRPC,
        options: {
          package: 'hero', // ['hero', 'hero2']
          protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
        },
      },
    ]),
  ],
  1. 部分简单的代码
ts
~ Service

export interface Hero {
  id: number;
  name: string;
}

export interface HeroById {
  id: number;
}

interface HeroService {
  findOne(data: HeroById): Observable<Hero>;
  findMany(upstream: Observable<HeroById>): Observable<Hero>;
}

@Controller()
export class GRPCController {
  private readonly items: Hero[] = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  // 简单的例子
  // 对应的就是 HeroService 里的FindOne 方法
  // 这两个参数 都是可选项 如果 findOne 和FindOne 一样 (findOne
  // 会自动转大驼峰 ,那么可以忽略 FindOne 参数
  // 如果 HeroService 和 Class HeroService 一一致 那么这个 HeroService
  // 参数也可以省去,但是不建议

  @GrpcMethod('HeroService', 'FindOne')
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>,
  ): Hero {
    return this.items.find(({ id }) => id === data.id);
  }

  // 我们能不能实现 流 ?
  // gRPC 本身支持长期实时连接,通常称为 .流对于聊天、观察或块数据传输等情况很有用
  // Nest 有两种 方式实现

  // RxJS + handler
  @GrpcStreamMethod('HeroService')
  findMany(data$: Observable<HeroById>): Observable<Hero> {
    const hero$ = new Subject<Hero>();

    const onNext = (heroById: HeroById) => {
      const item = this.items.find(({ id }) => id === heroById.id);
      hero$.next(item);
    };
    const onComplete = () => hero$.complete();
    data$.subscribe({
      next: onNext,
      complete: onComplete,
    });

    return hero$.asObservable();
  }
}


~ Client

export interface Hero {
  id: number;
  name: string;
}

export interface HeroById {
  id: number;
}

interface HeroService {
  findOne(data: HeroById): Observable<Hero>;
  findMany(upstream: Observable<HeroById>): Observable<Hero>;
}

@Controller('hero')
export class GRPCController implements OnModuleInit {
  private readonly items: Hero[] = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  private heroService: HeroService;
  constructor(@Inject('HERO_PACKAGE') private readonly client: ClientGrpc) {}

  onModuleInit() {
    this.heroService = this.client.getService<HeroService>('HeroService');
  }

  @Get()
  getHero(): Observable<any> {
    // 对应的就是 HeroesService 里的FindOne 方法
    // 注意 gRPC 客户端不会发送名称中包含下划线的字段 (要发请设置
    //options.loader.keepcase = true
    return this.heroService.findOne({ id: 1 });
  }

  @Get('stream')
  getMany() {
    const ids$ = new ReplaySubject<HeroById>();
    ids$.next({ id: 1 });
    ids$.next({ id: 2 });
    ids$.complete();

    const stream = this.heroService.findMany(ids$.asObservable());
    return stream.pipe(toArray());
  }
}

关于其它的操作

有关Error的操作

大部分的情况下 你只需呀记住一点 微服务应该抛出 RpcException 就差不多了,当然你可以在自定一些需要的 自定义的 Filter, 比如下面的样子

ts
// 完全自己写 Filter 
@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {

  // 注意 这个catch 必须返回一个 能够被 catch 的 Observable
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    return throwError(() => exception.getError());
  }
}

// 仅仅是 扩展Nest自带的 Filter
@Catch()
export class AllExceptionsFilter extends BaseRpcExceptionFilter {
  catch(exception: any, host: ArgumentsHost) {
    return super.catch(exception, host);
  }
}

// 使用的话 也非常的简单 直接+ 就好了, 与其它的 Filter 一样也可以用于全局/controller 级别,但是如果是 混合应用 那么全局的将不会生效
// 混合应用 https://docs.nestjs.com/faq/hybrid-application
@UseFilters(new ExceptionFilter())
@GrpcMethod('HeroService', 'FindOne')
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>,
  ): Hero {
    return this.items.find(({ id }) => id === data.id);
  }

Pip

大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException , Pip 的所有概念 都和 原来的Nest 既有的保持一致 没有什么区别

ts
@UsePipes(new ValidationPipe())
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
  return (data || []).reduce((a, b) => a + b);
}

Guard

大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException ,其他的和 Nest 既有的Guard概念和用法保持一致

ts
@UseGuards(AuthGuard)
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
  return (data || []).reduce((a, b) => a + b);
}

Interceptors

大部分的情况下 你只需呀记住一点 如果你要throw error 微服务应该抛出 RpcException。其他的和 Nest 既有的 Interceptors 概念和用法保持一致

ts
@UseInterceptors(new TransformInterceptor())
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
  return (data || []).reduce((a, b) => a + b);
}

自定义 Transport(以及高级操作)

经过了 上面的诸多学习 我们看到了非常多的 由Nest提供开箱即用的 Transport,如果你有自定义的需求。请直接看这两篇优秀的文章(我就不翻译了)
这些文章的作者 都是John 他 Nest 的核心作者之一

  1. 这片文章,详细的说明了 Nest 底层的一些实现, 以及如何处理 自定义的序列化和反序列化

自定义序列化和反序列化

  1. 这篇是完全的自定义 Transport

自定义的 Transport