- Published on
- 约 3710 字
SkyWalking Node 响应后链路追踪分析
- Authors

- Name
- 小辉辉
1. 背景
在 Node.js 服务接入 SkyWalking 后,我们希望确认一个常见场景的链路表现:
- 请求进入 Node.js 路由。
- 路由内先执行 PostgreSQL 查询。
- 调用
res.json(responseBody)返回响应。 - 响应后继续执行数据库查询或调用下游服务 HTTP 接口。
最初的直觉是:只要代码仍然写在同一个 Express handler 中,后续数据库查询和 HTTP 调用就应该继续挂在同一个 Trace 下。但实际实验发现,是否能挂载到同一个 Trace,不只取决于代码位置,而取决于 SkyWalking Node agent 基于 AsyncLocalStorage 保存的 span 栈是否仍能被后续异步任务继承。
本文基于以下两部分内容分析:
- 实际实验现象。
<skywalking-nodejs源码目录>/src中 SkyWalking Node agent 源码。
2. 关键概念
2.1 EntrySpan
EntrySpan 表示当前服务被外部调用,例如:
外部系统 / 前端 -> Node 服务 /external-upload-v2
Node 服务侧的入口 span 类似:
EntrySpan: POST /external-upload-v2
2.2 ExitSpan
ExitSpan 表示当前服务调用外部系统,例如:
Node 服务 -> PostgreSQL
Node 服务 -> 下游服务 /api/mock-process
对应 SkyWalking 里可能看到:
POST /external-upload-v2
├─ PostgreSQL pg/query
└─ POST /api/mock-process
└─ 下游服务 /api/mock-process
2.3 LocalSpan
LocalSpan 表示当前服务内部的一段本地逻辑,例如参数解析、数据转换、规则计算等。它既不是外部调用当前服务,也不是当前服务调用外部系统。
示例:
POST /external-upload-v2
├─ LocalSpan: parseExternalPayload
├─ LocalSpan: normalizeBusinessData
├─ PostgreSQL pg/query
└─ POST /api/mock-process
3. 实验现象
3.1 简单 /trace-demo 路由
代码形态:
app.get('/trace-demo', async (_req, res) => {
res.json({ status: 'ok', timestamp: new Date().toISOString() })
fetchDownstream('/api/mock-process', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-mock-user': 'mock-user-001',
'x-mock-tenant': 'mock-tenant-001',
},
body: JSON.stringify({
resourceId: 'mock-resource-001',
resourceName: 'mock-document.pdf',
resourceUrl: 'https://example.com/files/mock-document.pdf',
}),
})
})
实际 SkyWalking 链路:
GET /trace-demo
└─ POST /api/mock-process
└─ 下游服务 /api/mock-process
说明:虽然 fetchDownstream 写在 res.json 后面,但它紧跟着同步发起,仍然能拿到当前请求的上下文。
3.2 res.json 前增加 await pool.query
代码形态:
app.get('/trace-demo', async (_req, res) => {
await pool.query('select 1')
res.json({ status: 'ok', timestamp: new Date().toISOString() })
fetchDownstream('/api/mock-process', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-mock-user': 'mock-user-001',
'x-mock-tenant': 'mock-tenant-001',
},
body: JSON.stringify({
resourceId: 'mock-resource-001',
resourceName: 'mock-document.pdf',
resourceUrl: 'https://example.com/files/mock-document.pdf',
}),
})
})
实际 SkyWalking 链路:
GET /trace-demo
└─ PostgreSQL pg/query
说明:只要 res.json 之前经历过 await pool.query,res.json 后的下游调用就不再挂到当前 Trace 下。
3.3 下游调用放到 res.json 前
代码形态:
await pool.query('select * from business_record WHERE business_id = $1 and record_file_id = \'1\'', [businessId])
await fetchDownstream('/api/mock-process', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-mock-user': 'mock-user-001',
'x-mock-tenant': 'mock-tenant-001',
},
body: JSON.stringify({
resourceId: 'mock-resource-002',
resourceName: 'mock-document.pdf',
resourceUrl: 'https://example.com/files/mock-document.pdf',
}),
})
res.json(responseBody)
await fetchDownstream('/api/mock-process', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-mock-user': 'mock-user-001',
'x-mock-tenant': 'mock-tenant-001',
},
body: JSON.stringify({
resourceId: 'mock-resource-001',
resourceName: 'mock-document.pdf',
resourceUrl: 'https://example.com/files/mock-document.pdf',
}),
})
实际 SkyWalking 链路:
GET /your-route
├─ PostgreSQL pg/query
└─ POST /api/mock-process resourceId = mock-resource-002
└─ 下游服务 /api/mock-process
res.json 后的 resourceId = mock-resource-001 调用没有进入当前 Trace。
3.4 给路由增加延迟 res.json 中间件
验证中间件:
function deferJsonResponseForTrace(_req: Request, res: Response, next: NextFunction): void {
const originalJson = res.json.bind(res)
let jsonCalled = false
res.json = ((body?: unknown) => {
if (jsonCalled) {
return res
}
jsonCalled = true
setImmediate(() => {
if (!res.headersSent) {
originalJson(body)
}
})
return res
}) as Response['json']
next()
}
只挂载到 handleExternalUploadV2 对应路由:
router.post(
'/external-upload-v2',
mockPermissionCheck,
deferJsonResponseForTrace,
mockController.handleExternalUploadV2
)
验证结果:
POST /external-upload-v2
├─ PostgreSQL pg/query
├─ PostgreSQL pg/query
└─ POST /api/mock-process
└─ 下游服务 /api/mock-process
也就是:延迟 res.json 后,res.json 后面的 PG 查询和下游调用都可以继续挂载到同一个 Trace。
4. 源码分析
4.1 SkyWalking Node 使用 AsyncLocalStorage 保存 span 栈
源码位置:
<skywalking-nodejs源码目录>/src/trace/context/ContextManager.ts
关键代码:
type AsyncState = { spans: Span[] }
let store: {
getStore(): AsyncState | undefined
enterWith(s: AsyncState): void
}
if (async_hooks.AsyncLocalStorage) {
store = new async_hooks.AsyncLocalStorage()
}
ContextManager.current 会从当前 async store 的 span 栈顶取 context:
get current(): Context {
const asyncState = this.asyncState
if (asyncState.spans.length) return asyncState.spans[asyncState.spans.length - 1].context
if (SpanContext.nActiveSegments < config.maxBufferSize) return new SpanContext()
return new DummyContext()
}
这意味着 PG、HTTP、Axios 等插件创建 span 时,依赖的不是“当前代码还在哪个函数里”,而是当前异步上下文里的 spans 栈。
4.2 Express/HTTP 入口 span 的创建和清理
源码位置:
<skywalking-nodejs源码目录>/src/plugins/HttpPlugin.ts
入口请求会进入:
wrapHttpResponse(span, req, res, handler)
其中:
span.start()
随后包装 req.emit 和 res.emit:
wrapEmit(span, req, true, isSub12 ? stopper('end') : NaN)
wrapEmit(span, res, true, isSub12 ? NaN : stopper('close'))
对于 Node 12+,res 的 close 事件会触发 stop 判断。
wrapEmit 源码位置:
<skywalking-nodejs源码目录>/src/core/SwPlugin.ts
关键代码:
export const wrapEmit = (span: Span, ee: any, doError: boolean = true, stop: any = NaN) => {
const stopIsFunc = typeof stop === 'function'
const _emit = ee.emit
Object.defineProperty(ee, 'emit', {
configurable: true,
writable: true,
value: function (this: any): any {
const event = arguments[0]
span.resync()
try {
if (doError && event === 'error') span.error(arguments[1])
return _emit.apply(this, arguments)
} catch (err) {
span.error(err)
throw err
} finally {
if (stopIsFunc ? stop(event) : event === stop) span.stop()
else span.async()
}
},
})
}
也就是说,SkyWalking 并没有直接 patch res.json。真正的清理/结束发生在被包装后的 res.emit(...) 中:
- 普通响应事件:
span.async() - 停止事件,例如 Node 12+ 的
res.close:span.stop()
4.3 span.async() 和 span.stop() 都会 clear 当前 span
源码位置:
<skywalking-nodejs源码目录>/src/trace/context/SpanContext.ts
async:
async(span: Span) {
ContextManager.clear(span)
}
stop:
stop(span: Span): boolean {
span.finish(this.segment)
ContextManager.clear(span)
if (--this.nSpans === 0) {
this.finished = true
emitter.emit('segment-finished', this.segment)
return true
}
return false
}
4.4 clear 不是全局删除,而是复制当前 span 栈后删除
源码位置:
<skywalking-nodejs源码目录>/src/trace/context/ContextManager.ts
关键代码:
spansDup(): Span[] {
let asyncState = store.getStore()
if (!asyncState) {
asyncState = { spans: [] }
} else {
asyncState = { spans: [...asyncState.spans] }
}
store.enterWith(asyncState)
return asyncState.spans
}
clear(span: Span): void {
const spans = this.spansDup() // this needed to make sure async tasks created before this call will still have this span at the top of their span list
const idx = spans.indexOf(span)
if (idx !== -1) spans.splice(idx, 1)
}
这段注释非常关键:
this needed to make sure async tasks created before this call will still have this span at the top of their span list
翻译成当前问题就是:
clear 之前已经创建的异步任务,会继续保留当时的 span 栈。
clear 只影响当前执行链后续的 span 栈副本。
这也是延迟 res.json 后,后续逻辑还能挂到同一个 Trace 的关键原因。
4.5 PG 插件如何创建 span
源码位置:
<skywalking-nodejs源码目录>/src/plugins/PgPlugin.ts
关键代码:
const span = ContextManager.current.newExitSpan('pg/query', Component.POSTGRESQL)
span.start()
query = _query.call(this, config, values, callback)
if (query) {
if (Cursor && query instanceof Cursor) wrapEmit(span, query, true, 'end')
else if (typeof query.then === 'function') query = wrapPromise(span, query)
}
span.async()
return query
重点:
- PG 查询调用发生时,从
ContextManager.current创建 exit span。 - 如果当前 async context 里有入口 span,PG span 就会成为它的子 span。
- PG Promise 创建后,当前 span 会
async(),但已经创建的 Promise 会继承创建时的 async context。
4.6 HTTP 插件如何创建下游调用 span
源码位置:
<skywalking-nodejs源码目录>/src/plugins/HttpPlugin.ts
关键代码:
const span = ContextManager.current.newExitSpan(operation, Component.HTTP)
span.start()
const req: ClientRequest = _request.apply(this, arguments)
span
.inject()
.items.filter((item) => expect != '100-continue')
.forEach((item) => req.setHeader(item.key, item.value))
wrapEmit(span, req, true, 'close')
span.async()
return req
重点:
- 下游 HTTP 调用创建时,也从
ContextManager.current创建 exit span。 - 如果当前 async context 仍然有入口 span,就可以挂到原 Trace 下。
- 如果当前 async context 已经没有入口 span,就无法挂到原请求 Trace 下。
5. 为什么不延迟 res.json 时统计不到
不延迟时,代码类似:
await pool.query('select 1')
res.json(responseBody)
await pool.query('select 2')
await fetchDownstream('/api/mock-process', ...)
实际链路只看到:
POST /your-route
└─ PostgreSQL select 1
原因不是 res.json 这行代码本身被 SkyWalking patch,而是:
- 原始
res.json会同步进入 Express/Node 响应发送逻辑。 - 响应发送过程中会触发
res.emit(...)。 - SkyWalking 已经包装了
res.emit。 res.emit被触发后会执行span.async()或span.stop()。- 这会通过
ContextManager.clear(span)清理当前执行链上的入口 span。 - 后续
pool.query或fetchDownstream创建 span 时,从ContextManager.current取不到原入口 span,于是无法挂到原 Trace 下。
所以不延迟时的关键时序是:
handler 恢复
-> 原始 res.json 同步执行
-> res.emit 被触发
-> SkyWalking clear 当前执行链 span
-> 后续 pool.query / fetchDownstream 创建时已经没有父 span
6. 为什么延迟 res.json 后可以统计到
延迟后,代码仍然是:
res.json(responseBody)
await pool.query('select 2')
await fetchDownstream('/api/mock-process', ...)
但 res.json 被替换为:
res.json = ((body?: unknown) => {
setImmediate(() => {
if (!res.headersSent) {
originalJson(body)
}
})
return res
}) as Response['json']
真实时序不是“所有后续异步任务都创建完再调用 originalJson”。实际调试观察到的时序更接近:
业务代码调用 res.json()
-> 中间件注册 setImmediate(originalJson)
继续执行到 await pool.query('select 2')
-> PG 查询异步资源创建
-> PG Promise 继承当前 async context
-> handler 在 await 处让出
setImmediate 执行 originalJson()
-> res.emit 被触发
-> SkyWalking 在 setImmediate 这条执行链上 async/stop/clear
PG 查询完成
-> handler 从 PG Promise 的 async context 恢复
-> 这条 async context 是 originalJson 前创建的,仍保留入口 span
继续执行 fetchDownstream()
-> HTTP exit span 从当前 context 创建
-> 成功挂到原 Trace
换句话说:
延迟 res.json 的关键作用,不是等待所有后续任务创建完,
而是让第一个 await 对应的异步资源在原始响应逻辑执行前先创建并继承原请求上下文。
第一个 await pool.query 像一根“上下文接力棒”。它在 originalJson 之前创建,继承到了带入口 span 的 async store。后续 handler 从这个 Promise 恢复时,继续沿着这份 async store 执行,所以后面的 PG 查询和下游 HTTP 调用都能从 ContextManager.current 找到原 Trace。
7. 延迟后清理逻辑是否失效
不能简单说 SkyWalking 清理逻辑失效。更准确的说法是:
清理仍然执行了,但清理的是 originalJson 所在 setImmediate 执行链的上下文副本;
业务 handler 后续恢复使用的是第一个 await 创建时继承的另一份上下文副本。
延迟后会形成两条执行链:
业务链路:
handler -> await pool.query 创建异步资源 -> query resolve -> handler 继续 -> fetchDownstream
响应发送链路:
setImmediate -> originalJson -> res.emit -> SkyWalking span.async/span.stop
originalJson 触发的 span.async() / span.stop() 会调用 ContextManager.clear(span),但它清理的是 setImmediate 当前执行链上的 span 栈副本。
而业务 handler 后续从 PG Promise 恢复,恢复的是 PG Promise 创建时继承的 async store。这个 store 是在 originalJson 清理之前创建的,所以仍然保留入口 span。
因此从结果上看:
对响应发送链路:清理正常执行。
对业务后续链路:这次清理没有清到它使用的上下文副本。
8. 是否会导致 span 栈越来越大
正常短生命周期场景下,一般不会无限增长。
原因是 AsyncLocalStorage 的 store 跟异步资源生命周期相关。后续 PG 查询、HTTP 调用、Promise 链都结束后,相关 async resource 可以被释放,对应 store 也会随之释放。
但这个方案确实有风险边界。
8.1 风险一:长生命周期异步资源
如果在保留入口 span 的上下文里创建了长生命周期资源,例如:
setInterval(...)
长期 socket
常驻 EventEmitter listener
不结束的 Promise
后台 worker
这些资源可能长期持有当时的 async store,间接持有入口 span 和 segment 对象。
8.2 风险二:Trace 语义不准确
客户端可能已经收到响应,但 SkyWalking 仍显示响应后的 PG 查询和下游调用属于同一个请求 Trace。
这对“排查请求触发了哪些后续动作”有帮助,但对“客户端实际等待耗时”不准确。
例如:
客户端实际 200ms 收到响应
下游服务耗时 10s
SkyWalking Trace 里仍可能展示完整后置调用
8.3 风险三:已结束 segment 后继续创建 span
SpanContext.newSpan 里有处理 this.finished 的逻辑。如果 context 已经 finished 但仍有 parent,会创建新 segment 并通过 reference 关联旧 segment。这说明 agent 支持“已结束 segment 后继续创建子链路”的情况,但链路形态可能会变复杂。
9. 当前验证方案的定位
当前中间件适合用于验证和小范围临时观察:
function deferJsonResponseForTrace(_req: Request, res: Response, next: NextFunction): void {
const originalJson = res.json.bind(res)
let jsonCalled = false
res.json = ((body?: unknown) => {
if (jsonCalled) {
return res
}
jsonCalled = true
setImmediate(() => {
if (!res.headersSent) {
originalJson(body)
}
})
return res
}) as Response['json']
next()
}
建议只挂到指定路由:
router.post(
'/external-upload-v2',
mockPermissionCheck,
deferJsonResponseForTrace,
mockController.handleExternalUploadV2
)
不建议全局挂载。
10. 长期方案建议
10.1 如果要求严格同一 Trace
最稳妥的写法是把需要追踪的外部调用放在 res.json 前:
await pool.query(...)
await fetchDownstream(...)
res.json(responseBody)
优点:
- Trace 语义最准确。
- 不依赖 async context 副作用。
- SkyWalking 展示与请求真实处理流程一致。
缺点:
- 客户端响应会等待下游调用完成。
10.2 如果要求先响应再后台处理
更推荐将响应后的动作视为后台任务,不强行挂在原请求 Trace 下。
做法:
- 使用
businessId、taskId、resourceId、requestId等业务字段关联日志和 Trace。 - 后台任务独立成新的 Trace。
- 在日志中记录上下游业务 ID。
优点:
- 语义清晰。
- 响应耗时和后台耗时不会混在一起。
- 不依赖 SkyWalking agent 内部实现细节。
缺点:
- SkyWalking UI 上不是一条父子 Trace,需要通过业务 ID 关联。
10.3 如果既要先响应又要保留关联关系
可以考虑显式传播 trace context,而不是依赖延迟 res.json。
思路:
- 响应前保存当前 trace context。
- 后台任务中手动创建新 span 或新 segment。
- 使用 reference 关联原请求。
这种方案更接近正确的异步任务追踪模型,但需要确认当前 SkyWalking Node agent 是否暴露稳定的手动 API。
11. 最终结论
本次实验和源码分析得到的最终结论:
- SkyWalking Node agent 使用
AsyncLocalStorage保存当前 span 栈。 - PG、HTTP、Axios 插件创建 exit span 时,都会从
ContextManager.current获取当前 context。 - SkyWalking 没有直接 patch
res.json,而是包装了req.emit和res.emit。 - 原始
res.json会同步触发 Express/Node 响应逻辑,进而触发被包装的res.emit,导致当前执行链上的入口 span 被async()或stop()清理。 - 不延迟
res.json时,res.json后的 PG/HTTP 调用创建 span 时可能已经拿不到原请求上下文。 - 延迟
res.json后,第一个await对应的异步资源可以在原始响应逻辑执行前创建,并继承原请求 span 栈。 - 后续 handler 从这个异步资源恢复后,仍然能使用原请求上下文,所以后面的 PG 查询和下游 HTTP 调用都能挂到同一个 Trace。
- 这不是让 SkyWalking 清理逻辑失效,而是清理发生在响应发送链路的上下文副本上,没有清到业务后续恢复链路使用的上下文副本。
- 该方案适合验证和小范围临时使用,不建议作为全局长期方案。
- 长期设计上,应优先选择“响应前完成需要追踪的调用”,或将响应后的处理建模为独立后台任务并用业务 ID 关联。
