| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import json
- import pip
- from elasticsearch import Elasticsearch
- from datetime import datetime
- import sys
- from fontTools.misc.cython import returns
- import timeSelect
- import keysCount
class ElasticsearchQuery:
    """Thin wrapper around an Elasticsearch client for per-user trace-log queries.

    The client is created and pinged on construction. ``execute_query`` runs a
    user + time-window search, and ``format_results`` prints the hits and
    decodes their ``properties.params.log`` payloads into Python objects.
    """

    # Used when no hosts are given; matches the documented default.
    DEFAULT_HOSTS = ('http://localhost:9200',)

    def __init__(self, hosts=None, http_auth=None):
        """Initialise the Elasticsearch client and connect immediately.

        :param hosts: ES server address(es); defaults to ['http://localhost:9200'].
        :param http_auth: optional credentials as ('username', 'password');
            forwarded to the client only when provided.
        """
        self.hosts = hosts if hosts is not None else list(self.DEFAULT_HOSTS)
        self.http_auth = http_auth
        self.es = None
        self.connect()

    def connect(self):
        """Create the Elasticsearch client and verify it with ``ping()``.

        Exits the process with status 1 when the ping fails or when creating
        or pinging the client raises, so an instance never survives without a
        live connection.
        """
        client_kwargs = {
            'hosts': self.hosts,
            'verify_certs': False,  # certificate verification off (development setting)
        }
        if self.http_auth is not None:
            client_kwargs['http_auth'] = self.http_auth
        try:
            self.es = Elasticsearch(**client_kwargs)
            if self.es.ping():
                print("✅ Elasticsearch连接成功")
            else:
                print("❌ Elasticsearch连接失败")
                # SystemExit is not an Exception subclass, so the handler below won't catch it.
                sys.exit(1)
        except Exception as e:
            print(f"❌ 连接Elasticsearch时发生错误: {e}")
            sys.exit(1)

    def execute_query(self, index_name, distinct_id, start_time, end_time, size=None):
        """Search one index for a user's documents inside a time window.

        Only documents that have a ``properties.params.log`` field match.

        :param index_name: index name
        :param distinct_id: user ID matched exactly against ``distinctId``
        :param start_time: inclusive lower bound for the ``time`` field
        :param end_time: inclusive upper bound for the ``time`` field
        :param size: maximum number of hits to return; when None the request
            carries no ``size`` and Elasticsearch applies its default of 10.
        :return: the search response, or None if the request raised.
        """
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"distinctId": distinct_id}},
                        {"range": {"time": {"gte": start_time, "lte": end_time}}},
                        {"exists": {"field": "properties.params.log"}},
                    ]
                }
            }
        }
        if size is not None:
            query_body["size"] = size
        try:
            return self.es.search(index=index_name, body=query_body)
        except Exception as e:
            print(f"❌ 查询执行失败: {e}")
            return None

    def format_results(self, response):
        """Print a summary of the hits and return their decoded log payloads.

        :param response: search response as returned by ``execute_query``.
        :return: list of ``{"time": <source time or None>, "params": <decoded log>}``
            dicts, one per hit whose log could be decoded; an empty list when the
            response is missing or has no ``hits`` section.
        """
        if not response or 'hits' not in response:
            print("❌ 未找到匹配的结果")
            return []

        hits_section = response['hits']
        hits = hits_section.get('hits') or []
        total = hits_section.get('total', 0)
        if isinstance(total, dict):
            # Newer servers report {"value": N, "relation": ...}; older ones a bare int.
            total = total.get('value', 0)

        print("\n📊 查询结果统计:")
        print(f" 匹配文档总数: {total}")
        print(f" 返回文档数量: {len(hits)}")
        print("\n" + "=" * 80)

        result = []
        for i, hit in enumerate(hits, 1):
            source = hit.get('_source') or {}
            score = hit.get('_score')
            # _score can be null (e.g. when a sort is applied); don't crash formatting it.
            score_text = f"{score:.2f}" if isinstance(score, (int, float)) else "N/A"
            print(f"\n📄 文档 {i} (得分: {score_text}):")
            print(f" 文档ID: {hit.get('_id', 'N/A')}")
            print(f" 用户ID: {source.get('distinctId', 'N/A')}")
            print(f" 时间戳: {source.get('time', 'N/A')}")

            event_time = source.get('time')
            if event_time is not None:
                try:
                    # The stored value is divided by 1000, i.e. treated as epoch milliseconds.
                    dt = datetime.fromtimestamp(event_time / 1000)
                    print(f" 格式化时间: {dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
                except (TypeError, ValueError, OverflowError, OSError):
                    pass  # non-numeric or out-of-range timestamp: skip the pretty form

            properties = source.get('properties') or {}
            params = properties.get('params') or {}
            log_content = params.get('log', 'N/A')
            print(f" 日志内容: {log_content}")
            print("-" * 60)

            if isinstance(log_content, dict):
                params_dict = log_content  # already structured; nothing to decode
            else:
                try:
                    params_dict = json.loads(log_content)
                except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
                    print(f"JSON解析错误: {e}, 跳过该条目")
                    continue
            result.append({"time": event_time, "params": params_dict})
        return result
def main():
    """Interactive entry point: query two hours of one user's trace logs and summarise them.

    Connects to the hard-coded cluster, asks ``timeSelect.enhanced_time_picker``
    for a start time, searches today's ``traces-*`` index for the window
    ``[start, start + 7200000]`` and passes the decoded log payloads to
    ``keysCount.show``.
    """
    print("🚀 Elasticsearch查询工具")
    print("=" * 50)

    # Connection settings - adjust to your environment.
    es_hosts = ['http://192.168.32.212:9200']
    today = datetime.now()
    # NOTE(review): the month is NOT zero-padded while the day is (e.g. "traces-2024-3-05");
    # confirm this matches the real index naming before changing it.
    # NOTE(review): the index comes from today's date rather than the picked start time,
    # so a start time on another day (or a window crossing midnight) hits the wrong index.
    index_name = "traces-{}-{}-{:02d}".format(today.year, today.month, today.day)

    client = ElasticsearchQuery(hosts=es_hosts)

    user_id = "1902111"
    # 2 h * 60 min * 60 s * 1000 = 7,200,000 — presumably epoch milliseconds; confirm
    # against timeSelect.enhanced_time_picker.
    window_length = 2 * 60 * 60 * 1000
    window_start = timeSelect.enhanced_time_picker()
    window_end = window_start + window_length

    print("\n🔍 执行查询:")
    print(f" 索引: {index_name}")
    print(f" 用户ID: {user_id}")
    print(f" 时间范围: {window_start} - {window_end}")

    response = client.execute_query(index_name, user_id, window_start, window_end)
    if response:
        parsed_logs = client.format_results(response)
        keysCount.show(parsed_logs)
    print("\n✅ 查询完成")
# Install the dependencies before running:
#     pip install elasticsearch
#     pip install fontTools
#     pip install pandas
#     pip install matplotlib
# NOTE(review): fontTools appears to be required only by the
# `from fontTools.misc.cython import returns` import, and `returns` is not
# referenced anywhere in this file. pandas/matplotlib are presumably used by
# the local keysCount/timeSelect modules — confirm before trimming the list.
if __name__ == "__main__":
    main()
|