TT Bigdata TT Bigdata
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件专题

    • 安装指导
    • 实战 Kerberos
    • 魔改分享
  • 版本专题

    • 更新说明
    • BUG临时处理
  • 实验室

    • VIEW插件
    • JIRA速查
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop-官方组件
    • 专区—Bigtop-扩展组件
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • APT仓库增量更新
    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Ambari-Metrics
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 核心代码

    • 各组件代码
    • 通用代码模板
  • 国产化&其他系统

    • Kylin V10系列
    • Rocky系列
    • Ubuntu系列
  • Grafana监控方案

    • Ambari-Metrics插件
    • Infinity插件
  • 支持&共建

    • 蓝图愿景
    • 合作共建
登陆
GitHub (opens new window)

JaneTTR

数据酿造智慧,每一滴都是沉淀!
首页
  • 部署专题

    • 常规安装
    • 一键部署
  • 组件专题

    • 安装指导
    • 实战 Kerberos
    • 魔改分享
  • 版本专题

    • 更新说明
    • BUG临时处理
  • 实验室

    • VIEW插件
    • JIRA速查
  • Ambari-Env

    • 环境准备
    • 开始使用
  • 组件编译

    • 专区—Ambari
    • 专区—Bigtop-官方组件
    • 专区—Bigtop-扩展组件
  • 报错解决

    • 专区—Ambari
    • 专区—Bigtop
  • 其他技巧

    • APT仓库增量更新
    • Maven镜像加速
    • Gradle镜像加速
    • Bower镜像加速
    • 虚拟环境思路
    • R环境安装+一键安装脚本
    • Ivy配置私有镜像仓库
    • Node.js 多版本共存方案
    • Ambari Web本地启动
    • Npm镜像加速
    • PostgreSQL快速安装
    • Temurin JDK 23快速安装
  • 成神之路

    • 专区—Ambari
    • 专区—Ambari-Metrics
    • 专区—Bigtop
  • 集成案例

    • Redis集成教学
    • Dolphin集成教学
    • Doris集成教学
    • 持续整理...
  • 核心代码

    • 各组件代码
    • 通用代码模板
  • 国产化&其他系统

    • Kylin V10系列
    • Rocky系列
    • Ubuntu系列
  • Grafana监控方案

    • Ambari-Metrics插件
    • Infinity插件
  • 支持&共建

    • 蓝图愿景
    • 合作共建
登陆
GitHub (opens new window)
  • Hadoop

  • Spark

  • Trino

  • Hudi

  • Paimon

  • Livy

  • Flink

  • Atlas

  • Superset

  • Jsvc

  • Zookeeper

  • Hive

  • Sqoop

  • Cloudbeaver

  • Bigtop-select

  • Knox

  • Hue

    • Hue 访问 Hadoop 权限问题
    • Hue 访问 Yarn 权限问题
    • Hue 访问 Impala 时间格式问题
    • 生产环境下解决方案——Hue/query_api.py
      • 一、先给结论:生产推荐怎么做
      • 二、问题定位:为什么要动 query_api.py
      • 三、方案一:生产热修复(直接替换文件)
        • 1、适用场景
        • 2、修改目标文件路径
        • 3、操作步骤(建议按这个顺序做)
        • 1)备份原文件
        • 2)替换文件内容
        • 3)重启 Hue 服务
        • 4、验证方式(热修复必须做验证)
        • 5、回滚方案(必须准备)
      • 四、方案二:工程化修复(回写源码并重新编译发布)
        • 1、适用场景
        • 2、修复流程(工程化链路)
        • 3、生产落地建议(避免二次事故)
      • 五、为什么不建议只改一处
  • 报错解决-Bigtop
  • Hue
JaneTTR
2025-12-24
目录

生产环境下解决方案——Hue/query_api.py

# 一、先给结论:生产推荐怎么做 优先级

结论速览

  • 临时止血:方案一(线上直接替换文件)——改动快、回滚快,适合窗口期紧张的生产现场
  • 长期稳定:方案二(回写源码、重新编译发布)——可追溯、可复现,适合纳入版本管理与持续交付

适用前提

本文处理的是 Hue JobBrowser(Impala 查询历史) 场景下的时间解析兼容性问题:Impala 返回的 start_time 字符串在某些版本/接口中不带毫秒(甚至出现 2025-12-05 16:50:16.),导致 Hue 侧 datetime.strptime 抛 ValueError,页面异常。

# 二、问题定位:为什么要动 query_api.py Root Cause

Hue JobBrowser 对 Impala 查询列表的处理入口在:

  • apps/jobbrowser/src/jobbrowser/apis/query_api.py

该文件内会对 Impala 返回的 start_time 字段进行解析与本地化转换,并作为:

  • 列表字段展示(submitted)
  • 排序依据(按 submitted 倒序)
  • 时间过滤条件(time filter)

当 start_time 不满足固定格式时,解析会直接失败,接口返回异常,前端页面表现为“列表空/异常”。

生产判断特征
  • Hue 访问 JobBrowser 的接口返回异常或页面无法展示
  • hue.log / access log 附近出现 ValueError: time data ... does not match format ...
  • 堆栈指向 apps/jobbrowser/src/jobbrowser/apis/query_api.py

# 三、方案一:生产热修复(直接替换文件)Fast Fix

# 1、适用场景

  • 修复窗口很短(业务催得急)
  • 当前版本无法快速走一遍完整打包发布链路
  • 允许在生产机器上做一次“可控的文件替换”

风险提示

该方案属于线上改文件,务必做到:

  • 备份原文件
  • 做最小变更
  • 可快速回滚
  • 有发布记录(建议同步到版本库或变更单)

# 2、修改目标文件路径

生产部署路径通常为:

/usr/bigtop/current/hue/apps/jobbrowser/src/jobbrowser/apis/query_api.py
1

建议先确认软链接指向

/usr/bigtop/current/hue 往往是软链接,建议先确认真实路径,避免“替换了但没生效”。

# 3、操作步骤(建议按这个顺序做)

# 1)备份原文件

cp -a /usr/bigtop/current/hue/apps/jobbrowser/src/jobbrowser/apis/query_api.py \
      /usr/bigtop/current/hue/apps/jobbrowser/src/jobbrowser/apis/query_api.py.$(date +%F_%H%M%S).bak
1
2

# 2)替换文件内容

将修复后的 query_api.py 放到目标路径覆盖(具体内容本文不展开粘贴,保持“关键代码可不输出”的要求)。

替换建议

  • 只动 start_time 解析相关逻辑(submitted / time filter)
  • 其他逻辑(apps 列表、排序、字段映射)不要改动

# 3)重启 Hue 服务

生产上 Hue 常见的重启方式与部署形态有关(systemd / supervisor / 自定义脚本),核心原则是: 让 Hue Python 进程重新加载该文件。

# 4、验证方式(热修复必须做验证)

验证清单

  1. 打开 Hue → JobBrowser → Impala,列表能正常展示
  2. 列表按 submitted 排序正常(新记录在前)
  3. 时间过滤(最近 1h / 1d 等)可用
  4. hue.log 不再出现 ValueError 堆栈

# 5、回滚方案(必须准备)

cp -a /usr/bigtop/current/hue/apps/jobbrowser/src/jobbrowser/apis/query_api.py.<备份时间戳>.bak \
      /usr/bigtop/current/hue/apps/jobbrowser/src/jobbrowser/apis/query_api.py
1
2

然后重启 Hue。

回滚原则

生产现场回滚要比继续排查更重要:一旦出现副作用,优先回滚到备份版本,恢复服务可用性。

# 四、方案二:工程化修复(回写源码并重新编译发布)Recommended

# 1、适用场景

  • 需要形成长期可维护的版本
  • 需要走标准发布流程(RPM/DEB/离线包等)
  • 需要解决“线上改文件不可追溯”的问题

# 2、修复流程(工程化链路)

标准链路

1)在 Hue 源码树中定位 apps/jobbrowser/src/jobbrowser/apis/query_api.py 2)完成最小修复(仅时间解析兼容) 3)补充说明与变更记录(commit message / patch) 4)走 Bigtop/内部 CI 编译 → 产出包 5)灰度发布(1 台或 1 个环境先验证) 6)全量发布

# 3、生产落地建议(避免二次事故)

建议做灰度

  • 先在测试环境验证:JobBrowser 列表 / 排序 / 过滤
  • 再选择一台生产节点灰度:确认日志无异常 + 页面无问题
  • 最后全量替换
版本追溯建议
  • 以 patch 形式纳入构建系统(spec 的 %prep 或自定义 patch 阶段)
  • 记录影响范围:Impala 版本、Hue 版本、Bigtop release 版本
  • 写明回滚方式:回退包版本/撤销 patch

# 五、为什么不建议只改一处 Tip

query_api.py 中对时间字段的使用通常至少包含两类:

  1. 展示字段 submitted(解析 + 时区转换 + strftime)
  2. 过滤条件(按 start_time 计算时间窗口)

若只修复展示字段,而过滤逻辑仍使用旧的解析方式,那么在使用筛选条件时仍可能抛异常。

最小修复口径

  • 展示(submitted)与过滤(time filter)保持一致的解析策略
  • 避免出现“页面能打开,但一筛选就报错”的情况

温馨提示

完整的源代码如下


#!/usr/bin/env python
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from builtins import filter
from builtins import range
import itertools
import logging
import re
import sys
import time
from datetime import datetime
import pytz
from babel import localtime

from desktop.lib import export_csvxls
from libanalyze import analyze as analyzer, rules
from notebook.conf import ENABLE_QUERY_ANALYSIS

from jobbrowser.apis.base_api import Api

if sys.version_info[0] > 2:
  from django.utils.translation import gettext as _
else:
  from django.utils.translation import ugettext as _

ANALYZER = rules.TopDownAnalysis() # We need to parse some files so save as global
LOG = logging.getLogger(__name__)

try:
  from beeswax.models import Session
  from impala.server import get_api as get_impalad_api, _get_impala_server_url
except ImportError as e:
  LOG.exception('Some application are not enabled: %s' % e)


def _get_api(user, cluster=None):
  if cluster and cluster.get('type') == 'altus-dw':
    server_url = 'http://impala-coordinator-%(name)s:25000' % cluster
  else:
    # TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
    application = cluster.get('interface', 'impala')
    session = Session.objects.get_session(user, application=application)
    server_url = _get_impala_server_url(session)
  return get_impalad_api(user=user, url=server_url)


class QueryApi(Api):

  def __init__(self, user, impala_api=None, cluster=None):
    if impala_api:
      self.api = impala_api
    else:
      self.api = _get_api(user, cluster)

  def apps(self, filters):
    kwargs = {}

    jobs = self.api.get_queries(**kwargs)

    filter_list = self._get_filter_list(filters)
    jobs_iter = itertools.chain(jobs['in_flight_queries'], jobs['completed_queries'])
    jobs_iter_filtered = self._n_filter(filter_list, jobs_iter)

    #apps['submitted'] time is stripped to microseconds and converted from type string to datetime object using
    #datetime.strptime() to fetch the local time instead of the UTC time. Finally, The local time is converted to type string
    #using datetime.strftime()

    apps = {
      'apps': sorted([{
        'id': job['query_id'],
        'name': job['stmt'].replace('\r\n', ' ')[:60] + ('...' if len(job['stmt']) > 60 else ''),
        'status': job['state'],
        'apiStatus': self._api_status(job['state']),
        'type': job['stmt_type'],
        'user': job['effective_user'],
        'queue': job.get('resource_pool'),
        'progress': job['progress'],
        'isRunning': job['start_time'] > job['end_time'],
        'canWrite': job in jobs['in_flight_queries'],
        'duration': self._time_in_ms_groups(
            re.search(r"\s*(([\d.]*)([a-z]*))(([\d.]*)([a-z]*))?(([\d.]*)([a-z]*))?",
            job['duration'],
            re.MULTILINE
        ).groups()),
        'submitted': datetime.strptime(job['start_time'].split('.')[0], "%Y-%m-%d %H:%M:%S") \
  .replace(tzinfo=pytz.utc).astimezone(localtime._get_localzone()) \
  .strftime("%Y-%m-%d %H:%M:%S.%f"),

        # Extra specific
        'rows_fetched': job['rows_fetched'],
        'waiting': job['waiting'],
        'waiting_time': job['waiting_time']
      } for job in jobs_iter_filtered], key=lambda job: job.get('submitted'), reverse=True),
      'total': 0
    }
    apps['total'] = len(apps['apps'])

    return apps

  def _time_in_ms_groups(self, groups):
    time = 0
    for x in range(0, len(groups), 3):
      if groups[x+1]:
        time += self._time_in_ms(groups[x+1], groups[x+2])
    return time

  def _time_in_ms(self, time, period):
    if period == 'ns':
      return float(time) / 1000
    elif period == 'ms':
      return float(time)
    elif period == 's':
      return float(time) * 1000
    elif period == 'm':
      return float(time) * 60000 #1000*60
    elif period == 'h':
      return float(time) * 3600000 #1000*60*60
    elif period == 'd':
      return float(time) * 86400000  # 1000*60*60*24
    else:
      return float(time)

  def app(self, appid):
    apps = self.apps({
      'text': 'id:' + appid
    })

    if not apps.get('apps'):
      return {
        'name': _('Unknown or expired query id %s') % appid
      }
    app = apps.get('apps')[0]
    progress_groups = re.search(r"([\d\.\,]+)%", app.get('progress'))
    app.update({
      'progress': float(progress_groups.group(1)) \
          if progress_groups and progress_groups.group(1) else 100 \
            if self._api_status(app.get('status')) in ['SUCCEEDED', 'FAILED'] else 1,
      'type': 'queries',
      'doc_url': "%s/query_plan?query_id=%s" % (self.api.url, appid),
      'properties': {
        'memory': '',
        'profile': '',
        'plan': '',
        'backends': '',
        'finstances': '',
        'metrics': ''
      }
    })

    return app

  def action(self, appid, action):
    message = {'message': '', 'status': 0}

    if action.get('action') == 'kill':
      for _id in appid:
        result = self.api.kill(_id)
        if result.get('error'):
          message['message'] = result.get('error')
          message['status'] = -1
        elif result.get('contents') and message.get('status') != -1:
          message['message'] = result.get('contents')

    return message;

  def logs(self, appid, app_type, log_name=None, is_embeddable=False):
    return {'logs': ''}

  def profile(self, appid, app_type, app_property, app_filters):
    if app_property == 'memory':
      return self._memory(appid, app_type, app_property, app_filters)
    elif app_property == 'profile':
      return self._query_profile(appid)
    elif app_property == 'download-profile':
      return export_csvxls.make_response([self._query_profile(appid)['profile']], 'txt', 'query-profile_%s' % appid)
    elif app_property == 'backends':
      return self._query_backends(appid)
    elif app_property == 'finstances':
      return self._query_finstances(appid)
    else:
      return self._query(appid)


  def profile_encoded(self, appid):
    return self.api.get_query_profile_encoded(query_id=appid)

  def _memory(self, appid, app_type, app_property, app_filters):
    return self.api.get_query_memory(query_id=appid);

  def _metrics(self, appid):
    query_profile = self.api.get_query_profile_encoded(appid)
    profile = analyzer.analyze(analyzer.parse_data(query_profile))
    ANALYZER.pre_process(profile)
    metrics = analyzer.metrics(profile)

    if ENABLE_QUERY_ANALYSIS.get():
      result = ANALYZER.run(profile)
      if result and result[0]:
        for factor in result[0]['result']:
          if factor['reason'] and factor['result_id'] and metrics['nodes'].get(factor['result_id']):
            metrics['nodes'][factor['result_id']]['health'] = factor['reason']
    return metrics

  def _query(self, appid):
    query = self.api.get_query(query_id=appid)
    query['summary'] = query.get('summary').strip() if query.get('summary') else ''
    query['plan'] = query.get('plan').strip() if query.get('plan') else ''
    try:
      query['metrics'] = self._metrics(appid)
    except Exception as e:
      query['metrics'] = {'nodes': {}}
      LOG.exception('Could not parse profile: %s' % e)

    if query.get('plan_json'):
      def get_exchange_icon(o):
        if re.search(r'broadcast', o['label_detail'], re.IGNORECASE):
          return {'svg': 'hi-broadcast'}
        elif re.search(r'hash', o['label_detail'], re.IGNORECASE):
          return {'svg': 'hi-random'}
        else:
          return {'svg': 'hi-exchange'}
      def get_sigma_icon(o):
        if re.search(r'streaming', o['label_detail'], re.IGNORECASE):
          return {'svg': 'hi-sigma'}
        else:
          return {'svg': 'hi-sigma'}
      mapping = {
        'TOP-N': {'type': 'TOPN', 'icon': {'svg': 'hi-filter'}},
        'SORT': {'type': 'SORT', 'icon': {'svg': 'hi-sort'}},
        'MERGING-EXCHANGE': {'type': 'EXCHANGE', 'icon': {'fn': get_exchange_icon}},
        'EXCHANGE': {'type': 'EXCHANGE', 'icon': {'fn': get_exchange_icon}},
        'SCAN HDFS': {'type': 'SCAN_HDFS', 'icon': {'svg': 'hi-copy'}},
        'SCAN KUDU': {'type': 'SCAN_KUDU', 'icon': {'svg': 'hi-table'}},
        'SCAN HBASE': {'type': 'SCAN_HBASE', 'icon': {'font': 'fa-th-large'}},
        'HASH JOIN': {'type': 'HASH_JOIN', 'icon': {'svg': 'hi-join'}},
        'AGGREGATE': {'type': 'AGGREGATE', 'icon': {'fn': get_sigma_icon}},
        'NESTED LOOP JOIN': {'type': 'LOOP_JOIN', 'icon': {'svg': 'hi-nested-loop'}},
        'SUBPLAN': {'type': 'SUBPLAN', 'icon': {'svg': 'hi-map'}},
        'UNNEST': {'type': 'UNNEST', 'icon': {'svg': 'hi-unnest'}},
        'SINGULAR ROW SRC': {'type': 'SINGULAR', 'icon': {'svg': 'hi-vertical-align'}},
        'ANALYTIC': {'type': 'SINGULAR', 'icon': {'svg': 'hi-timeline'}},
        'UNION': {'type': 'UNION', 'icon': {'svg': 'hi-merge'}}
      }
      def process(node, mapping=mapping):
        node['id'], node['name'] = node['label'].split(':')
        details = mapping.get(node['name'])
        if details:
          icon = details['icon']
          if icon and icon.get('fn'):
            icon = icon['fn'](node)
          node['icon'] = icon

      for node in query['plan_json']['plan_nodes']:
        self._for_each_node(node, process)
    return query

  def _for_each_node(self, node, fn):
    fn(node)
    for child in node['children']:
      self._for_each_node(child, fn)

  def _query_profile(self, appid):
    return self.api.get_query_profile(query_id=appid)

  def _query_backends(self, appid):
    return self.api.get_query_backends(query_id=appid)

  def _query_finstances(self, appid):
    return self.api.get_query_finstances(query_id=appid)

  def _api_status_filter(self, status):
    if status == 'FINISHED':
      return 'COMPLETED'
    elif status == 'EXCEPTION':
      return 'FAILED'
    elif status == 'RUNNING':
      return 'RUNNING'

  def _api_status(self, status):
    if status == 'FINISHED':
      return 'SUCCEEDED'
    elif status == 'EXCEPTION':
      return 'FAILED'
    elif status == 'RUNNING':
      return 'RUNNING'
    else:
      return 'PAUSED'

  def _get_filter_list(self, filters):
    filter_list = []
    if filters.get("text"):
      filter_names = {
        'user': 'effective_user',
        'id': 'query_id',
        'name': 'state',
        'type': 'stmt_type',
        'status': 'status'
      }

      def make_lambda(name, value):
        return lambda app: app[name] == value

      for key, name in list(filter_names.items()):
        text_filter = re.search(r"\s*("+key+")\s*:([^ ]+)", filters.get("text"))
        if text_filter and text_filter.group(1) == key:
          filter_list.append(make_lambda(name, text_filter.group(2).strip()))
    if filters.get("time"):
      time_filter = filters.get("time")
      period_ms = self._time_in_ms(float(time_filter.get("time_value")), time_filter.get("time_unit")[0:1])
      current_ms = time.time() * 1000.0
      filter_list.append(
        lambda app: current_ms - (time.mktime(datetime.strptime(
          app['start_time'].split('.')[0], '%Y-%m-%d %H:%M:%S'
        ).timetuple()) * 1000)
                    < period_ms
      )
    if filters.get("states"):
      filter_list.append(lambda app: self._api_status_filter(app['state']).lower() in filters.get("states"))

    return filter_list

  def _n_filter(self, filters, tuples):
    for f in filters:
      tuples = list(filter(f, tuples))
    return tuples



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#Bigtop#Hue#Impala#JobBrowser#query_api.py#datetime
Hue 访问 Impala 时间格式问题

← Hue 访问 Impala 时间格式问题

最近更新
01
xmlsectool 依赖缺失问题解析
12-24
02
webhdfs-test 依赖收敛冲突问题处理
12-24
03
Invalid keystore format 问题处理
12-24
更多文章>
Theme by Vdoing | Copyright © 2017-2025 JaneTTR | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式