Published on
3710

SkyWalking Node 响应后链路追踪分析

Authors
  • avatar
    Name
    小辉辉
    Twitter

1. 背景

在 Node.js 服务接入 SkyWalking 后,我们希望确认一个常见场景的链路表现:

  1. 请求进入 Node.js 路由。
  2. 路由内先执行 PostgreSQL 查询。
  3. 调用 res.json(responseBody) 返回响应。
  4. 响应后继续执行数据库查询或调用下游服务 HTTP 接口。

最初的直觉是:只要代码仍然写在同一个 Express handler 中,后续数据库查询和 HTTP 调用就应该继续挂在同一个 Trace 下。但实际实验发现,是否能挂载到同一个 Trace,不只取决于代码位置,而取决于 SkyWalking Node agent 基于 AsyncLocalStorage 保存的 span 栈是否仍能被后续异步任务继承。

本文基于以下两部分内容分析:

  • 实际实验现象。
  • <skywalking-nodejs源码目录>/src 中 SkyWalking Node agent 源码。

2. 关键概念

2.1 EntrySpan

EntrySpan 表示当前服务被外部调用,例如:

外部系统 / 前端 -> Node 服务 /external-upload-v2

Node 服务侧的入口 span 类似:

EntrySpan: POST /external-upload-v2

2.2 ExitSpan

ExitSpan 表示当前服务调用外部系统,例如:

Node 服务 -> PostgreSQL
Node 服务 -> 下游服务 /api/mock-process

对应 SkyWalking 里可能看到:

POST /external-upload-v2
├─ PostgreSQL pg/query
└─ POST /api/mock-process
   └─ 下游服务 /api/mock-process

2.3 LocalSpan

LocalSpan 表示当前服务内部的一段本地逻辑,例如参数解析、数据转换、规则计算等。它既不是外部调用当前服务,也不是当前服务调用外部系统。

示例:

POST /external-upload-v2
├─ LocalSpan: parseExternalPayload
├─ LocalSpan: normalizeBusinessData
├─ PostgreSQL pg/query
└─ POST /api/mock-process

3. 实验现象

3.1 简单 /trace-demo 路由

代码形态:

app.get('/trace-demo', async (_req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() })

  fetchDownstream('/api/mock-process', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-mock-user': 'mock-user-001',
      'x-mock-tenant': 'mock-tenant-001',
    },
    body: JSON.stringify({
      resourceId: 'mock-resource-001',
      resourceName: 'mock-document.pdf',
      resourceUrl: 'https://example.com/files/mock-document.pdf',
    }),
  })
})

实际 SkyWalking 链路:

GET /trace-demo
└─ POST /api/mock-process
   └─ 下游服务 /api/mock-process

说明:虽然 fetchDownstream 写在 res.json 后面,但它紧跟着同步发起,仍然能拿到当前请求的上下文。

3.2 res.json 前增加 await pool.query

代码形态:

app.get('/trace-demo', async (_req, res) => {
  await pool.query('select 1')

  res.json({ status: 'ok', timestamp: new Date().toISOString() })

  fetchDownstream('/api/mock-process', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-mock-user': 'mock-user-001',
      'x-mock-tenant': 'mock-tenant-001',
    },
    body: JSON.stringify({
      resourceId: 'mock-resource-001',
      resourceName: 'mock-document.pdf',
      resourceUrl: 'https://example.com/files/mock-document.pdf',
    }),
  })
})

实际 SkyWalking 链路:

GET /trace-demo
└─ PostgreSQL pg/query

说明:只要 res.json 之前经历过 await pool.queryres.json 后的下游调用就不再挂到当前 Trace 下。

3.3 下游调用放到 res.json

代码形态:

await pool.query('select * from business_record WHERE business_id = $1 and record_file_id = \'1\'', [businessId])

await fetchDownstream('/api/mock-process', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'x-mock-user': 'mock-user-001',
    'x-mock-tenant': 'mock-tenant-001',
  },
  body: JSON.stringify({
    resourceId: 'mock-resource-002',
    resourceName: 'mock-document.pdf',
    resourceUrl: 'https://example.com/files/mock-document.pdf',
  }),
})

res.json(responseBody)

await fetchDownstream('/api/mock-process', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'x-mock-user': 'mock-user-001',
    'x-mock-tenant': 'mock-tenant-001',
  },
  body: JSON.stringify({
    resourceId: 'mock-resource-001',
    resourceName: 'mock-document.pdf',
    resourceUrl: 'https://example.com/files/mock-document.pdf',
  }),
})

实际 SkyWalking 链路:

GET /your-route
├─ PostgreSQL pg/query
└─ POST /api/mock-process    resourceId = mock-resource-002
   └─ 下游服务 /api/mock-process

res.json 后的 resourceId = mock-resource-001 调用没有进入当前 Trace。

3.4 给路由增加延迟 res.json 中间件

验证中间件:

function deferJsonResponseForTrace(_req: Request, res: Response, next: NextFunction): void {
  const originalJson = res.json.bind(res)
  let jsonCalled = false

  res.json = ((body?: unknown) => {
    if (jsonCalled) {
      return res
    }

    jsonCalled = true
    setImmediate(() => {
      if (!res.headersSent) {
        originalJson(body)
      }
    })

    return res
  }) as Response['json']

  next()
}

只挂载到 handleExternalUploadV2 对应路由:

router.post(
  '/external-upload-v2',
  mockPermissionCheck,
  deferJsonResponseForTrace,
  mockController.handleExternalUploadV2
)

验证结果:

POST /external-upload-v2
├─ PostgreSQL pg/query
├─ PostgreSQL pg/query
└─ POST /api/mock-process
   └─ 下游服务 /api/mock-process

也就是:延迟 res.json 后,res.json 后面的 PG 查询和下游调用都可以继续挂载到同一个 Trace。

4. 源码分析

4.1 SkyWalking Node 使用 AsyncLocalStorage 保存 span 栈

源码位置:

<skywalking-nodejs源码目录>/src/trace/context/ContextManager.ts

关键代码:

type AsyncState = { spans: Span[] }

let store: {
  getStore(): AsyncState | undefined
  enterWith(s: AsyncState): void
}

if (async_hooks.AsyncLocalStorage) {
  store = new async_hooks.AsyncLocalStorage()
}

ContextManager.current 会从当前 async store 的 span 栈顶取 context:

get current(): Context {
  const asyncState = this.asyncState

  if (asyncState.spans.length) return asyncState.spans[asyncState.spans.length - 1].context

  if (SpanContext.nActiveSegments < config.maxBufferSize) return new SpanContext()

  return new DummyContext()
}

这意味着 PG、HTTP、Axios 等插件创建 span 时,依赖的不是“当前代码还在哪个函数里”,而是当前异步上下文里的 spans 栈。

4.2 Express/HTTP 入口 span 的创建和清理

源码位置:

<skywalking-nodejs源码目录>/src/plugins/HttpPlugin.ts

入口请求会进入:

wrapHttpResponse(span, req, res, handler)

其中:

span.start()

随后包装 req.emitres.emit

wrapEmit(span, req, true, isSub12 ? stopper('end') : NaN)
wrapEmit(span, res, true, isSub12 ? NaN : stopper('close'))

对于 Node 12+,resclose 事件会触发 stop 判断。

wrapEmit 源码位置:

<skywalking-nodejs源码目录>/src/core/SwPlugin.ts

关键代码:

export const wrapEmit = (span: Span, ee: any, doError: boolean = true, stop: any = NaN) => {
  const stopIsFunc = typeof stop === 'function'
  const _emit = ee.emit

  Object.defineProperty(ee, 'emit', {
    configurable: true,
    writable: true,
    value: function (this: any): any {
      const event = arguments[0]

      span.resync()

      try {
        if (doError && event === 'error') span.error(arguments[1])

        return _emit.apply(this, arguments)
      } catch (err) {
        span.error(err)

        throw err
      } finally {
        if (stopIsFunc ? stop(event) : event === stop) span.stop()
        else span.async()
      }
    },
  })
}

也就是说,SkyWalking 并没有直接 patch res.json。真正的清理/结束发生在被包装后的 res.emit(...) 中:

  • 普通响应事件:span.async()
  • 停止事件,例如 Node 12+ 的 res.closespan.stop()

4.3 span.async()span.stop() 都会 clear 当前 span

源码位置:

<skywalking-nodejs源码目录>/src/trace/context/SpanContext.ts

async

async(span: Span) {
  ContextManager.clear(span)
}

stop

stop(span: Span): boolean {
  span.finish(this.segment)
  ContextManager.clear(span)

  if (--this.nSpans === 0) {
    this.finished = true
    emitter.emit('segment-finished', this.segment)
    return true
  }

  return false
}

4.4 clear 不是全局删除,而是复制当前 span 栈后删除

源码位置:

<skywalking-nodejs源码目录>/src/trace/context/ContextManager.ts

关键代码:

spansDup(): Span[] {
  let asyncState = store.getStore()

  if (!asyncState) {
    asyncState = { spans: [] }
  } else {
    asyncState = { spans: [...asyncState.spans] }
  }

  store.enterWith(asyncState)

  return asyncState.spans
}

clear(span: Span): void {
  const spans = this.spansDup() // this needed to make sure async tasks created before this call will still have this span at the top of their span list
  const idx = spans.indexOf(span)

  if (idx !== -1) spans.splice(idx, 1)
}

这段注释非常关键:

this needed to make sure async tasks created before this call will still have this span at the top of their span list

翻译成当前问题就是:

clear 之前已经创建的异步任务,会继续保留当时的 span 栈。
clear 只影响当前执行链后续的 span 栈副本。

这也是延迟 res.json 后,后续逻辑还能挂到同一个 Trace 的关键原因。

4.5 PG 插件如何创建 span

源码位置:

<skywalking-nodejs源码目录>/src/plugins/PgPlugin.ts

关键代码:

const span = ContextManager.current.newExitSpan('pg/query', Component.POSTGRESQL)

span.start()

query = _query.call(this, config, values, callback)

if (query) {
  if (Cursor && query instanceof Cursor) wrapEmit(span, query, true, 'end')
  else if (typeof query.then === 'function') query = wrapPromise(span, query)
}

span.async()

return query

重点:

  • PG 查询调用发生时,从 ContextManager.current 创建 exit span。
  • 如果当前 async context 里有入口 span,PG span 就会成为它的子 span。
  • PG Promise 创建后,当前 span 会 async(),但已经创建的 Promise 会继承创建时的 async context。

4.6 HTTP 插件如何创建下游调用 span

源码位置:

<skywalking-nodejs源码目录>/src/plugins/HttpPlugin.ts

关键代码:

const span = ContextManager.current.newExitSpan(operation, Component.HTTP)

span.start()

const req: ClientRequest = _request.apply(this, arguments)

span
  .inject()
  .items.filter((item) => expect != '100-continue')
  .forEach((item) => req.setHeader(item.key, item.value))

wrapEmit(span, req, true, 'close')

span.async()

return req

重点:

  • 下游 HTTP 调用创建时,也从 ContextManager.current 创建 exit span。
  • 如果当前 async context 仍然有入口 span,就可以挂到原 Trace 下。
  • 如果当前 async context 已经没有入口 span,就无法挂到原请求 Trace 下。

5. 为什么不延迟 res.json 时统计不到

不延迟时,代码类似:

await pool.query('select 1')

res.json(responseBody)

await pool.query('select 2')
await fetchDownstream('/api/mock-process', ...)

实际链路只看到:

POST /your-route
└─ PostgreSQL select 1

原因不是 res.json 这行代码本身被 SkyWalking patch,而是:

  1. 原始 res.json 会同步进入 Express/Node 响应发送逻辑。
  2. 响应发送过程中会触发 res.emit(...)
  3. SkyWalking 已经包装了 res.emit
  4. res.emit 被触发后会执行 span.async()span.stop()
  5. 这会通过 ContextManager.clear(span) 清理当前执行链上的入口 span。
  6. 后续 pool.queryfetchDownstream 创建 span 时,从 ContextManager.current 取不到原入口 span,于是无法挂到原 Trace 下。

所以不延迟时的关键时序是:

handler 恢复
  -> 原始 res.json 同步执行
  -> res.emit 被触发
  -> SkyWalking clear 当前执行链 span
  -> 后续 pool.query / fetchDownstream 创建时已经没有父 span

6. 为什么延迟 res.json 后可以统计到

延迟后,代码仍然是:

res.json(responseBody)

await pool.query('select 2')
await fetchDownstream('/api/mock-process', ...)

res.json 被替换为:

res.json = ((body?: unknown) => {
  setImmediate(() => {
    if (!res.headersSent) {
      originalJson(body)
    }
  })

  return res
}) as Response['json']

真实时序不是“所有后续异步任务都创建完再调用 originalJson”。实际调试观察到的时序更接近:

业务代码调用 res.json()
  -> 中间件注册 setImmediate(originalJson)

继续执行到 await pool.query('select 2')
  -> PG 查询异步资源创建
  -> PG Promise 继承当前 async context
  -> handler 在 await 处让出

setImmediate 执行 originalJson()
  -> res.emit 被触发
  -> SkyWalking 在 setImmediate 这条执行链上 async/stop/clear

PG 查询完成
  -> handler 从 PG Promise 的 async context 恢复
  -> 这条 async context 是 originalJson 前创建的,仍保留入口 span

继续执行 fetchDownstream()
  -> HTTP exit span 从当前 context 创建
  -> 成功挂到原 Trace

换句话说:

延迟 res.json 的关键作用,不是等待所有后续任务创建完,
而是让第一个 await 对应的异步资源在原始响应逻辑执行前先创建并继承原请求上下文。

第一个 await pool.query 像一根“上下文接力棒”。它在 originalJson 之前创建,继承到了带入口 span 的 async store。后续 handler 从这个 Promise 恢复时,继续沿着这份 async store 执行,所以后面的 PG 查询和下游 HTTP 调用都能从 ContextManager.current 找到原 Trace。

7. 延迟后清理逻辑是否失效

不能简单说 SkyWalking 清理逻辑失效。更准确的说法是:

清理仍然执行了,但清理的是 originalJson 所在 setImmediate 执行链的上下文副本;
业务 handler 后续恢复使用的是第一个 await 创建时继承的另一份上下文副本。

延迟后会形成两条执行链:

业务链路:
handler -> await pool.query 创建异步资源 -> query resolve -> handler 继续 -> fetchDownstream

响应发送链路:
setImmediate -> originalJson -> res.emit -> SkyWalking span.async/span.stop

originalJson 触发的 span.async() / span.stop() 会调用 ContextManager.clear(span),但它清理的是 setImmediate 当前执行链上的 span 栈副本。

而业务 handler 后续从 PG Promise 恢复,恢复的是 PG Promise 创建时继承的 async store。这个 store 是在 originalJson 清理之前创建的,所以仍然保留入口 span。

因此从结果上看:

对响应发送链路:清理正常执行。
对业务后续链路:这次清理没有清到它使用的上下文副本。

8. 是否会导致 span 栈越来越大

正常短生命周期场景下,一般不会无限增长。

原因是 AsyncLocalStorage 的 store 跟异步资源生命周期相关。后续 PG 查询、HTTP 调用、Promise 链都结束后,相关 async resource 可以被释放,对应 store 也会随之释放。

但这个方案确实有风险边界。

8.1 风险一:长生命周期异步资源

如果在保留入口 span 的上下文里创建了长生命周期资源,例如:

setInterval(...)
长期 socket
常驻 EventEmitter listener
不结束的 Promise
后台 worker

这些资源可能长期持有当时的 async store,间接持有入口 span 和 segment 对象。

8.2 风险二:Trace 语义不准确

客户端可能已经收到响应,但 SkyWalking 仍显示响应后的 PG 查询和下游调用属于同一个请求 Trace。

这对“排查请求触发了哪些后续动作”有帮助,但对“客户端实际等待耗时”不准确。

例如:

客户端实际 200ms 收到响应
下游服务耗时 10s
SkyWalking Trace 里仍可能展示完整后置调用

8.3 风险三:已结束 segment 后继续创建 span

SpanContext.newSpan 里有处理 this.finished 的逻辑。如果 context 已经 finished 但仍有 parent,会创建新 segment 并通过 reference 关联旧 segment。这说明 agent 支持“已结束 segment 后继续创建子链路”的情况,但链路形态可能会变复杂。

9. 当前验证方案的定位

当前中间件适合用于验证和小范围临时观察:

function deferJsonResponseForTrace(_req: Request, res: Response, next: NextFunction): void {
  const originalJson = res.json.bind(res)
  let jsonCalled = false

  res.json = ((body?: unknown) => {
    if (jsonCalled) {
      return res
    }

    jsonCalled = true
    setImmediate(() => {
      if (!res.headersSent) {
        originalJson(body)
      }
    })

    return res
  }) as Response['json']

  next()
}

建议只挂到指定路由:

router.post(
  '/external-upload-v2',
  mockPermissionCheck,
  deferJsonResponseForTrace,
  mockController.handleExternalUploadV2
)

不建议全局挂载。

10. 长期方案建议

10.1 如果要求严格同一 Trace

最稳妥的写法是把需要追踪的外部调用放在 res.json 前:

await pool.query(...)
await fetchDownstream(...)
res.json(responseBody)

优点:

  • Trace 语义最准确。
  • 不依赖 async context 副作用。
  • SkyWalking 展示与请求真实处理流程一致。

缺点:

  • 客户端响应会等待下游调用完成。

10.2 如果要求先响应再后台处理

更推荐将响应后的动作视为后台任务,不强行挂在原请求 Trace 下。

做法:

  • 使用 businessIdtaskIdresourceIdrequestId 等业务字段关联日志和 Trace。
  • 后台任务独立成新的 Trace。
  • 在日志中记录上下游业务 ID。

优点:

  • 语义清晰。
  • 响应耗时和后台耗时不会混在一起。
  • 不依赖 SkyWalking agent 内部实现细节。

缺点:

  • SkyWalking UI 上不是一条父子 Trace,需要通过业务 ID 关联。

10.3 如果既要先响应又要保留关联关系

可以考虑显式传播 trace context,而不是依赖延迟 res.json

思路:

  1. 响应前保存当前 trace context。
  2. 后台任务中手动创建新 span 或新 segment。
  3. 使用 reference 关联原请求。

这种方案更接近正确的异步任务追踪模型,但需要确认当前 SkyWalking Node agent 是否暴露稳定的手动 API。

11. 最终结论

本次实验和源码分析得到的最终结论:

  1. SkyWalking Node agent 使用 AsyncLocalStorage 保存当前 span 栈。
  2. PG、HTTP、Axios 插件创建 exit span 时,都会从 ContextManager.current 获取当前 context。
  3. SkyWalking 没有直接 patch res.json,而是包装了 req.emitres.emit
  4. 原始 res.json 会同步触发 Express/Node 响应逻辑,进而触发被包装的 res.emit,导致当前执行链上的入口 span 被 async()stop() 清理。
  5. 不延迟 res.json 时,res.json 后的 PG/HTTP 调用创建 span 时可能已经拿不到原请求上下文。
  6. 延迟 res.json 后,第一个 await 对应的异步资源可以在原始响应逻辑执行前创建,并继承原请求 span 栈。
  7. 后续 handler 从这个异步资源恢复后,仍然能使用原请求上下文,所以后面的 PG 查询和下游 HTTP 调用都能挂到同一个 Trace。
  8. 这不是让 SkyWalking 清理逻辑失效,而是清理发生在响应发送链路的上下文副本上,没有清到业务后续恢复链路使用的上下文副本。
  9. 该方案适合验证和小范围临时使用,不建议作为全局长期方案。
  10. 长期设计上,应优先选择“响应前完成需要追踪的调用”,或将响应后的处理建模为独立后台任务并用业务 ID 关联。