| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.12.10
- # Date : 2026/5/22 10:55
- """
- 目标网站:
- https://quote.eastmoney.com/concept/sz300624.html?from=classic
- 说明:
- 页面右侧"筹码分布"图没有专用接口 —— 它是前端用 K 线数据在本地用
- CYQCalculator 算出来的(见 quotechart2022.js / src/modules/tools/indicator/cyq.ts)。
- 本脚本: 拉 K 线 → 复刻三角分布筹码算法 → 输出筹码分布。
- """
- import json
- import math
- import random
- import re
- import time
- # import requests
- from curl_cffi.requests import BrowserType
- from curl_cffi import requests
- from loguru import logger
- from datetime import datetime
- from pydantic import BaseModel, Field
- from fastapi import FastAPI, Query
- from fastapi.exceptions import RequestValidationError
- from fastapi.responses import JSONResponse
- from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_fixed
- logger.remove()
- logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- level="DEBUG", retention="7 day")
- # 直接用库内置的所有浏览器指纹
- client_identifier_list = [b.value for b in BrowserType]
- KLINE_URL = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
- # 反爬绕过: 东方财富会拒绝默认 python-requests UA, 需要伪装浏览器
- HEADERS = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36",
- "Referer": "https://quote.eastmoney.com/concept/sz300624.html?from=classic",
- }
- app = FastAPI(title="Eastmoney CYQ API", version="1.0.0")
- class InvalidStockMarketError(Exception):
- """当前市场前缀没有返回有效 K 线数据。"""
- class ChipRequest(BaseModel):
- stockCode: str | int = Field(..., description="股票代码,例如 688605")
- endTime: str = Field(..., description="查询日期,支持 YYYY-MM-DD 或 YYYYMMDD")
- def success_response(data):
- return {
- "code": 0,
- "message": "success",
- "data": data,
- }
- def error_response(code: int, message: str, status_code: int = 200):
- return JSONResponse(
- status_code=status_code,
- content={
- "code": code,
- "message": message,
- "data": None,
- },
- )
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(_request, exc):
- """
- FastAPI 全局参数校验异常处理器。
- :param _request: (starlette.requests.Request) 当前请求对象, 当前实现不需要使用,
- 但 FastAPI 异常处理器签名要求保留该位置参数
- :param exc: (RequestValidationError) 参数校验异常对象, 含错误明细
- :return: (JSONResponse) 统一错误响应, code=400
- """
- return error_response(400, f"参数错误: {exc.errors()}")
- class CYQCalculator:
- """三角分布筹码分布算法 — 移植自 quotechart2022.js 的 CYQCalculator"""
- def __init__(self, klinedata, fator: int = 150, range_=None):
- self.klinedata = klinedata
- self.fator = fator or 150
- self.range = range_
- def calc(self, e: int) -> dict:
- o = self.fator
- r = max(0, e - self.range + 1) if self.range else 0
- window = self.klinedata[r: max(1, e + 1)]
- if not window:
- raise ValueError("invalid index")
- hi_max = max(k["high"] for k in window)
- lo_min = min(k["low"] for k in window)
- u = max(0.01, (hi_max - lo_min) / (o - 1))
- prices = [round(lo_min + u * s, 2) for s in range(o)]
- chips = [0.0] * o
- for d in window:
- op, cl, hi, lo = d["open"], d["close"], d["high"], d["low"]
- m = (op + cl + hi + lo) / 4
- A = min(1.0, (d["hsl"] or 0) / 100.0)
- g = math.floor((hi - lo_min) / u)
- v = math.ceil((lo - lo_min) / u)
- y0 = (o - 1) if hi == lo else 2.0 / (hi - lo)
- y1 = math.floor((m - lo_min) / u)
- chips = [c * (1 - A) for c in chips]
- if hi == lo:
- chips[y1] += y0 * A / 2
- else:
- for j in range(v, g + 1):
- x = lo_min + u * j
- if x <= m:
- if abs(m - lo) < 1e-8:
- chips[j] += y0 * A
- else:
- chips[j] += (x - lo) / (m - lo) * y0 * A
- else:
- if abs(hi - m) < 1e-8:
- chips[j] += y0 * A
- else:
- chips[j] += (hi - x) / (hi - m) * y0 * A
- k_sum = sum(chips)
- close = self.klinedata[e]["close"]
- def S(target: float) -> float:
- run = 0.0
- for i, c in enumerate(chips):
- if run + c > target:
- return lo_min + u * i
- run += c
- return 0.0
- def compute_percent_chips(percent: float) -> dict:
- if not (0 <= percent <= 1):
- raise ValueError('"percent" out of range')
- lo_p = S(k_sum * (1 - percent) / 2)
- hi_p = S(k_sum * (1 + percent) / 2)
- return {
- "priceRange": [f"{lo_p:.2f}", f"{hi_p:.2f}"],
- "concentration": 0 if lo_p + hi_p == 0 else (hi_p - lo_p) / (hi_p + lo_p),
- }
- benefit = sum(c for s, c in enumerate(chips) if close >= lo_min + s * u)
- benefit_part = 0 if k_sum == 0 else benefit / k_sum
- return {
- "x": chips,
- "y": prices,
- "benefitPart": benefit_part,
- "avgCost": f"{S(0.5 * k_sum):.2f}",
- "percentChips": {
- 90: compute_percent_chips(0.9),
- 70: compute_percent_chips(0.7),
- },
- }
- def after_log(retry_state):
- """
- retry 回调
- :param retry_state: RetryCallState 对象
- """
- # 检查 args 是否存在且不为空
- if retry_state.args and len(retry_state.args) > 0:
- log = retry_state.args[0] # 获取传入的 logger
- else:
- log = logger # 使用全局 logger
- if retry_state.outcome.failed:
- log.warning(
- f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
- else:
- log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
- def get_proxys():
- """
- 获取快代理隧道代理配置 (固定凭证, 字符串拼接不会失败, 不需要重试)。
- :return: (dict) 形如 {"http": "http://user:pwd@host:port/", "https": "..."}
- """
- tunnel = "x371.kdltps.com:15818"
- kdl_username = "t13753103189895"
- kdl_password = "o0yefv6z"
- proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
- return {"http": proxy_url, "https": proxy_url}
- @retry(
- stop=stop_after_attempt(5),
- wait=wait_fixed(1),
- after=after_log,
- retry=retry_if_not_exception_type(InvalidStockMarketError),
- )
- def fetch_kline(log, secid: str, end: str = "20990101", lmt: int = 210, fqt: int = 1):
- """
- 拉日K数据。fqt: 0=不复权, 1=前复权, 2=后复权
- :param log: 日志对象
- :param secid: 股票代码,例如 "0.300624"
- :param end: 结束日期,格式 "YYYYMMDD"
- :param lmt: 每次拉取的K线数量
- :param fqt: 复权方式
- :return: K线数据列表
- """
- log.info(f"Fetching kline data for {secid} from {end} with {lmt} bars and {fqt} dividend adjustment")
- params = {
- "secid": secid, # 0.=深市, 1.=沪市
- "ut": "fa5fd1943c7b386f172d6893dbfba10b",
- "fields1": "f1,f2,f3,f4,f5,f6",
- "fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61",
- "klt": 101, # 101=日K, 102=周K, 103=月K, 5/15/30/60=分钟K
- "fqt": fqt,
- "end": end,
- "lmt": lmt,
- "cb": "cb",
- }
- impersonate = random.choice(client_identifier_list)
- r = requests.get(KLINE_URL, impersonate=impersonate, params=params, timeout=10, headers=HEADERS,
- proxies=get_proxys())
- r.raise_for_status()
- matched = re.search(r"\((.*)\);?$", r.text.strip())
- if not matched:
- raise ValueError(f"{secid} 返回格式异常")
- response_data = json.loads(matched.group(1))
- data = response_data.get("data")
- if data is None:
- raise InvalidStockMarketError(f"{secid} 返回 data=null,可能市场前缀不匹配")
- raw_klines = data.get("klines") or []
- if not raw_klines:
- raise InvalidStockMarketError(f"{secid} 没有返回 K 线数据")
- klines = []
- for line in raw_klines:
- f = line.split(",")
- klines.append({
- "date": f[0],
- "open": float(f[1]),
- "close": float(f[2]),
- "high": float(f[3]),
- "low": float(f[4]),
- "volume": float(f[5]),
- "amount": float(f[6]),
- "amp": float(f[7]),
- "pct": float(f[8]),
- "chg": float(f[9]),
- "hsl": float(f[10]), # 换手率 %
- })
- return data, klines
- def normalize_date(endTime: str) -> str:
- """
- 把日期字符串归一化成东方财富接口要求的 YYYYMMDD 格式。
- :param endTime: (str) 日期字符串,支持 "YYYY-MM-DD" 或 "YYYYMMDD"
- :return: (str) 形如 "20260524" 的日期字符串
- """
- if '-' in endTime:
- return endTime.replace('-', '')
- return endTime
- def guess_market_prefix(stock_code: str) -> str:
- """
- 根据股票代码首位判定东方财富 secid 的市场前缀。
- 规则:
- 沪市主板(60) / 科创板(68) / 沪市可转债(11) / 沪市国债逆回购(13) / 沪市基金&ETF(5) → "1."
- 深市主板(00) / 创业板(30) / 深市可转债(12) / 北交所(4x、8x) → "0."
- :param stock_code: (str) 6 位股票代码字符串,例如 "688605"
- :return: (str) "1." 或 "0."
- """
- code = str(stock_code).strip()
- if code.startswith(("60", "68", "11", "13", "5")):
- return "1."
- return "0."
- def get_chip_distribution(log, stockCode, endTime):
- """
- 计算筹码分布接口数据
- :param log: 日志对象
- :param stockCode: 股票代码
- :param endTime: 结束时间
- :return: 接口返回字典
- """
- log.info(f"Fetching data for stock {stockCode} on {endTime}")
- end_time = normalize_date(endTime)
- stock = guess_market_prefix(str(stockCode))
- secid = stock + str(stockCode)
- raw, klines = fetch_kline(log, secid, end=end_time, lmt=210, fqt=1)
- res = CYQCalculator(klines, fator=150).calc(len(klines) - 1)
- trading_day = klines[-1]["date"] # 交易日
- profit_ratio = f'{res["benefitPart"] * 100:.2f}%' # 获利比例
- average_cost = res["avgCost"] # 平均成本
- p90 = res["percentChips"][90] # 90% 成本区间
- p70 = res["percentChips"][70] # 70% 成本区间
- p90_cost_range = p90["priceRange"] # 90% 成本区间
- p90_concentration_ratio = f'{p90["concentration"] * 100:.2f}%' # 90% 成本区间集中度
- p70_cost_range = p70["priceRange"] # 70% 成本区间
- p70_concentration_ratio = f'{p70["concentration"] * 100:.2f}%' # 70% 成本区间集中度
- data_dict = {
- "stock": stock, # 股票前缀 0.=深市, 1.=沪市
- "stock_code": str(stockCode),
- "end_time": endTime,
- "trading_day": trading_day,
- "profit_ratio": profit_ratio,
- "average_cost": average_cost,
- "p90_cost_range": p90_cost_range,
- "p90_concentration_ratio": p90_concentration_ratio,
- "p70_cost_range": p70_cost_range,
- "p70_concentration_ratio": p70_concentration_ratio,
- }
- return data_dict
- def main(log, stockCode, endTime):
- data_dict = get_chip_distribution(log, stockCode, endTime)
- print(data_dict)
- return data_dict
- @app.get("/chip-distribution")
- def chip_distribution_get(
- stockCode: str = Query(..., description="股票代码,例如 688605"),
- endTime: str = Query(..., description="查询日期,支持 YYYY-MM-DD 或 YYYYMMDD"),
- ):
- try:
- return success_response(get_chip_distribution(logger, stockCode, endTime))
- except Exception as exc:
- logger.exception(f"接口查询失败:{exc}")
- return error_response(500, str(exc))
- @app.post("/chip-distribution")
- def chip_distribution_post(payload: ChipRequest):
- try:
- return success_response(get_chip_distribution(logger, payload.stockCode, payload.endTime))
- except Exception as exc:
- logger.exception(f"接口查询失败:{exc}")
- return error_response(500, str(exc))
- if __name__ == "__main__":
- start_time = time.time()
- now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print(f"\n{'=' * 60}\n[{now}] 开始本次查询\n{'=' * 60}")
- try:
- # main(logger, "688605", "2026-05-25")
- main(logger, "300624", "2026-05-25")
- except Exception as me:
- # 单次失败(网络抖动 / 接口异常)不退出,等下一轮重试
- logger.exception(f"本次查询失败:{me}")
- e_time = time.time()
- logger.info(f"本次查询耗时 {e_time - start_time:.2f} 秒")
- """
- 传入
- stockCode: 688605
- endTime:2026-05-24
- """
|