TT Bigdata TT Bigdata
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件安装

    • 常规&高可用
  • 版本专题

    • 更新说明
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 模板代码

    • 各类组件
    • 通用模板
  • 国产化&其他系统

    • Centos系列
    • Kylin系列
    • OpenEuler系列
    • Rocky系列
    • Ubuntu系列
  • 生产调优

    • 组件调优指南
    • 1v1指导调优
  • 定制开发

    • 组件版本定制
    • 样式风格定制
  • 蓝图愿景
  • 技术支持
  • 合作共建
GitHub (opens new window)

JaneTTR

数据酿造智慧,每一滴都是沉淀!
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件安装

    • 常规&高可用
  • 版本专题

    • 更新说明
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 模板代码

    • 各类组件
    • 通用模板
  • 国产化&其他系统

    • Centos系列
    • Kylin系列
    • OpenEuler系列
    • Rocky系列
    • Ubuntu系列
  • 生产调优

    • 组件调优指南
    • 1v1指导调优
  • 定制开发

    • 组件版本定制
    • 样式风格定制
  • 蓝图愿景
  • 技术支持
  • 合作共建
GitHub (opens new window)
  • 方法论

  • 代码生命周期-metainfo

  • 架构剖析

    • server与agent协作详解[一]
    • server与agent协作详解[二]
      • 3. 实操与代码解析 🔧
        • 3.1 代码详解
        • 3.1.1 流程步骤梳理
        • 3.1.1.1 指令发布:AgentCommandsPublisher
        • 3.1.1.2 指令分发:STOMPUpdatePublisher
        • 3.1.1.3 消息映射与分发:DefaultMessageEmitter
        • 3.1.1.4 emitMessageToHost 解析
        • 3.1.1.5 convertAndSendToUser 解析
        • 3.1.1.6 Agent 端事件订阅
        • 3.1.2 流程穿透
        • 3.1.2.2 文字分步说明
      • 4. 总结与延伸学习 🚀
        • 4.1 内容回顾
        • 4.2 后续学习建议
    • ambari install逻辑详解[一]
    • ambari install逻辑详解[二]
    • ambari install逻辑详解[三]
    • stack-hooks逻辑详解[一]
    • stack-hooks逻辑详解[二]
    • distro-select逻辑详解
    • java 请求过程解读[一]
    • java 请求过程解读[二]
    • java 请求过程解读[三]
    • java 请求过程泛化及补充[一]
    • java 请求过程泛化及补充[二]
  • UI样式

  • GOD-Ambari
  • 架构剖析
JaneTTR
2025-06-02
目录

server与agent协作详解[二]

# 3. 实操与代码解析 🔧

# 3.1 代码详解

# 3.1.1 流程步骤梳理

Ambari Server 与 Agent 指令交互大致分为如下几个核心环节,每一环都在源码中有明确分工:

# 3.1.1.1 指令发布:AgentCommandsPublisher

指令发布主逻辑位于 AgentCommandsPublisher.java,关键方法为 sendAgentCommand:

public void sendAgentCommand(Multimap<Long, AgentCommand> agentCommands) throws AmbariRuntimeException {
    try {
        threadPools.getAgentPublisherCommandsPool().submit(() -> {
            executionCommandsClusters.entrySet().stream().parallel().forEach(entry -> {
                STOMPUpdatePublisher.publish(new ExecutionCommandEvent(
                    entry.getKey(),
                    agentConfigsHolder.initializeDataIfNeeded(entry.getKey(), true).getTimestamp(),
                    entry.getValue()
                ));
            });
        }).get();
    } catch (InterruptedException|ExecutionException e) {
        LOG.error("Exception on sendAgentCommand", e);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 3.1.1.2 指令分发:STOMPUpdatePublisher

image-20241119090655967

在 STOMPUpdatePublisher 内,publish 方法负责事件类型判断与投递:

public void publish(STOMPEvent event) {
    if (DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
      publishAgent(event);
    } else if (DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES.contains(event.getType())) {
      publishAPI(event);
    } else {
      throw new AmbariRuntimeException("Event with type {" + event.getType() + "} can not be published.");
    }
  }

private void publishAgent(STOMPEvent event) {
    agentEventBus.post(event);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 使用 agentEventBus(Guava EventBus)线程安全分发事件,驱动后续链路。
# 3.1.1.3 消息映射与分发:DefaultMessageEmitter

DefaultMessageEmitter 负责对事件类型做进一步判断,将消息推送到目标路径,分“全体广播”与“定向推送”两类:

image-20241126084844891

@Override
public void emitMessage(STOMPEvent event) throws AmbariException {
  if (StringUtils.isEmpty(getDestination(event))) {
    throw new MessageDestinationIsNotDefinedException(event.getType());
  }
  if (event instanceof STOMPHostEvent) {
    STOMPHostEvent hostUpdateEvent = (STOMPHostEvent) event;
    if (hostUpdateEvent.getType().equals(STOMPEvent.Type.COMMAND)) {
      emitMessageRetriable((ExecutionCommandEvent) hostUpdateEvent);
    } else {
      emitMessageToHost(hostUpdateEvent);
    }
  } else {
    emitMessageToAll(event);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

image-20241126104050513

  • DEFAULT_DESTINATIONS 存储事件类型和路径映射,如:

    put(STOMPEvent.Type.COMMAND, "/commands");
    
    1

# 3.1.1.4 emitMessageToHost 解析

image-20241126143459536

emitMessageToHost 专门负责发消息到目标主机:

protected void emitMessageToHost(STOMPHostEvent event) throws HostNotRegisteredException {
  Long hostId = event.getHostId();
  String sessionId = agentSessionManager.getSessionId(hostId);
  LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
  MessageHeaders headers = createHeaders(sessionId);
  simpMessagingTemplate.convertAndSendToUser(sessionId, getDestination(event), event, headers);
}
1
2
3
4
5
6
7
  • 先查找 Agent 的 sessionId,再通过 simpMessagingTemplate 定向发送。
# 3.1.1.5 convertAndSendToUser 解析

image-20241126150430292

convertAndSendToUser 负责按 /user/{session}/commands 路径推送消息:

@Override
public void convertAndSendToUser(String user, String destination, Object payload,
        @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
        throws MessagingException {
    // ...参数校验
    super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
1
2
3
4
5
6
7

image-20241126144259108

  • 自动加 /user/ 前缀和 sessionId 路径,保证安全与隔离。
# 3.1.1.6 Agent 端事件订阅

Agent 端监听 Server 指令,Python 代码结构如下:

image-20241126152230378

  • 监听 /user/commands 路径。相关变量定义:
COMMANDS_TOPIC = '/user/commands'
# ... 省略其它 topic 常量
1
2

image-20241126162934870

def on_message(self, headers, message):
  if not 'destination' in headers:
    logger.warn("Received event from server which does not contain 'destination' header")
    return

  destination = headers['destination']
  if destination.rstrip('/') == self.get_handled_path().rstrip('/'):
    # 进入处理
1
2
3
4
5
6
7
8
  • 通过 on_message 针对 destination 路径判断是否处理。

image-20241126152425598

class CommandsEventListener(EventListener):
  def on_event(self, headers, message):
    commands = []
    cancel_commands = []
    for cluster_id in message['clusters'].keys():
      cluster_dict = message['clusters'][cluster_id]
      if 'commands' in cluster_dict:
        commands += cluster_dict['commands']
      if 'cancelCommands' in cluster_dict:
        cancel_commands += cluster_dict['cancelCommands']
    for command in commands:
      command['requiredConfigTimestamp'] = message['requiredConfigTimestamp']
    with self.action_queue.lock:
      self.action_queue.cancel(cancel_commands)
      self.action_queue.put(commands)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • 事件会被推入 action_queue 队列,后续被消费。

image-20241126152723188

# 3.1.2 流程穿透

下面用“图 + 分步文本”把上述全链路完整梳理:

1. 指令生成:
   AgentCommandsPublisher.sendAgentCommand
            ↓
2. 事件发布:
   STOMPUpdatePublisher.publish
            ↓
3. 监听触发:
   EventBus.post --> STOMPUpdateListener.onUpdateEvent
            ↓
4. 消息发送:
   DefaultMessageEmitter.emitMessage
            ↓
   若指定主机 --> emitMessageToHost --> SimpMessagingTemplate.convertAndSendToUser
            ↓
5. Agent端处理:
   STOMP Listener.on_message --> CommandsEventListener.on_event --> action_queue.put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 3.1.2.2 文字分步说明
  1. Server 端:

    • sendAgentCommand 生成指令事件。
    • 由 STOMPUpdatePublisher 投递至 EventBus,驱动监听链路。
  2. 事件监听器:

    • STOMPUpdateListener 捕获并调用 DefaultMessageEmitter 进入路径生成和消息发送环节。
  3. 消息发送:

    • 针对目标主机,通过 emitMessageToHost 拼接 session 路径,交给 SimpMessagingTemplate.convertAndSendToUser 发送。
  4. Agent 端:

    • Python 端通过 on_message 监听消息,将指令通过 CommandsEventListener.on_event 推入 action_queue,等待本地消费。

# 4. 总结与延伸学习 🚀

# 4.1 内容回顾

本章我们详细剖析了 STOMP 协议在 Ambari Server-Agent 指令分发链路中的源码与实现:

  • 指令如何自上而下生成、路由、映射目标路径并安全分发;
  • Agent 端如何监听、判别和推送到本地队列,最终触发命令消费;
  • 全链路关键点用图片、注释与代码结合解读,便于查阅和实践。

# 4.2 后续学习建议

  1. 源码实战 建议结合源码断点和日志分析实际 Install、Start、Stop 等指令的分发执行;
  2. 定制开发 理解消息分发机制后,可安全扩展自定义指令或特定业务事件,实现更灵活的企业运维自动化;
  3. 深挖原理 推荐继续学习 ambari-stomp、Guava EventBus 等实现细节,提升分布式系统设计能力。
#Ambari#commandScript#STOMP#Server-Agent#源码解析#消息总线#运维自动化
server与agent协作详解[一]
ambari install逻辑详解[一]

← server与agent协作详解[一] ambari install逻辑详解[一]→

最近更新
01
Pandoc 缺失导致 SparkR 构建失败
06-08
02
Cyrus SASL/GSASL 缺失解决
06-07
03
Hadoop_3.3.4 编译实战 1.0.0+
06-06
更多文章>
Theme by Vdoing | Copyright © 2017-2025 JaneTTR | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式