实现一个RPC调用sdk

如何实现一个RPC调用sdk

Posted by nolan on August 14, 2022

公司内部基于 grpc 框架进一步封装了一个自有框架,这里称作 spex,但是没有提供 nodejs 调用的 sdk,只是提供了调用规范。

需要开发一个 nodejs 版本的 spex client, 基本功能包括:

  1. cli 能力(支持根据 proto 文件生成 js 和 ts 调用代码)
  2. 调用 spex service 能力(暂不支持作为被调方)
    1. 并发性: 单个 TCP 连接 UDS,多路复用提高并发性
    2. 稳定性: 提供连接池功能,当单个 TCP 连接出现异常或者服务端断开,有预创建好的 TCP 及时切换请求
    3. 集成 tracing、全链路压测和 PFB 的功能
    4. 集成日志、监控上报(耗时、失败次数、QPS 等)

cli 能力的实现

根据 proto 文件生成 js 和 ts 调用文件,默认 pbjs 和 pbts 支持生成:

  1. 只需要内置支持从 microkit 迁移到 spex 需要添加的 service.proto 和 http.proto
  2. 编解码对于 Map<int64, …>也是支持的,有两种情况, Request 结构中带 Map<int64,…>,默认便支持; Response 结构中带 Map<int64, …>, node client 收到是 protobufjs 提供的 longToHash 转换的 hash 串,需要通过 longFromHash 转换回来即可
  3. 第二点 Response 中带 Map<int64, …>的情况,因为现在后端提供的 proto 文件不存在这个情况,后续提供方法解码即可

client 调用能力的实现

image

Client 和 Agent 的数据通信

总的来说,发送每个数据包内容:${包长度}${Header 长度}${Header内容}${body}

SpexHeader 的数据结构

message SpexHeader {
  optional bytes id = 1;  // max=100
  optional uint32 flag = 2;
  optional string key = 3;  // max=100
  optional string command = 4;  // max=100
  optional string source = 5;  // max=100
  optional uint64 timestamp = 6; // Unix timestamp in microseconds
  optional uint32 version = 7;
  optional uint32 error = 8;  // Only used for Response
  optional SpexHeaderQoS qos = 9;
  optional string destination = 10;
  optional uint32 content_type = 11; // enum SpexHeaderContentType
  optional string span_context = 12; // span propagation
  optional int64 session_id = 13;
  optional Status status = 14;
}

Client 初始化流程

1. 创建 TCP 连接
  1. 生成 instanceId
    1. instanceId 规则
    2. ${serviceName}.${region}.${env}.${tag}.${sduId}.${uniqueId   uuidv4()}
  2. SpexHeader id 的规则
    1. nanoid(32)
2. RegisterRouting
  1. 生成 instanceId,带上 serviceKey 和 serviceName 发送命令 sp.exchange.register_routing 注册到 Agent
  2. 等待注册完成
3.基于 TCP 连接多路复用发起并发请求

多路复用: 基于 TCP 长连接并发请求,根据 requestId 对应 Resposne 处理

连接池: 这里直接参考 grpc-js/subchannel-pool.ts 的连接池功能 目的是当 TCP 连接出现异常时,有新的准备好(创建、注册、保活)的连接可以继续并发处理用户的请求,影响用户最小化

目的是当 TCP 连接出现异常时,有新的准备好(创建、注册、保活)的连接可以继续并发处理用户的请求,影响用户最小化

设计原则: 连接池对 node spex client 使用方透明,不需要业务考虑创建、销毁、是否重试等情况(Node Spex Client 不提供重试功能);默认保持一个可用的连接(自动创建、注册、保活),定义连接的 ID,方便在日志和请求上区分;

切换连接条件:

  • 收到 close 事件(服务度关闭或者客户端异常)
  • 发生队头阻塞异常时
  • 原来连接销毁
  • 失败请求记录
  • 回收监听函数等

题外话:谈一下如何创建一个 rpc 服务

image

一般的 rpc 框架都会提供一个 cli,用来根据协议去自动生成 protobuf 文件。

step1: 创建 IDL

// 创建服务名为 `seller.fulfillment` 的 IDL
// spcli proto create <project>-<module> [-t|test] [-n|namespace]  [-p|package] [-o|output]
// list at most 10 services match pattern
// -test: `test mode` create a test service
// -namespace: `namespace` specify namespace of command in idl
// -package: `package` specify package of idl
// -output: `output` generate idl from template
$ > spcli proto create seller-fulfillment -t


// output
[info] create seller-fulfillment.proto succeed   // font color: grey
[info] service information as follow:      // font color: green
[info] name:  seller-fulfillment           // font color: green
[info] api namespace:  seller.fulfillment  // font color: green

step1: 编辑 IDL,添加接口

下面是通过 step1 命令生成的 IDL 模板,进行二次编辑(包括改文件名为 hello.proto)。


syntax = "proto2";

package ;
import "spex/protobuf/service.proto";

// Code generated by spcli. DO NOT EDIT.
//
// namespace 
//
// commands {
//   .
// }
//

service  {

  option (service.service) = {
    servicename:
  };

  rpc  (Request) returns (Response) {
  }
}

message Request{
}
message Response{
}

step3: 创建服务

// 创建服务名为 `seller.fulfillment` 的 IDL
// spcli service apply -f <idl>
$ > spcli service apply -f <idl>

// output
[info] create seller-fulfillment succeed   // font color: grey
[info] service information as follow:      // font color: green
[info] name:  seller-fulfillment           // font color: green
[info] key:  f234w4k358d7x3                // font color: green
[info] api namespace:  seller.fulfillment  // font color: green


// 查询服务名为 `seller-fulfillment` 的服务
// spcli service list <pattern> [-d|detail]
// list at most 10 services match pattern
// -detail: `detail mode` will list keys of service: config key, service key
$ > spcli service list seller-fulfillment [-d|detail]

// output
[info] services find as follow...      // font color: grey

[info] seller-fulfillment:                 // font color: green
[info] key:  f234w4k358d7x3                // font color: grey
[info] api namespace:  seller.fulfillment  // font color: grey

[info] seller-fulfillment:                 // font color: green
[info] key:  f234w4k358d7x3                // font color: grey
[info] api namespace:  seller.fulfillment  // font color: grey

step4: 编译 IDL

// 编译 IDL
// spcli proto gen [-c|compatible]
// -c: generate http style client,only for `backend = http`
$ > spcli proto gen hello.proto

// output
[info] generate hello.proto succeed   // font color: grey

[info] files as follow:
[info] .
[info] ├── hello              // font color: green for changed file, gray for no change file
[info] │   ├── client.go [new]
[info] │   ├── server.go
[info] │   ├── hello_gw.pb.go
[info] │   └── hello.pb.go

step5: 编写并启动服务

// 编写服务
func main () {
   // Read service information from `environment` or `config file` as following variable
   var (
      name = ""
      key = ""
      namespace = ""
   )

   var hello.Hello hdlr

   // Create server
   server := hello.NewServer(hdlr, server.WithName(name), server.WithKey(key), server.WithNamespace(namespace))

   // Run server
   server.Run()
}


// bootstrap service
$ > go run main.go

IDL 说明

spex 基础 proto: spex/protobuf/http.proto

//  http.proto
syntax = "proto2";

package http;

message HttpRule {
  // Determines the URL pattern is matched by this rules. This pattern can be
  // used with any of the {get|put|post|delete|patch} methods.
  oneof pattern {
    // Maps to HTTP GET. Used for listing and getting information about resources.
    string get = 2;

    // Maps to HTTP PUT. Used for replacing a resource.
    string put = 3;

    // Maps to HTTP POST. Used for creating a resource or performing an action.
    string post = 4;

    // Maps to HTTP DELETE. Used for deleting a resource.
    string delete = 5;

    // Maps to HTTP PATCH. Used for updating a resource.
    string patch = 6;
  }
}

message Pair {
  optional string key = 1;
  repeated string value = 2;
}

message Request {
  optional uint32 version = 1;
  optional string uri = 2;
  optional string method = 3;
  repeated Pair headers = 4;
  optional bytes body = 5;
  // optional google.protobuf.Any meta = 6;
}

message Response {
  optional uint32 version = 1;
  optional uint32 status = 2;
  repeated Pair headers = 3;
  optional bytes body = 4;
  // optional google.protobuf.Any meta = 5;
}

spex 基础 proto: spex/protobuf/service.proto

// service.proto

syntax = "proto2";

package service;

import "google/protobuf/descriptor.proto";
import "spex/protobuf/http.proto";

message Service {
    enum Backend {
        SPEX = 0;
        HTTP = 1;
        GRPC = 2;
    }
    optional Backend backend = 1;
    optional string namespace = 2;
    optional string servicename = 3;
}

extend google.protobuf.ServiceOptions {
    optional Service service = 82295727;
}

extend google.protobuf.MethodOptions {
    optional http.HttpRule http = 72295728;
}

业务 proto: hello.proto

通过 spcli proto create 命令生成模板,再进行编辑

// hello.proto
syntax = "proto2";

package seller.fulfillmentorder;
import "spex/protobuf/service.proto";

// Code generated by spcli. DO NOT EDIT.
//
// namespace seller.fulfillmentorder
//
// commands {
//   seller.fulfillmentorder.echo
// }
//

service Hello {

  option (service.service) = {
    backend: "http" // 可以是上面 enum 中的 http、grpc、spex
    servicename:"seller-fulfillmentorder"
  };

  rpc Echo (http.Request) returns (http.Response) {
    option (service.http) = {
      get: "/echo/:message"
    };
  }
}