// BidTask.java
package cn.hobbystocks.auc.task;

import cn.hobbystocks.auc.app.AppClient;
import cn.hobbystocks.auc.cache.CacheMap;
import cn.hobbystocks.auc.common.constant.Constants;
import cn.hobbystocks.auc.common.core.redis.Locker;
import cn.hobbystocks.auc.common.core.redis.RedisCache;
import cn.hobbystocks.auc.common.user.UserUtils;
import cn.hobbystocks.auc.common.utils.CloneUtils;
import cn.hobbystocks.auc.common.utils.DateUtils;
import cn.hobbystocks.auc.domain.*;
import cn.hobbystocks.auc.handle.RuleHandlerHolder;
import cn.hobbystocks.auc.handle.context.Live;
import cn.hobbystocks.auc.mapper.*;
import cn.hobbystocks.auc.service.DepositOrderService;
import cn.hobbystocks.auc.service.IAuctionService;
import cn.hobbystocks.auc.service.ILotService;
import cn.hobbystocks.auc.service.IOrderService;
import cn.hobbystocks.auc.vo.OrderVO;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
  33. @Component
  34. @Slf4j
  35. public class BidTask implements CacheMap {
  36. // region params
  37. private static final ConcurrentHashMap<String, ConcurrentHashMap<String, Live>> liveCacheMap = new ConcurrentHashMap<>();
  38. @Autowired
  39. private RedisCache redisCache;
  40. @Autowired
  41. private RuleHandlerHolder ruleHandlerHolder;
  42. @Autowired
  43. private Locker locker;
  44. @Autowired
  45. private ILotService lotService;
  46. @Autowired
  47. private IAuctionService auctionService;
  48. @Autowired
  49. private DepositOrderService depositOrderService;
  50. @Autowired
  51. private IOrderService orderService;
  52. @Value("${auction.thread.corePoolSize:50}")
  53. private Integer corePoolSize;
  54. @Value("${auction.thread.maximumPoolSize:100}")
  55. private Integer maximumPoolSize;
  56. @Value("${auction.thread.queueSize:50}")
  57. private Integer queueSize;
  58. @Value("${user.info-url:http://coresvc2/user/}")
  59. private String userUrl;
  60. private ThreadPoolExecutor threadPool;
  61. @Autowired
  62. private AppClient appClient;
  63. @Autowired
  64. private LotMapper lotMapper;
  65. @Autowired
  66. private BidMapper bidMapper;
  67. @Autowired
  68. private BidRecordMapper bidRecordMapper;
  69. // endregion
  70. // region init
  71. @PostConstruct
  72. public void init() {
  73. TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
  74. UserUtils.setUrl(userUrl);
  75. sync();
  76. this.threadPool = new ThreadPoolExecutor(corePoolSize,
  77. maximumPoolSize,
  78. 60L, TimeUnit.SECONDS,
  79. new ArrayBlockingQueue<>(queueSize),
  80. new NamedThreadFactory("tradition-live-lot"),
  81. (r, e) -> log.error("thread pool rejected execution")
  82. );
  83. }
  84. // endregion
  85. // region methods
  86. /**
  87. * 同步 Redis 缓存中的数据到本地的 liveCacheMap 中
  88. */
  89. @Override
  90. public void sync() {
  91. // 遍历所有以 Constants.REDIS_MAP_AUC_LOT_PREFIX 为前缀的 Redis 键
  92. redisCache.keys(Constants.REDIS_MAP_AUC_LOT_PREFIX).forEach(aucKey -> {
  93. // 清空本地缓存
  94. liveCacheMap.clear();
  95. // 将 Redis 中对应键的值(一个 Map)放入本地缓存
  96. liveCacheMap.put(aucKey, new ConcurrentHashMap<>(redisCache.getCacheMap(aucKey)));
  97. });
  98. }
  99. /**
  100. * 更新缓存到本地liveCacheMap
  101. */
  102. @Override
  103. public void putLive(Live live){
  104. live = CloneUtils.clone(live);
  105. ConcurrentHashMap<String, Live> cacheMap = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, live.getLot().getAuctionId()));
  106. if (Objects.isNull(cacheMap)) {
  107. cacheMap = new ConcurrentHashMap<>();
  108. }
  109. cacheMap.put(live.getLot().getId().toString(), live);
  110. liveCacheMap.put(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, live.getLot().getAuctionId()), cacheMap);
  111. }
  112. /**
  113. * 根据拍卖 ID 获取本地缓存中的全部直播数据
  114. */
  115. @Override
  116. public Map<String, Live> viewAuction(Long auctionId) {
  117. ConcurrentHashMap<String, Live> map = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
  118. return Objects.isNull(map) ? new HashMap<>() : new HashMap<>(map);
  119. }
  120. /**
  121. * 根据拍品 ID 获取本地缓存中的直播数据
  122. */
  123. @Override
  124. public Live viewLot(Long auctionId, Long lotId) {
  125. ConcurrentHashMap<String, Live> map = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
  126. if (CollectionUtils.isEmpty(map)) {
  127. return null;
  128. }
  129. return map.get(lotId.toString());
  130. }
  131. /**
  132. * 本地缓存删除直播数据
  133. */
  134. @Override
  135. public void end(Long auctionId) {
  136. // 本地缓存删除直播数据
  137. liveCacheMap.remove(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
  138. Lot cond = new Lot();
  139. cond.setDelFlag(Constants.DEL_FLAG_NO_DELETE);
  140. cond.setAuctionId(auctionId);
  141. }
  142. // endregion
  143. // region schedule tasks
  144. /**
  145. * 每5秒检查下订单状态,如订单已过期则更新拍品状态为流拍
  146. */
  147. /*@Scheduled(fixedRate = 5000)
  148. public void expireOrder(){
  149. //查询所有订单状态,如果订单超时未支付,更新拍品状态为流拍
  150. List<Order> orderList = orderService.list();
  151. for (Order order : orderList) {
  152. Lot lot = lotMapper.selectLotById(order.getLotId());
  153. if (System.currentTimeMillis() > order.getExpireTime().getTime()) {
  154. //当前时间超过订单过期时间
  155. if (!Objects.equals(order.getStatus(), 101L) &&
  156. !Objects.equals(order.getStatus(), 102L) &&
  157. !Objects.equals(order.getStatus(), 103L) &&
  158. !Objects.equals(order.getStatus(), 104L) &&
  159. !Objects.equals(order.getStatus(), 105L) &&
  160. !Objects.equals(order.getStatus(), 106L) &&
  161. !Objects.equals(order.getStatus(), 301L)) {
  162. //订单状态未支付
  163. //更新拍品状态为流拍
  164. lotMapper.updateLot(Lot.builder().id(order.getLotId()).status(Constants.LOT_STATUS_PASS).build());
  165. //更新订单标识为已处理
  166. orderService.modifyOrder(Order.builder().orderNo(order.getOrderNo()).flag(1).expireTime(DateUtils.addDays(new Date(),1)).build());
  167. }else{
  168. //订单状态已支付,更新拍品为成交状态
  169. lotMapper.updatePay(order.getLotId(), 1);
  170. orderService.modifyOrder(Order.builder().orderNo(order.getOrderNo()).flag(2).build());
  171. // 通知更新拍品信息
  172. }
  173. appClient.notice(lot.getId().toString(),"","{}","lot_auction","");
  174. }
  175. }
  176. }*/
  177. /**
  178. * 每1秒检查一次正在竞拍的拍品,并更新拍卖状态
  179. */
  180. @Scheduled(fixedRate = 1000)
  181. public void live() {
  182. // 遍历所有正在竞拍的拍品
  183. lotService.selectBidding().forEach(lot -> {
  184. // 根据拍品的拍卖ID生成Redis缓存键
  185. String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, lot.getAuctionId());
  186. // 从Redis缓存中获取对应拍品的数据
  187. Live live = redisCache.getCacheMapValue(aucKey, lot.getId().toString());
  188. if (Objects.nonNull(live)) {
  189. // 将拍卖数据放入本地缓存中
  190. liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
  191. .put(live.getLot().getId().toString(), live);
  192. // 将拍卖任务提交到线程池中执行
  193. threadPool.submit(() -> {
  194. String currentStatus = ruleHandlerHolder.getCurrentStatus(live);
  195. // 如果当前状态是拍品状态未开始,则返回
  196. if (Objects.equals(Constants.LOT_STATUS_WAITING, currentStatus))
  197. return;
  198. // 尝试获取锁,并执行拍卖计算服务
  199. locker.tryLock(String.format(Constants.REDIS_LOCK_LOT_TEMPLATE, live.getLot().getId()), () -> lotService.live(live));
  200. });
  201. }
  202. });
  203. // 遍历所有拍品状态撤拍的拍品
  204. lotService.selectCancel().forEach(lot -> {
  205. // 根据拍品的拍卖ID生成Redis缓存键
  206. String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, lot.getAuctionId());
  207. // 从本地缓存中移除已取消的拍品的拍卖数据
  208. liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
  209. .remove(lot.getId().toString());
  210. });
  211. // 遍历本地缓存中的所有拍卖数据
  212. for (String aucKey : liveCacheMap.keySet()) {
  213. ConcurrentHashMap<String, Live> map = liveCacheMap.get(aucKey);
  214. List<String> lotIds = Lists.newArrayList();
  215. // 遍历所有拍卖数据,检查是否有过期的拍卖
  216. map.values().forEach(live -> {
  217. // 如果当前时间超过拍卖结束时间3秒,则将该拍品ID添加到移除列表中
  218. if (live.getCurrentEndTime() + 3000 < System.currentTimeMillis()) {
  219. lotIds.add(live.getLot().getId().toString());
  220. }
  221. });
  222. // 移除过期的拍卖数据
  223. lotIds.forEach(map::remove);
  224. }
  225. }
  226. /**
  227. * 每5分钟检查一次已结束的拍卖,并移除缓存
  228. */
  229. @Scheduled(fixedRate = 5 * 60 * 1000)
  230. public void delFinishLive() {
  231. // 遍历所有正在拍卖的拍卖会
  232. auctionService.live().forEach(auction -> {
  233. // 根据拍卖ID生成Redis缓存键
  234. String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auction.getId());
  235. // 从Redis缓存中获取对应拍卖的数据Map
  236. Map<String, Live> auctionMap = redisCache.getCacheMap(aucKey);
  237. // 遍历所有直播数据
  238. auctionMap.values().forEach(live -> {
  239. // 获取拍品的状态
  240. String status = live.getLot().getStatus();
  241. // 如果拍品状态为拍品状态流拍 或 已售出并且直播结束时间超过5分钟
  242. if ((Constants.LOT_STATUS_PASS.equals(status) || Constants.LOT_STATUS_SOLD.equals(status)) &&
  243. (live.getCurrentEndTime() + 5 * 60 * 1000) < System.currentTimeMillis()) {
  244. // 取消拍卖,删除redis删除锁
  245. ruleHandlerHolder.cancelLot(live.getLot());
  246. //liveCacheMap 中没有aucKey时插入一个ConcurrentHashMap,再remove掉
  247. liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
  248. .remove(live.getLot().getId().toString());
  249. // 清缓存中记录的竞价记录
  250. redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, live.getLot().getId()));
  251. redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, live.getLot().getId()));
  252. redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_USER_TEMPLATE, live.getLot().getId()));
  253. }
  254. });
  255. });
  256. }
  257. /**
  258. * 每秒更新拍卖会状态
  259. */
  260. @Scheduled(fixedRate = 1000)
  261. public void auction(){
  262. auctionService.auctionLive();
  263. }
  264. /**
  265. * 每1秒检查一次拍卖状态,并将缓存中未写入数据库的bid数据写入数据库
  266. */
  267. @Scheduled(fixedDelay = 1000)
  268. private void auctionLotCheck(){
  269. lotService.selectBidding().forEach(lot -> {
  270. List<Bid> bidList = redisCache.getCacheList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, lot.getId()));
  271. List<BidRecord> bidRecordList = redisCache.getCacheList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, lot.getId()));
  272. redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, lot.getId()));
  273. redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, lot.getId()));
  274. if(!Objects.isNull(bidList)&&!bidList.isEmpty()){
  275. int size = bidList.size();
  276. for (int i = 0; i < size; i += 100) {
  277. int endIndex = Math.min(i + 100, size);
  278. List<Bid> batch = bidList.subList(i, endIndex);
  279. bidMapper.batchInsertBid(batch);
  280. }
  281. bidMapper.clearCurrentBid(lot.getId());
  282. bidMapper.setCurrentBid(lot.getId());
  283. }
  284. if(!Objects.isNull(bidRecordList)&&!bidRecordList.isEmpty()){
  285. int size = bidRecordList.size();
  286. for (int i = 0; i < size; i += 100) {
  287. int endIndex = Math.min(i + 100, size);
  288. List<BidRecord> batch = bidRecordList.subList(i, endIndex);
  289. bidRecordMapper.batchInsertBidRecord(batch);
  290. }
  291. }
  292. });
  293. }
  294. //拍卖会结束发起退保证金,每五分钟执行一次
  295. @Scheduled(fixedRate = 1000*60*5)
  296. public void depositOrderRefund(){
  297. //查询已支付待退款的保证金订单
  298. DepositOrder depositOrder = new DepositOrder();
  299. depositOrder.setStatus(1);
  300. List<DepositOrder> depositOrderList = depositOrderService.selectDepositOrder(depositOrder);
  301. for (DepositOrder order : depositOrderList) {
  302. Long userId = order.getUserId();
  303. //判断拍卖会是否已结束
  304. Auction auction = auctionService.selectAuctionById(order.getAuctionId());
  305. if (!Constants.GROUP_STATUS_FINISH.equals(auction.getStatus())){
  306. //如果拍卖会未结束,处理下一个保证金订单
  307. continue;
  308. }
  309. if (StringUtils.equals("拍卖会",order.getDepositType())){
  310. //判断是否有未结束的未设置单独保证金的拍品
  311. List<Lot> lots = lotMapper.selectNotEndLotList(order.getAuctionId());
  312. if (!CollectionUtils.isEmpty(lots))
  313. continue;
  314. //拍卖会保证金,查询用户在该拍卖会下是否有非单独保证金的拍品中标未支付订单
  315. List<Order> orderList = orderService.getOrderListByUserAndAuction(order.getAuctionId(), userId);
  316. if (CollectionUtils.isEmpty(orderList)){
  317. //todo 没有待支付订单,或没中标,或已支付,执行退保证金操作
  318. OrderVO orderVO = depositOrderService.refundDepositOrder(order.getOrderNo());
  319. // todo 更新保证金订单状态
  320. depositOrderService.lambdaUpdate().eq(DepositOrder::getId,order.getId()).set(DepositOrder::getStatus,orderVO.getStatus());
  321. continue;
  322. }
  323. //有待支付的订单列表,判断订单是否已过期,并更新订单记录表状态,
  324. for (Order order1 : orderList) {
  325. if (order1.getFlag()==2){
  326. //todo 订单待支付已过期,调用扣除保证金接口,更新订单记录表状态
  327. // todo 更新保证金订单状态
  328. }
  329. }
  330. }else{
  331. Long lotId = order.getLotId();
  332. //查询拍品是否已结束,未结束则跳过
  333. Lot lot = lotMapper.selectLotById(lotId);
  334. if (StringUtils.equalsAny(lot.getStatus(), Constants.LOT_STATUS_WAITING, Constants.LOT_STATUS_STARTING, Constants.LOT_STATUS_BIDDING))
  335. continue;
  336. //已结束,查询用户是否有该拍品的订单,
  337. LambdaQueryWrapper<Order> queryWrapper = new LambdaQueryWrapper<>();
  338. queryWrapper.eq(Order::getLotId,lotId).eq(Order::getUserId,order.getUserId());
  339. Order order1 = orderService.getOne(queryWrapper);
  340. if (order1==null){
  341. //todo 未中标,退拍品保证金
  342. continue;
  343. }
  344. if (order1.getFlag()==2){
  345. //todo 订单已过期 调用接口扣除保证金
  346. }
  347. }
  348. }
  349. }
  350. // endregion
  351. }