[/metrics] — 数据写入代码整体逻辑
# 一、入口与分流机制 Controller
结论先行
/metrics 写入有两条路径:
首条 metric 的
appId
为 amssmoketestfake →putMetricsSkipCache
(跳过缓存 ,直接落库)。直写旁路其它
appId
→putMetrics
(进入 insertCache 队列,批量落库)。常规写入
@Path("/metrics")
@POST
@Consumes({MediaType.APPLICATION_JSON})
public TimelinePutResponse postMetrics(@Context HttpServletRequest req,
@Context HttpServletResponse res,
TimelineMetrics metrics) {
init(res);
if (metrics == null) return new TimelinePutResponse();
try {
if (CollectionUtils.isNotEmpty(metrics.getMetrics())
&& metrics.getMetrics().get(0).getAppId().equals(SMOKETEST_METRIC_APP_ID)) {
return timelineMetricStore.putMetricsSkipCache(metrics);
} else {
return timelineMetricStore.putMetrics(metrics);
}
} catch (Exception e) {
LOG.error("Error saving metrics.", e);
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
为什么需要“直写旁路”?
压测/自检/链路排障时,需要去除缓存影响来观察端到端时延与真实写入能力;因此为特定 appId 预留不入队、直接落库的快速路径。
# 二、两条路径本质差异与适用场景 对比
维度 | putMetricsSkipCache(直写) | putMetrics(缓存写) |
---|---|---|
路径 | 绕过 insertCache ,直接 commitMetrics | 入队 insertCache ,消费线程批量 commitMetricsFromCache |
写入延迟 | 最低(近实时) | 取决于队列深度/批量阈值/提交周期 |
抗抖动 | 较弱,易出现写放大 | 削峰填谷、背压可控,稳定性更强 |
典型用途 | 烟囱测试、链路连通、关键指标时延验证 | 生产常态、吞吐优先、成本可控 |
# 三、Service 主流程(含过滤与元数据) Service
# 1、核心方法骨架
public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
TimelineMetrics metrics, boolean skipCache)
throws SQLException, IOException {
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
if (timelineMetrics == null || timelineMetrics.isEmpty()) {
LOG.debug("Empty metrics insert request.");
return;
}
for (Iterator<TimelineMetric> it = timelineMetrics.iterator(); it.hasNext();) {
TimelineMetric tm = it.next();
boolean acceptMetric = TimelineMetricsFilter.acceptMetric(tm);
if (supportMultipleClusterMetrics && StringUtils.isEmpty(tm.getInstanceId())) {
tm.setInstanceId(defaultInstanceId);
}
if (metadataManager != null) {
metadataManager.putIfModifiedTimelineMetricMetadata(
metadataManager.createTimelineMetricMetadata(tm, acceptMetric));
metadataManager.putIfModifiedHostedAppsMetadata(tm.getHostName(), tm.getAppId());
if (!tm.getAppId().equals("FLUME_HANDLER")) {
metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
}
}
if (!acceptMetric) it.remove();
}
if (!skipCache && cacheEnabled) {
if (insertCache.size() >= cacheSize) {
commitMetricsFromCache();
}
insertCache.put(metrics); // 可能阻塞(背压点)
} else {
commitMetrics(metrics); // 直写
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 2、请求体与方法内取值
{
"metrics": [
{
"instanceid": "",
"hostname": "dev1",
"metrics": {
"1758095682384": 7,
"1758095682484": 8,
"1758095682584": 9,
"1758095682684": 99,
"1758095682784": 10
},
"starttime": 1758095682384,
"appid": "amssmoketestfake",
"metricname": "ttbigdata.test"
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
1
// Make sure to add code blocks to your code group
# 3、过滤与元数据回填机制
黑/白名单过滤:
TimelineMetricsFilter.acceptMetric(tm)
决定保留与否;不通过则剔除。过滤原理可以参考
元数据回填:
putIfModifiedTimelineMetricMetadata(...)
:指标维度元数据putIfModifiedHostedAppsMetadata(host, appId)
:宿主-应用映射putIfModifiedHostedInstanceMetadata(instanceId, host)
:宿主-实例映射(非 FLUME_HANDLER 才写) 策略为“若有变更再写”:先写元数据缓存,必要时再落库,保证初启/瞬时流量也能尽快补齐元信息。
# 四、multi-cluster 兜底与配置读取 配置
# 1、multi-cluster 兜底逻辑
当启用 timeline.metrics.support.multiple.clusters
且 instanceId
为空时,会补齐 defaultInstanceId
;常规单集群默认
不开启,因此不会触发补偿。
# 2、配置读取与常见键位
- 多集群:
timeline.metrics.support.multiple.clusters
- 缓存开关:常见为
timeline.metrics.cache.enabled
(命名以环境为准) - 队列容量/批量/提交周期:与
insertCache
、消费者线程相关(命名依实现而定)
说明
若搜索不到对应 key,则按默认值处理(如 multi-cluster 默认 false),具体以源码/默认配置为准。
# 五、缓存与直写的触发条件(含背压点位) Cache
if (!skipCache && cacheEnabled) {
if (insertCache.size() >= cacheSize) {
commitMetricsFromCache(); // 触发一次批量提交(可视为“快照式”减压)
}
insertCache.put(metrics); // 背压点:队列满时阻塞,保护下游
} else {
commitMetrics(metrics); // 直写:无等待,竞争落库资源
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
关键点
- amssmoketestfake →
skipCache=true
→ 必走直写。 - 常规 appId → 由
cacheEnabled
决定:true 入队,false 直写。
# 1、常见问题定位(速查)
症状 | 可能原因 | 快速验证 | 处置建议 |
---|---|---|---|
写入端延迟高 | 队列背压/批量过大 | 观察 insertCache 长度与消费者速率 | 降低批量、增线程、提队列 |
丢点 | 命中过滤/上游丢失 | 查 acceptMetric 与请求体完整性 | 放宽过滤、补齐字段 |
热点 Region | 单 key 过热/批量合并不均 | HBase RegionServer 指标 | 调整 rowkey 设计或批量策略 |
元数据告警 | 初启时回填未完成 | 观察 putIfModified* 触发情况 | 等待回填或预热元数据 |
# 2、配置建议(参考)
- 生产常态:优先缓存写;逐步调高队列容量/消费者线程/批量阈值,在 P95/P99 可接受前提下压低总写放大。
- 链路验证:短时启用直写(或使用 amssmoketestfake),定位端到端瓶颈后回归缓存策略。
- 01
- [/metrics/aggregated] — 聚合数据范围 检查点09-19
- 02
- [/metrics] — 反向分析接口参数 请求抓包09-17
- 03
- [/metrics] — 普通指标写入方法 POST09-17