"""Interactive Elasticsearch query tool for per-user trace logs.

The script connects to an Elasticsearch server, searches one index for the
documents of a single user inside a time window, prints every hit, and hands
the JSON-decoded ``properties.params.log`` payloads to ``keysCount.show``.

Install the third-party dependencies first::

    pip install elasticsearch
    pip install fontTools
    pip install pandas
    pip install matplotlib
"""

import json
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

import pip  # NOTE(review): not referenced in this file; kept from the original
from elasticsearch import Elasticsearch
from fontTools.misc.cython import returns  # NOTE(review): not referenced in this file

import keysCount
import timeSelect

# Address used when the caller does not supply any hosts.
_DEFAULT_HOSTS = ['http://localhost:9200']

# Width of the queried time window: two hours, in milliseconds.
_QUERY_WINDOW_MS = 2 * 60 * 60 * 1000


class ElasticsearchQuery:
    """Small wrapper around an Elasticsearch client for querying trace logs."""

    def __init__(self, hosts=None):
        """Store the host list and connect immediately.

        :param hosts: Elasticsearch server addresses; when ``None`` the
            client is pointed at ``['http://localhost:9200']``.
        """
        # Apply the documented default explicitly instead of relying on what
        # the client library does when it receives ``hosts=None``.
        self.hosts = list(_DEFAULT_HOSTS) if hosts is None else hosts
        self.es = None
        self.connect()

    def connect(self):
        """Create the client and verify the server answers a ping.

        Terminates the process with exit status 1 when the ping fails or
        when creating/pinging the client raises.
        """
        try:
            self.es = Elasticsearch(
                hosts=self.hosts,
                # Certificate verification is disabled; acceptable for a
                # development environment only.
                verify_certs=False,
            )

            if self.es.ping():
                print("✅ Elasticsearch连接成功")
            else:
                print("❌ Elasticsearch连接失败")
                sys.exit(1)

        except Exception as e:
            # ``sys.exit`` raises SystemExit, which is not an Exception
            # subclass, so the exit above is not intercepted here.
            print(f"❌ 连接Elasticsearch时发生错误: {e}")
            sys.exit(1)

    def execute_query(self, index_name, distinct_id, start_time, end_time):
        """Search one index for a user's log documents inside a time range.

        A document matches when its ``distinctId`` equals ``distinct_id``,
        its ``time`` lies within ``[start_time, end_time]`` (inclusive) and
        the field ``properties.params.log`` exists.

        :param index_name: name of the index to search
        :param distinct_id: user ID to match
        :param start_time: lower bound of the ``time`` range (inclusive)
        :param end_time: upper bound of the ``time`` range (inclusive)
        :return: the search response, or ``None`` when the search raised
        """
        # No ``size`` is sent, so the server-side default page size applies
        # (Elasticsearch returns at most 10 hits by default).
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"distinctId": distinct_id}},
                        {
                            "range": {
                                "time": {
                                    "gte": start_time,
                                    "lte": end_time,
                                }
                            }
                        },
                        {"exists": {"field": "properties.params.log"}},
                    ]
                }
            }
        }

        try:
            response = self.es.search(
                index=index_name,
                body=query_body,
            )
            return response

        except Exception as e:
            print(f"❌ 查询执行失败: {e}")
            return None

    @staticmethod
    def _total_hits(hits_section) -> Optional[Any]:
        """Return the total match count from a response's ``hits`` section.

        Handles both the object form (``{"value": n, ...}``) and a plain
        integer; returns ``None`` when no total is present.
        """
        total = hits_section.get('total')
        if isinstance(total, dict):
            return total.get('value')
        return total

    @staticmethod
    def _format_timestamp_ms(timestamp_ms) -> Optional[str]:
        """Render a millisecond timestamp as local time with ms precision.

        Returns ``None`` when the value cannot be converted, so the caller
        can simply skip the formatted line.
        """
        try:
            moment = datetime.fromtimestamp(timestamp_ms / 1000)
            # Drop the last three digits of the microseconds -> milliseconds.
            return moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def format_results(self, response) -> List[Dict[str, Any]]:
        """Print every hit and collect the JSON-decoded log payloads.

        :param response: search response as returned by ``execute_query``
        :return: list of ``{"time": ..., "params": ...}`` dicts, one per hit
            whose ``properties.params.log`` decodes as JSON; an empty list
            when the response is empty or has no ``hits`` section
        """
        if not response or 'hits' not in response:
            print("❌ 未找到匹配的结果")
            # Always return a list so callers can treat the result uniformly.
            return []

        hits_section = response['hits']
        hits = hits_section['hits']
        total = self._total_hits(hits_section)

        print("\n📊 查询结果统计:")
        print(f" 匹配文档总数: {total}")
        print(f" 返回文档数量: {len(hits)}")
        print("\n" + "=" * 80)

        result = []
        for i, hit in enumerate(hits, 1):
            source = hit['_source']
            score = hit.get('_score')
            # ``_score`` may be null (e.g. for sorted searches); formatting
            # None with ``.2f`` would raise TypeError.
            score_text = "N/A" if score is None else f"{score:.2f}"

            print(f"\n📄 文档 {i} (得分: {score_text}):")
            print(f" 文档ID: {hit['_id']}")
            print(f" 用户ID: {source.get('distinctId', 'N/A')}")
            print(f" 时间戳: {source.get('time', 'N/A')}")

            # Human-readable rendering of the timestamp, when convertible.
            if 'time' in source:
                formatted = self._format_timestamp_ms(source['time'])
                if formatted is not None:
                    print(f" 格式化时间: {formatted}")

            # Show the content of properties.params.log; ``or {}`` also
            # covers fields that are present but null.
            properties = source.get('properties') or {}
            params = properties.get('params') or {}
            log_content = params.get('log', 'N/A')
            print(f" 日志内容: {log_content}")
            print("-" * 60)

            try:
                params_dict = json.loads(log_content)
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError is a ValueError; TypeError covers
                # payloads that are not str/bytes.
                print(f"JSON解析错误: {e}, 跳过该条目")
                continue

            result.append({
                "time": source['time'],
                "params": params_dict,
            })

        return result


def main():
    """Run one interactive query and pass the parsed logs to keysCount."""
    print("🚀 Elasticsearch查询工具")
    print("=" * 50)

    # Configuration - adjust to the actual environment.
    es_hosts = ['http://192.168.32.212:9200']  # Elasticsearch server address
    now = datetime.now()
    # NOTE(review): the day is zero-padded but the month is not (e.g.
    # "traces-2024-5-03"); confirm this matches the real index naming.
    # NOTE(review): the index is derived from today's date, not from the
    # picked start time; confirm that is intended.
    index_name = f"traces-{now.year}-{now.month}-{now.day:02d}"

    # Create the query helper (connects immediately).
    es_query = ElasticsearchQuery(hosts=es_hosts)

    # Query parameters.
    distinct_id = "1902111"
    # presumably a millisecond epoch timestamp - verify against timeSelect
    start_time = timeSelect.enhanced_time_picker()
    end_time = start_time + _QUERY_WINDOW_MS

    print("\n🔍 执行查询:")
    print(f" 索引: {index_name}")
    print(f" 用户ID: {distinct_id}")
    print(f" 时间范围: {start_time} - {end_time}")

    # Run the query.
    response = es_query.execute_query(index_name, distinct_id, start_time, end_time)

    # Display the results.
    if response:
        data = es_query.format_results(response)
        keysCount.show(data)

    print("\n✅ 查询完成")


if __name__ == "__main__":
    main()