osSpecifics详解[二]
# 3.2.2.3 Service 层定位与接口梳理
通过对资源类型的进一步分析,我们可以清楚地看到,Ambari RESTful 框架是如何将前端请求路由到对应的服务模块的。 这一机制是支撑大数据集群“按需调度、自动适配”的基石,它使得用户通过简单的 API,即可实现对组件和主机组件的灵活管理。
服务名称 | 请求类型 | 说明 |
---|---|---|
ComponentService | GET | 获取服务组件的详细信息 |
HostComponentService | GET | 获取主机上各组件及其状态、配置等信息 |
提示
这种细粒度的资源设计让平台具备更高的可观测性和自动化管理能力,支持从服务级到主机级、从单一组件到批量操作的全链路运维需求。
# 源码结构及接口分发解读
Ambari 的 RESTful 层以 Java 注解方式清晰标识 API 路由,服务端统一使用 handleRequest 方法做参数转换、权限校验、业务分发和响应格式统一处理:
public class ComponentService extends BaseService {
/
* Handles GET: /clusters/{clusterID}/services/{serviceID}/components/{componentID}
* Get a specific component.
*
* @param headers http headers
* @param ui uri info
* @param componentName component id
* @return a component resource representation
*/
@GET @ApiIgnore // until documented
@Path("{componentName}")
@Produces("text/plain")
public Response getComponent(String body, @Context HttpHeaders headers, @Context UriInfo ui,
@PathParam("componentName") String componentName, @QueryParam("format") String format) {
if (format != null && format.equals("client_config_tar")) {
return createClientConfigResource(body, headers, ui, componentName);
}
return handleRequest(headers, body, ui, Request.Type.GET,
createComponentResource(m_clusterName, m_serviceName, componentName));
}
/
* Handles GET: /clusters/{clusterID}/services/{serviceID}/components
* Get all components for a service.
*
* @param headers http headers
* @param ui uri info
* @return component collection resource representation
*/
@GET @ApiIgnore // until documented
@Produces("text/plain")
public Response getComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui,
@QueryParam("format") String format) {
if (format != null && format.equals("client_config_tar")) {
return createClientConfigResource(body, headers, ui, null);
}
return handleRequest(headers, body, ui, Request.Type.GET,
createComponentResource(m_clusterName, m_serviceName, null));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
同理,HostComponentService
负责主机级组件查询:
public class HostComponentService extends BaseService {
/
* Handles GET /clusters/{clusterID}/hosts/{hostID}/host_components/{hostComponentID}
* Get a specific host_component.
*
* @param headers http headers
* @param ui uri info
* @param hostComponentName host_component id
* @return host_component resource representation
*/
@GET
@Path("{hostComponentName}")
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Get single host component for a host", response = HostComponentSwagger.class)
@ApiImplicitParams({
@ApiImplicitParam(name = QUERY_FIELDS, value = QUERY_FILTER_DESCRIPTION, dataType = DATA_TYPE_STRING, paramType = PARAM_TYPE_QUERY),
})
@ApiResponses({
@ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
@ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = MSG_CLUSTER_OR_HOST_NOT_FOUND),
@ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = MSG_NOT_AUTHENTICATED),
@ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = MSG_PERMISSION_DENIED),
@ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = MSG_SERVER_ERROR),
@ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = MSG_INVALID_ARGUMENTS),
})
public Response getHostComponent(String body, @Context HttpHeaders headers, @Context UriInfo ui,
@PathParam("hostComponentName") String hostComponentName, @QueryParam("format") String format) {
//todo: needs to be refactored when properly handling exceptions
if (m_hostName == null) {
// don't allow case where host is not in url but a host_component instance resource is requested
String s = "Invalid request. Must provide host information when requesting a host_resource instance resource.";
return Response.status(400).entity(s).build();
}
if (format != null && format.equals("client_config_tar")) {
return createClientConfigResource(body, headers, ui, hostComponentName);
}
return handleRequest(headers, body, ui, Request.Type.GET,
createHostComponentResource(m_clusterName, m_hostName, hostComponentName));
}
/
* Handles GET /clusters/{clusterID}/hosts/{hostID}/host_components/
* Get all host components for a host.
*
* @param headers http headers
* @param ui uri info
* @return host_component collection resource representation
*/
@GET
@Path("") // This is needed if class level path is not present otherwise no Swagger docs will be generated for this method
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Get all host components for a host", response = HostComponentSwagger.class, responseContainer = RESPONSE_CONTAINER_LIST)
@ApiImplicitParams({
@ApiImplicitParam(name = QUERY_FIELDS, value = QUERY_FILTER_DESCRIPTION, dataType = DATA_TYPE_STRING, paramType = PARAM_TYPE_QUERY),
})
@ApiResponses({
@ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
@ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = MSG_CLUSTER_OR_HOST_NOT_FOUND),
@ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = MSG_NOT_AUTHENTICATED),
@ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = MSG_PERMISSION_DENIED),
@ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = MSG_SERVER_ERROR),
@ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = MSG_INVALID_ARGUMENTS),
})
public Response getHostComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui, @QueryParam("format") String format) {
if (format != null && format.equals("client_config_tar")) {
return createClientConfigResource(body, headers, ui, null);
}
return handleRequest(headers, body, ui, Request.Type.GET,
createHostComponentResource(m_clusterName, m_hostName, null));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
接口参数说明与响应机制:
- 支持 GET/PUT 多种请求方式
- URI 路径参数与 QueryParam 灵活组合
- 支持多数据格式和权限控制,保障接口一致性和安全
方法名 | URL 示例 | 说明 |
---|---|---|
getComponent | /clusters/{clusterID}/services/{serviceID}/components/{componentID} | 获取指定组件详细信息 |
getComponents | /clusters/{clusterID}/services/{serviceID}/components | 获取服务下全部组件 |
getHostComponent | /clusters/{clusterID}/hosts/{hostID}/host_components/{hostComponentID} | 获取主机组件详细信息 |
getHostComponents | /clusters/{clusterID}/hosts/{hostID}/host_components | 获取主机上全部组件 |
# 3.2.3 AmbariManagementControllerImpl 源码调用链全景
# 3.2.3.1 由服务元数据到主机任务的链路溯源
当用户通过前端 API 触发服务操作时,Ambari 内部会通过一系列方法层层下钻,确保所有组件、包、环境都精准适配到主机。* 这条链路既是平台灵活性的保证,也是源码阅读和问题定位的切入点。*
流程总结如下:
- 通过
serviceInfo.getOsSpecifics()
获取所有适配的 OS 包信息,确保服务/组件和目标主机系统环境高度兼容。 - 进入
populateServicePackagesInfo
方法,完成包与主机环境的动态绑定。 - 再调用
getPackagesForServiceHost
细化包清单。 - 随后调用
createHostAction
,为主机下发执行任务(如安装、配置、重启等)。
提示
这一链路中的每一步都支持自定义扩展和异常捕获,方便平台升级和适配新操作系统类型。
# 3.2.3.2 阶段任务调度核心——doStageCreation
在 AmbariManagementControllerImpl 内,doStageCreation
是调度所有服务操作的“调度中心”,其主要职责是:
- 结合前面分析的主机包适配情况,生成执行计划
- 调用
addStages
方法,将每个服务/主机/组件的操作下发为具体 Stage - Stage 进一步转化为数据库可持久化任务,由 Agent 拉取执行
代码链路:
关键节点 | 目标方法 | 所属类 | 说明 |
---|---|---|---|
包适配信息解析 | populateServicePackagesInfo | AmbariManagementControllerImpl | 获取主机需分发的包清单 |
任务动作创建 | createHostAction | AmbariManagementControllerImpl | 生成主机级别的操作动作 |
阶段任务生成 | doStageCreation | AmbariManagementControllerImpl | 汇总所有操作并生成 Stage |
任务持久化 | createAndPersistStages | ComponentResourceProvider | 任务最终持久化到数据库供 Agent 执行 |
# 3.2.4 ServiceResourceProvider 资源操作与分发逻辑
# 3.2.4.1 核心方法与调用顺序
ServiceResourceProvider
是服务资源的关键入口,承担了 API 层到服务操作调度的“中转站”角色。
主要流程如下:
updateResourcesAuthorized
入口,首先做请求授权和参数校验。- 进入
doUpdateResources
,筛选出需要更新/操作的服务集合。 - 通过
updateServices
调用 AmbariManagementControllerImpl 的addStages
,真正下发操作任务。 - 所有操作封装为
RequestStageContainer
对象,便于统一管理和异常捕获。
关键源码片段:
@Override
protected RequestStatus updateResourcesAuthorized(final Request request, Predicate predicate)
throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {
RequestStageContainer requestStages = doUpdateResources(null, request, predicate);
//此处省略部分代码
return getRequestStatus(response);
}
private RequestStageContainer doUpdateResources(final RequestStageContainer stages, final Request request, Predicate predicate)
throws UnsupportedPropertyException, SystemException, NoSuchResourceException, NoSuchParentResourceException {
//此处省略部分代码
requestStages = modifyResources(new Command<RequestStageContainer>() {
@Override
public RequestStageContainer invoke() throws AmbariException, AuthorizationException {
return updateServices(stages, requests, request.getRequestInfoProperties(),
runSmokeTest, reconfigureClients, startDependencies);
}
});
}
return requestStages;
}
protected RequestStageContainer updateServices(RequestStageContainer requestStages, Set<ServiceRequest> requests,Map<String, String> requestProperties, boolean runSmokeTest,boolean reconfigureClients, boolean startDependencies) throws AmbariException, AuthorizationException {
// 此处省略其他代码。
// 这个就是调用 AmbariManagementControllerImpl 类的方法。
return controller.addStages(requestStages, cluster, requestProperties,
null, changedServices, changedComps, changedScHosts,
ignoredScHosts, runSmokeTest, reconfigureClients, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 3.2.4.2 资源类型与路由全景
通过 Resource.Type 绑定,所有服务操作接口都可统一由 ServiceService 类分发,典型路径如下:
方法名称 | 请求类型 | 路径 | 说明 |
---|---|---|---|
updateService | PUT | /clusters/{clusterId}/services/{serviceId} | 更新单一服务 |
updateServices | PUT | /clusters/{clusterId}/services | 批量更新服务 |
实际调用建议:
- API 可接收 JSON/YAML 请求体,支持批量启停、滚动升级等多种复杂操作
- 推荐配合 UI 界面或外部自动化脚本一同使用,提高运维效率
代码片段举例:
/
* Handles: PUT /clusters/{clusterId}/services/{serviceId}
* Update a specific service.
*
* @param body http body
* @param headers http headers
* @param ui uri info
* @param serviceName service id
* @return information regarding the updated service
*/
@PUT
@Path("{serviceName}")
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Updates a service",
nickname = "ServiceService#updateService"
)
@ApiImplicitParams({
@ApiImplicitParam(dataType = SERVICE_REQUEST_TYPE, paramType = PARAM_TYPE_BODY)
})
@ApiResponses({
@ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
@ApiResponse(code = HttpStatus.SC_ACCEPTED, message = MSG_REQUEST_ACCEPTED),
@ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = MSG_INVALID_ARGUMENTS),
@ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = MSG_RESOURCE_NOT_FOUND),
@ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = MSG_NOT_AUTHENTICATED),
@ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = MSG_PERMISSION_DENIED),
@ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = MSG_SERVER_ERROR),
})
public Response updateService(String body, @Context HttpHeaders headers, @Context UriInfo ui,
@ApiParam @PathParam("serviceName") String serviceName) {
return handleRequest(headers, body, ui, Request.Type.PUT, createServiceResource(m_clusterName, serviceName));
}
* Handles: PUT /clusters/{clusterId}/services
* Update multiple services.
*
* @param body http body
* @param headers http headers
* @param ui uri info
* @return information regarding the updated service
*/
@PUT
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Updates multiple services",
nickname = "ServiceService#updateServices"
)
@ApiImplicitParams({
@ApiImplicitParam(dataType = SERVICE_REQUEST_TYPE, paramType = PARAM_TYPE_BODY)
})
@ApiResponses({
@ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
@ApiResponse(code = HttpStatus.SC_ACCEPTED, message = MSG_REQUEST_ACCEPTED),
@ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = MSG_INVALID_ARGUMENTS),
@ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = MSG_RESOURCE_NOT_FOUND),
@ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = MSG_NOT_AUTHENTICATED),
@ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = MSG_PERMISSION_DENIED),
@ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = MSG_SERVER_ERROR),
})
public Response updateServices(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
return handleRequest(headers, body, ui, Request.Type.PUT, createServiceResource(m_clusterName, null));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64