背景

在 Ads-Admin-V2.0 需求中,有一个功能点是将报表数据导出为 csv 。其中,要求在导出的 csv 头部添加一些导出信息的元信息,如 Time Range 和 Download Time 等,即上图中灰色文字部分。
导出流程梳理

- node 收到导出请求,查询 clickhouse,得到可读流 chReadableStream;
- node 将可读流数据转换 csv 格式(利用 csv 库提供的 stringfy 方法,本质是一个转换流),得到 csvReadableStream;
- node 将 csv 可读流上传到 S3;
在实现上,整个导出过程是异步的,前端在发出导出请求后,会很快收到一个 taskId, 后续需通过 taskId 轮询导出结果。
问题描述
实现上面三个主流程的功能,是比较顺利的,很快调通并实现了 csv 的生成与上传。然而,在我们想要为 csv 加头部 meta 信息时,就遇到了麻烦。
首先, node-csv 这个包没有提供直接的 api 接口,当时提了相关的 issue 给维护者,并得到了以下回复。

大概意思就是,我们没有提供,需要你自己在 csv 流关闭前,把头部 meta 数据写进流。
按照这个思路,实现了一个双工流
import { Duplex, DuplexOptions } from 'stream';
/**
* 支持初始化数据的双工流
*/
export class InitDataDuplex extends Duplex {
constructor(initData: Buffer | string, options: DuplexOptions) {
super(options);
this.push(initData.toString());
}
_read(size: number): void {}
_write(
chunk: any,
encoding: BufferEncoding,
callback: (error?: Error | null) => void
): void {
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
this.push(chunk);
callback();
}
}
使用上:
import { stringify } from 'csv';
const addCsvMeta = new InitDataDuplex(csvMetaString);
const chReadStream = this.ckService.query(sql);
const csvStream = chReadStream
.pipe(
stringify({
header: true,
columns: columnName,
})
)
.pipe(addCsvMeta);
this.ossService.upload({
bucket: S3_BUCKET,
name: csvName,
body: csvStream,
});
当加上 addCsvMeta 的逻辑后,s3 上就不见被导出的文件了;去掉则正常。
排查之路
分析日志
当有不正常的期望发生时,做服务的第一反应应该是去看日志。在可能出错的地方都加了日志,但没有发现什么错误。而且发现,上传 s3 的成功和失败的回调,都没有被执行,这个异步 promise 似乎没有完成。
分析代码
没有日志,只能进一步分析代码。将流的最后一步, pipe 到 process.stdout , 可以正常输出,为了更直观,将流直接 pipe 到一个 creaeWriteStream(写成本地 csv),可以正常生成带头部 meta 信息的本地文件。这时,就可以差不多断定是 s3 上传这一步出了问题。
深度断点
那到底是出了什么问题?为什么加上 addCsvMeta 就有问题了?为了验证经过 addCsvMeta 后,数据流发生了哪些变异,我打印并对比了正常与非正常情况的流。

发现确实有了些变化。
为了验证,我们进一步打断点处理。
在上层业务代码里看不出什么,只能在 aws 提供的 s3 上传里断。

发现上传失败的情况下,没有触发 end 事件, 正常上传的时候,都会触发 end 事件。
查阅资料,在自定义的双工流里加入了这个 _final 方法。
_final () { this.push(null);}
终于可以了!

总结
分析思路很重要。