keysFromEs.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import json
  2. import pip
  3. from elasticsearch import Elasticsearch
  4. from datetime import datetime
  5. import sys
  6. from fontTools.misc.cython import returns
  7. import timeSelect
  8. import keysCount
  9. class ElasticsearchQuery:
  10. def __init__(self, hosts=None):
  11. """
  12. 初始化Elasticsearch客户端
  13. :param hosts: ES服务器地址,默认为['http://localhost:9200']
  14. :param http_auth: 认证信息,格式为('username', 'password')
  15. """
  16. self.hosts = hosts
  17. self.es = None
  18. self.connect()
  19. def connect(self):
  20. """建立Elasticsearch连接"""
  21. try:
  22. self.es = Elasticsearch(
  23. hosts=self.hosts,
  24. verify_certs=False, # 开发环境可关闭证书验证
  25. )
  26. if self.es.ping():
  27. print("✅ Elasticsearch连接成功")
  28. else:
  29. print("❌ Elasticsearch连接失败")
  30. sys.exit(1)
  31. except Exception as e:
  32. print(f"❌ 连接Elasticsearch时发生错误: {e}")
  33. sys.exit(1)
  34. def execute_query(self, index_name, distinct_id, start_time, end_time):
  35. """
  36. 执行查询
  37. :param index_name: 索引名称
  38. :param distinct_id: 用户ID
  39. :param start_time: 开始时间戳
  40. :param end_time: 结束时间戳
  41. :return: 查询结果
  42. """
  43. query_body = {
  44. "query": {
  45. "bool": {
  46. "must": [
  47. {
  48. "term": {
  49. "distinctId": distinct_id
  50. }
  51. },
  52. {
  53. "range": {
  54. "time": {
  55. "gte": start_time,
  56. "lte": end_time
  57. }
  58. }
  59. },
  60. {
  61. "exists": {
  62. "field": "properties.params.log"
  63. }
  64. }
  65. ]
  66. }
  67. }
  68. }
  69. try:
  70. response = self.es.search(
  71. index=index_name,
  72. body=query_body
  73. )
  74. return response
  75. except Exception as e:
  76. print(f"❌ 查询执行失败: {e}")
  77. return None
  78. def format_results(self, response):
  79. """格式化查询结果"""
  80. if not response or 'hits' not in response:
  81. print("❌ 未找到匹配的结果")
  82. return
  83. hits = response['hits']['hits']
  84. total = response['hits']['total']['value']
  85. print(f"\n📊 查询结果统计:")
  86. print(f" 匹配文档总数: {total}")
  87. print(f" 返回文档数量: {len(hits)}")
  88. print("\n" + "=" * 80)
  89. result = []
  90. for i, hit in enumerate(hits, 1):
  91. source = hit['_source']
  92. score = hit['_score']
  93. print(f"\n📄 文档 {i} (得分: {score:.2f}):")
  94. print(f" 文档ID: {hit['_id']}")
  95. print(f" 用户ID: {source.get('distinctId', 'N/A')}")
  96. print(f" 时间戳: {source.get('time', 'N/A')}")
  97. # 格式化时间显示
  98. if 'time' in source:
  99. try:
  100. dt = datetime.fromtimestamp(source['time'] / 1000)
  101. print(f" 格式化时间: {dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
  102. except:
  103. pass
  104. # 显示properties.params.log内容
  105. properties = source.get('properties', {})
  106. params = properties.get('params', {})
  107. log_content = params.get('log', 'N/A')
  108. print(f" 日志内容: {log_content}")
  109. print("-" * 60)
  110. try:
  111. params_dict = json.loads(log_content)
  112. except Exception as e:
  113. print(f"JSON解析错误: {e}, 跳过该条目")
  114. continue
  115. converted_item = {
  116. "time": source['time'],
  117. "params": params_dict
  118. }
  119. result.append(converted_item)
  120. return result
  121. def main():
  122. """主函数"""
  123. print("🚀 Elasticsearch查询工具")
  124. print("=" * 50)
  125. # 配置信息 - 请根据实际情况修改
  126. ES_HOSTS = ['http://192.168.32.212:9200'] # ES服务器地址
  127. now = datetime.now()
  128. INDEX_NAME = "traces-" + str(now.year) + "-" + str(now.month) + "-" + str(now.day).zfill(2) # 索引名称
  129. # 创建查询实例
  130. es_query = ElasticsearchQuery(hosts=ES_HOSTS)
  131. # 查询参数
  132. distinct_id = "1902111"
  133. start_time = timeSelect.enhanced_time_picker()
  134. end_time = start_time + 7200000
  135. print(f"\n🔍 执行查询:")
  136. print(f" 索引: {INDEX_NAME}")
  137. print(f" 用户ID: {distinct_id}")
  138. print(f" 时间范围: {start_time} - {end_time}")
  139. # 执行查询
  140. response = es_query.execute_query(INDEX_NAME, distinct_id, start_time, end_time)
  141. # 显示结果
  142. if response:
  143. data = es_query.format_results(response)
  144. keysCount.show(data)
  145. print("\n✅ 查询完成")
  146. '''
  147. 请先行安装
  148. pip install elasticsearch
  149. pip install fontTools
  150. pip install pandas
  151. pip install matplotlib
  152. '''
  153. if __name__ == "__main__":
  154. main()