1
0

logger.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import logging
  2. import sys
  3. import json
  4. from datetime import datetime, timezone
  5. from typing import Optional
  6. from app.core.config import settings
  7. try:
  8. from fluent import handler as fluent_handler
  9. except Exception: # pragma: no cover - 运行时兜底
  10. fluent_handler = None
  11. class JsonLogFormatter(logging.Formatter):
  12. """
  13. 将日志格式化为 JSON(每行一条),便于 Fluentd/ELK 检索。
  14. """
  15. def format(self, record: logging.LogRecord) -> str:
  16. log_obj = {
  17. "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
  18. "level": record.levelname,
  19. "logger": record.name,
  20. "message": record.getMessage(),
  21. "module": record.module,
  22. "funcName": record.funcName,
  23. "lineNo": record.lineno,
  24. "app": settings.APP_NAME,
  25. "env": settings.APP_ENV
  26. }
  27. if record.exc_info:
  28. log_obj["exception"] = self.formatException(record.exc_info)
  29. return json.dumps(log_obj, ensure_ascii=False)
  30. def _build_fluent_handler() -> Optional[logging.Handler]:
  31. """
  32. 创建 Fluentd Handler。
  33. 若 fluent-logger 未安装或初始化失败,返回 None。
  34. """
  35. if not settings.FLUENTD_ENABLED:
  36. return None
  37. if fluent_handler is None:
  38. logging.getLogger(__name__).warning("fluent-logger 未安装,已跳过 Fluentd 日志输出。")
  39. return None
  40. try:
  41. fh = fluent_handler.FluentHandler(
  42. tag=settings.FLUENTD_TAG,
  43. host=settings.FLUENTD_HOST,
  44. port=settings.FLUENTD_PORT,
  45. timeout=settings.FLUENTD_TIMEOUT_SEC
  46. )
  47. # 给 Fluentd 输出结构化字段(dict),避免 record 退化为 string。
  48. # 这里不能使用普通 logging.Formatter,否则会变成纯文本。
  49. if hasattr(fluent_handler, "FluentRecordFormatter"):
  50. fh.setFormatter(fluent_handler.FluentRecordFormatter({
  51. "timestamp": "%(asctime)s",
  52. "level": "%(levelname)s",
  53. "logger": "%(name)s",
  54. "message": "%(message)s",
  55. "module": "%(module)s",
  56. "funcName": "%(funcName)s",
  57. "lineNo": "%(lineno)d",
  58. "app": settings.APP_NAME,
  59. "env": settings.APP_ENV
  60. }))
  61. return fh
  62. except Exception as e:
  63. logging.getLogger(__name__).error(f"初始化 Fluentd Handler 失败: {e}")
  64. return None
  65. def setup_logging():
  66. """
  67. 配置日志系统,使其同时输出到文件和控制台。
  68. 这个函数应该在应用程序启动时只调用一次。
  69. """
  70. # 获取根日志记录器
  71. root_logger = logging.getLogger()
  72. root_logger.setLevel(logging.INFO) # 设置根日志记录器的级别
  73. # 如果已经有处理器,先清空,防止重复添加
  74. if root_logger.hasHandlers():
  75. root_logger.handlers.clear()
  76. # 创建 JSON 格式化器(用于控制台与本地文件)
  77. formatter = JsonLogFormatter()
  78. # 1. 创建控制台处理器 (StreamHandler)
  79. # - 将日志输出到标准输出(您的终端)
  80. console_handler = logging.StreamHandler(sys.stdout)
  81. console_handler.setFormatter(formatter)
  82. root_logger.addHandler(console_handler)
  83. # 2. 创建文件处理器 (FileHandler)
  84. # - 将日志写入文件 app.log
  85. # - mode='w' 表示每次启动都覆盖旧日志
  86. # - encoding='utf-8' 确保正确处理中文字符
  87. file_handler = logging.FileHandler('app.log', mode='w', encoding='utf-8')
  88. file_handler.setFormatter(formatter)
  89. root_logger.addHandler(file_handler)
  90. # 3. 可选:创建 Fluentd 处理器 (fluent-logger)
  91. fluentd_handler = _build_fluent_handler()
  92. if fluentd_handler:
  93. root_logger.addHandler(fluentd_handler)
  94. logging.info(
  95. "Fluentd日志输出已启用: host=%s port=%s tag=%s",
  96. settings.FLUENTD_HOST, settings.FLUENTD_PORT, settings.FLUENTD_TAG
  97. )
  98. # 配置完成后,可以记录一条消息来确认
  99. logging.info("日志系统已成功配置,将同时输出到控制台和 app.log 文件。")
  100. def get_logger(name: str) -> logging.Logger:
  101. """
  102. 获取一个指定名称的日志记录器实例。
  103. 假设 setup_logging() 已经在此之前被调用。
  104. """
  105. return logging.getLogger(name)