TT Bigdata TT Bigdata
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件专题

    • 安装教程
    • 魔改分享
  • 版本专题

    • 更新说明
    • BUG临时处理
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop-官方组件
    • 专区—Bigtop-扩展组件
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • APT仓库增量更新
    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Ambari-Metrics
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 核心代码

    • 各组件代码
    • 通用代码模板
  • 国产化&其他系统

    • Rocky系列
    • Ubuntu系列
  • Grafana监控方案

    • Ambari-Metrics插件
    • Infinity插件
  • 支持&共建

    • 蓝图愿景
    • 合作共建
登陆
GitHub (opens new window)

JaneTTR

数据酿造智慧,每一滴都是沉淀!
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件专题

    • 安装教程
    • 魔改分享
  • 版本专题

    • 更新说明
    • BUG临时处理
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop-官方组件
    • 专区—Bigtop-扩展组件
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • APT仓库增量更新
    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Ambari-Metrics
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 核心代码

    • 各组件代码
    • 通用代码模板
  • 国产化&其他系统

    • Rocky系列
    • Ubuntu系列
  • Grafana监控方案

    • Ambari-Metrics插件
    • Infinity插件
  • 支持&共建

    • 蓝图愿景
    • 合作共建
登陆
GitHub (opens new window)
  • 试读&介绍

  • Ambari-Metrics解读【简写AMS】

    • 源码下载及环境初始化
    • 项目目录及模块解读
    • AMS-Collector剖析

    • AMS-Collector表结构实战

    • AMS-Collector-元数据-接口实战

    • AMS-Collector-指标查询-接口实战

      • [/metrics] — 监控数据接口查询方法
      • [/metrics] — 请求参数概括及详解索引
      • [/metrics] — Service 代码整体逻辑概览
      • [/metrics] — metricNames 生命周期
      • [/metrics] — seriesAggregateFunc生命周期
        • 一、聚合函数定义
        • 二、应用逻辑:聚合函数实例化
        • 三、抽象类 apply 逻辑
        • 四、运行示例猜想
          • Step1: 合并同一时间戳的值
          • Step2: 应用 AVG 聚合函数
          • Step3: 生成新 TimelineMetric
          • Step4: 替换不同聚合函数效果对比
        • 五、运行示例实践
          • 1. 不带聚合函数的请求
          • 2. 带聚合函数的请求(AVG)
          • 3. 对比结论
      • [/metrics] — getUuidsForGetMetricQuery精讲
      • [/metrics] — applyTopNCondition精讲
      • [/metrics] — getAggregateMetricRecords精讲
      • [/metrics] — getMetricRecords精讲
      • [/metrics] — 临时指标精讲
    • AMS-Collector-普通指标写入-接口实战

    • AMS-Collector-聚合指标写入-接口实战

  • GOD-Ambari-Metrics
  • Ambari-Metrics解读【简写AMS】
  • AMS-Collector-指标查询-接口实战
JaneTTR
2025-09-15
目录

[/metrics] — seriesAggregateFunc生命周期

# 一、聚合函数定义

在 Ambari-Metrics 的 /metrics 查询中,如果传入了 seriesAggregateFunction 参数,最终会走到下面的代码逻辑。
聚合函数的定义非常明确,只有四种:

public enum SeriesAggregateFunction {
  AVG, MIN, MAX, SUM;

  public static boolean isPresent(String functionName) {
    try {
      SeriesAggregateFunction.valueOf(functionName.toUpperCase());
    } catch (IllegalArgumentException e) {
      return false;
    }
    return true;
  }

  public static SeriesAggregateFunction getFunction(String functionName) throws Function.FunctionFormatException {
    try {
      return SeriesAggregateFunction.valueOf(functionName.toUpperCase());
    } catch (NullPointerException | IllegalArgumentException e) {
      throw new Function.FunctionFormatException(
            "Function should be sum, avg, min, max. Got " + functionName, e);
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

注意

这意味着用户只能选择 sum、avg、min、max 四种聚合方式。 如果传入其他字符串,比如 count,就会抛出异常。

# 二、应用逻辑:聚合函数实例化

获取到函数类型之后,会由工厂类 TimelineMetricsSeriesAggregateFunctionFactory 选择对应的实现类。

调用链路如下图所示:

image-20250915104035875

具体代码:


package org.apache.ambari.metrics.core.timeline.function;

import org.apache.ambari.metrics.core.timeline.aggregators.Function;

public class TimelineMetricsSeriesAggregateFunctionFactory {
  private TimelineMetricsSeriesAggregateFunctionFactory() {
  }

  public static TimelineMetricsSeriesAggregateFunction newInstance(SeriesAggregateFunction func) {
    switch (func) {
    case AVG:
      return new TimelineMetricsSeriesAvgAggregateFunction();
    case MIN:
      return new TimelineMetricsSeriesMinAggregateFunction();
    case MAX:
      return new TimelineMetricsSeriesMaxAggregateFunction();
    case SUM:
      return new TimelineMetricsSeriesSumAggregateFunction();
    default:
      throw new Function.FunctionFormatException("Function should be sum, avg, min, max. Got " +
          func.name());
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

这里的 seriesAggrFuncInstance 是如下几种之一:

  • TimelineMetricsSeriesAvgAggregateFunction
  • TimelineMetricsSeriesMinAggregateFunction
  • TimelineMetricsSeriesMaxAggregateFunction
  • TimelineMetricsSeriesSumAggregateFunction

提示

它们都继承自抽象类 AbstractTimelineMetricsSeriesAggregateFunction,并通过实现 applyFunction(values) 来定义各自的计算方式。

# 三、抽象类 apply 逻辑

抽象类统一了数据处理流程,不同函数只是计算方式不同。核心代码如下:

image-20250915104852164

@Override
public TimelineMetric apply(TimelineMetrics timelineMetrics) {
  Set<String> metricNameSet = new TreeSet<>();
  Set<String> hostNameSet = new TreeSet<>();
  Set<String> appIdSet = new TreeSet<>();
  Set<String> instanceIdSet = new TreeSet<>();
  TreeMap<Long, List<Double>> metricValues = new TreeMap<>();

  // Step1: 遍历所有输入的 Metric
  for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
    metricNameSet.add(timelineMetric.getMetricName());
    addToSetOnlyNotNull(hostNameSet, timelineMetric.getHostName());
    addToSetOnlyNotNull(appIdSet, timelineMetric.getAppId());
    addToSetOnlyNotNull(instanceIdSet, timelineMetric.getInstanceId());

    // Step2: 按 timestamp 聚合所有的数值
    for (Map.Entry<Long, Double> metricValue : timelineMetric.getMetricValues().entrySet()) {
      Long timestamp = metricValue.getKey();
      Double value = metricValue.getValue();
      if (!metricValues.containsKey(timestamp)) {
        metricValues.put(timestamp, new LinkedList<Double>());
      }
      metricValues.get(timestamp).add(value);
    }
  }

  // Step3: 对同一时间戳的多值应用聚合函数
  TreeMap<Long, Double> aggregatedMetricValues = new TreeMap<>();
  for (Map.Entry<Long, List<Double>> metricValue : metricValues.entrySet()) {
    List<Double> values = metricValue.getValue();
    if (values.size() == 0) {
      throw new IllegalArgumentException("count of values should be more than 0");
    }
    aggregatedMetricValues.put(metricValue.getKey(), applyFunction(values));
  }

  // Step4: 组装成新的 TimelineMetric
  TimelineMetric timelineMetric = new TimelineMetric();
  timelineMetric.setMetricName(getMetricName(metricNameSet.iterator()));
  timelineMetric.setHostName(joinStringsWithComma(hostNameSet.iterator()));
  timelineMetric.setAppId(joinStringsWithComma(appIdSet.iterator()));
  timelineMetric.setInstanceId(joinStringsWithComma(instanceIdSet.iterator()));
  if (aggregatedMetricValues.size() > 0) {
    timelineMetric.setStartTime(aggregatedMetricValues.firstKey());
  }
  timelineMetric.setMetricValues(aggregatedMetricValues);
  return timelineMetric;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

笔记

从这里我们可以看到,聚合逻辑完全是先收集 → 后计算 → 再组装的模式,保证了框架的可扩展性。 未来若要新增聚合函数,只需要扩展 applyFunction 即可。

# 四、运行示例猜想

为了便于理解聚合函数的执行效果,我们先构造一个简单场景:

假设查询的是 CPU 使用率,同时输入了两个主机的数据:

metricName: cpu_usage
host: host1
values: { 1000: 40.0, 2000: 60.0 }

metricName: cpu_usage
host: host2
values: { 1000: 20.0, 2000: 80.0 }
1
2
3
4
5
6
7

# Step1: 合并同一时间戳的值

1000 → [40.0, 20.0]
2000 → [60.0, 80.0]
1
2

# Step2: 应用 AVG 聚合函数

1000 → avg(40.0, 20.0) = 30.0
2000 → avg(60.0, 80.0) = 70.0
1
2

# Step3: 生成新 TimelineMetric

{
  "metricName": "cpu_usage",
  "hostName": "host1,host2",
  "metricValues": {
    "1000": 30.0,
    "2000": 70.0
  }
}
1
2
3
4
5
6
7
8

# Step4: 替换不同聚合函数效果对比

  • MAX → 1000 → 40.0, 2000 → 80.0
  • MIN → 1000 → 20.0, 2000 → 60.0
  • SUM → 1000 → 60.0, 2000 → 140.0

提示

由此可见,seriesAggregateFunction 会在同一时间戳维度上,将多主机的数值进行归并计算。

# 五、运行示例实践

上面是理论推演,接下来我们用 真实的 curl 请求来验证。

# 1. 不带聚合函数的请求

curl --location --request GET 'http://dev1:6188/ws/v1/timeline/metrics?metricNames=load_five1&appId=HOST&hostname=dev1,dev2&startTime=1757640600000&endTime=1757640900000' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Accept: */*' \
--header 'Host: dev1:6188' \
--header 'Connection: keep-alive'
1
2
3
4
5

我们来看下参数:

image-20250915125645487

返回结果如下(每个主机单独返回一条数据):

{
  "metrics": [
    {
      "metricname": "load_five",
      "appid": "HOST",
      "hostname": "dev1",
      "timestamp": 0,
      "starttime": 1757640899000,
      "metrics": {
        "1757640899000": 1.3068129595588236
      },
      "metadata": {}
    },
    {
      "metricname": "load_five",
      "appid": "HOST",
      "hostname": "dev2",
      "timestamp": 0,
      "starttime": 1757640899000,
      "metrics": {
        "1757640899000": 0.2931640625
      },
      "metadata": {}
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

这里 dev1 和 dev2 的数值分别展示,没有做聚合。

# 2. 带聚合函数的请求(AVG)

我们在参数中加入 seriesAggregateFunction=avg:

image-20250915125956885

对应的请求:

curl --location --request GET 'http://dev1:6188/ws/v1/timeline/metrics?metricNames=load_five&appId=HOST&hostname=dev1,dev2&startTime=1757640600000&endTime=1757640900000&seriesAggregateFunction=avg' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Accept: */*' \
--header 'Host: dev1:6188' \
--header 'Connection: keep-alive'
1
2
3
4
5

返回结果如下:

{
  "metrics": [
    {
      "metricname": "AVG(load_five)",
      "appid": "HOST",
      "instanceid": "",
      "hostname": "dev1,dev2",
      "timestamp": 0,
      "starttime": 1757640899000,
      "metrics": {
        "1757640899000": 0.7999885110294118
      },
      "metadata": {}
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

这里可以看到:

  • metricName 被自动改为 AVG(load_five)
  • hostname 合并成 dev1,dev2
  • 指标值为 两台主机的平均值 (1.3068 + 0.2931) / 2 ≈ 0.7999

# 3. 对比结论

  • 不带聚合函数 → 返回多条指标,每台主机一条
  • 带聚合函数 → 返回一条合并结果,按时间戳聚合

笔记

这验证了我们前面“运行示例猜想”的推演: seriesAggregateFunction 的确会将多个主机的指标值收敛到一条曲线,极大地方便了跨主机监控的对比与分析。

#Ambari#Ambari-Metrics#TimelineService#聚合函数#Controller
[/metrics] — metricNames 生命周期
[/metrics] — getUuidsForGetMetricQuery精讲

← [/metrics] — metricNames 生命周期 [/metrics] — getUuidsForGetMetricQuery精讲→

最近更新
01
[/metrics/aggregated] — 聚合数据范围 检查点
09-19
02
[/metrics] — 反向分析接口参数 请求抓包
09-17
03
[/metrics] — 普通指标写入方法 POST
09-17
更多文章>
Theme by Vdoing | Copyright © 2017-2025 JaneTTR | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式