| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- package cn.hobbystocks.auc.task;
import cn.hobbystocks.auc.app.AppClient;
import cn.hobbystocks.auc.cache.CacheMap;
import cn.hobbystocks.auc.common.constant.Constants;
import cn.hobbystocks.auc.common.core.redis.Locker;
import cn.hobbystocks.auc.common.core.redis.RedisCache;
import cn.hobbystocks.auc.common.user.UserUtils;
import cn.hobbystocks.auc.common.utils.CloneUtils;
import cn.hobbystocks.auc.common.utils.DateUtils;
import cn.hobbystocks.auc.domain.*;
import cn.hobbystocks.auc.handle.RuleHandlerHolder;
import cn.hobbystocks.auc.handle.context.Live;
import cn.hobbystocks.auc.mapper.*;
import cn.hobbystocks.auc.service.DepositOrderService;
import cn.hobbystocks.auc.service.IAuctionService;
import cn.hobbystocks.auc.service.ILotService;
import cn.hobbystocks.auc.service.IOrderService;
import cn.hobbystocks.auc.vo.OrderVO;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
- @Component
- @Slf4j
- public class BidTask implements CacheMap {
- // region params
- private static final ConcurrentHashMap<String, ConcurrentHashMap<String, Live>> liveCacheMap = new ConcurrentHashMap<>();
- @Autowired
- private RedisCache redisCache;
- @Autowired
- private RuleHandlerHolder ruleHandlerHolder;
- @Autowired
- private Locker locker;
- @Autowired
- private ILotService lotService;
- @Autowired
- private IAuctionService auctionService;
- @Autowired
- private DepositOrderService depositOrderService;
- @Autowired
- private IOrderService orderService;
- @Value("${auction.thread.corePoolSize:50}")
- private Integer corePoolSize;
- @Value("${auction.thread.maximumPoolSize:100}")
- private Integer maximumPoolSize;
- @Value("${auction.thread.queueSize:50}")
- private Integer queueSize;
- @Value("${user.info-url:http://coresvc2/user/}")
- private String userUrl;
- private ThreadPoolExecutor threadPool;
- @Autowired
- private AppClient appClient;
- @Autowired
- private LotMapper lotMapper;
- @Autowired
- private BidMapper bidMapper;
- @Autowired
- private BidRecordMapper bidRecordMapper;
- // endregion
- // region init
- @PostConstruct
- public void init() {
- TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
- UserUtils.setUrl(userUrl);
- sync();
- this.threadPool = new ThreadPoolExecutor(corePoolSize,
- maximumPoolSize,
- 60L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(queueSize),
- new NamedThreadFactory("tradition-live-lot"),
- (r, e) -> log.error("thread pool rejected execution")
- );
- }
- // endregion
- // region methods
- /**
- * 同步 Redis 缓存中的数据到本地的 liveCacheMap 中
- */
- @Override
- public void sync() {
- // 遍历所有以 Constants.REDIS_MAP_AUC_LOT_PREFIX 为前缀的 Redis 键
- redisCache.keys(Constants.REDIS_MAP_AUC_LOT_PREFIX).forEach(aucKey -> {
- // 清空本地缓存
- liveCacheMap.clear();
- // 将 Redis 中对应键的值(一个 Map)放入本地缓存
- liveCacheMap.put(aucKey, new ConcurrentHashMap<>(redisCache.getCacheMap(aucKey)));
- });
- }
- /**
- * 更新缓存到本地liveCacheMap
- */
- @Override
- public void putLive(Live live){
- live = CloneUtils.clone(live);
- ConcurrentHashMap<String, Live> cacheMap = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, live.getLot().getAuctionId()));
- if (Objects.isNull(cacheMap)) {
- cacheMap = new ConcurrentHashMap<>();
- }
- cacheMap.put(live.getLot().getId().toString(), live);
- liveCacheMap.put(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, live.getLot().getAuctionId()), cacheMap);
- }
- /**
- * 根据拍卖 ID 获取本地缓存中的全部直播数据
- */
- @Override
- public Map<String, Live> viewAuction(Long auctionId) {
- ConcurrentHashMap<String, Live> map = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
- return Objects.isNull(map) ? new HashMap<>() : new HashMap<>(map);
- }
- /**
- * 根据拍品 ID 获取本地缓存中的直播数据
- */
- @Override
- public Live viewLot(Long auctionId, Long lotId) {
- ConcurrentHashMap<String, Live> map = liveCacheMap.get(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
- if (CollectionUtils.isEmpty(map)) {
- return null;
- }
- return map.get(lotId.toString());
- }
- /**
- * 本地缓存删除直播数据
- */
- @Override
- public void end(Long auctionId) {
- // 本地缓存删除直播数据
- liveCacheMap.remove(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auctionId));
- Lot cond = new Lot();
- cond.setDelFlag(Constants.DEL_FLAG_NO_DELETE);
- cond.setAuctionId(auctionId);
- }
- // endregion
- // region schedule tasks
- /**
- * 每5秒检查下订单状态,如订单已过期则更新拍品状态为流拍
- */
- /*@Scheduled(fixedRate = 5000)
- public void expireOrder(){
- //查询所有订单状态,如果订单超时未支付,更新拍品状态为流拍
- List<Order> orderList = orderService.list();
- for (Order order : orderList) {
- Lot lot = lotMapper.selectLotById(order.getLotId());
- if (System.currentTimeMillis() > order.getExpireTime().getTime()) {
- //当前时间超过订单过期时间
- if (!Objects.equals(order.getStatus(), 101L) &&
- !Objects.equals(order.getStatus(), 102L) &&
- !Objects.equals(order.getStatus(), 103L) &&
- !Objects.equals(order.getStatus(), 104L) &&
- !Objects.equals(order.getStatus(), 105L) &&
- !Objects.equals(order.getStatus(), 106L) &&
- !Objects.equals(order.getStatus(), 301L)) {
- //订单状态未支付
- //更新拍品状态为流拍
- lotMapper.updateLot(Lot.builder().id(order.getLotId()).status(Constants.LOT_STATUS_PASS).build());
- //更新订单标识为已处理
- orderService.modifyOrder(Order.builder().orderNo(order.getOrderNo()).flag(1).expireTime(DateUtils.addDays(new Date(),1)).build());
- }else{
- //订单状态已支付,更新拍品为成交状态
- lotMapper.updatePay(order.getLotId(), 1);
- orderService.modifyOrder(Order.builder().orderNo(order.getOrderNo()).flag(2).build());
- // 通知更新拍品信息
- }
- appClient.notice(lot.getId().toString(),"","{}","lot_auction","");
- }
- }
- }*/
- /**
- * 每1秒检查一次正在竞拍的拍品,并更新拍卖状态
- */
- @Scheduled(fixedRate = 1000)
- public void live() {
- // 遍历所有正在竞拍的拍品
- lotService.selectBidding().forEach(lot -> {
- // 根据拍品的拍卖ID生成Redis缓存键
- String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, lot.getAuctionId());
- // 从Redis缓存中获取对应拍品的数据
- Live live = redisCache.getCacheMapValue(aucKey, lot.getId().toString());
- if (Objects.nonNull(live)) {
- // 将拍卖数据放入本地缓存中
- liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
- .put(live.getLot().getId().toString(), live);
- // 将拍卖任务提交到线程池中执行
- threadPool.submit(() -> {
- String currentStatus = ruleHandlerHolder.getCurrentStatus(live);
- // 如果当前状态是拍品状态未开始,则返回
- if (Objects.equals(Constants.LOT_STATUS_WAITING, currentStatus))
- return;
- // 尝试获取锁,并执行拍卖计算服务
- locker.tryLock(String.format(Constants.REDIS_LOCK_LOT_TEMPLATE, live.getLot().getId()), () -> lotService.live(live));
- });
- }
- });
- // 遍历所有拍品状态撤拍的拍品
- lotService.selectCancel().forEach(lot -> {
- // 根据拍品的拍卖ID生成Redis缓存键
- String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, lot.getAuctionId());
- // 从本地缓存中移除已取消的拍品的拍卖数据
- liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
- .remove(lot.getId().toString());
- });
- // 遍历本地缓存中的所有拍卖数据
- for (String aucKey : liveCacheMap.keySet()) {
- ConcurrentHashMap<String, Live> map = liveCacheMap.get(aucKey);
- List<String> lotIds = Lists.newArrayList();
- // 遍历所有拍卖数据,检查是否有过期的拍卖
- map.values().forEach(live -> {
- // 如果当前时间超过拍卖结束时间3秒,则将该拍品ID添加到移除列表中
- if (live.getCurrentEndTime() + 3000 < System.currentTimeMillis()) {
- lotIds.add(live.getLot().getId().toString());
- }
- });
- // 移除过期的拍卖数据
- lotIds.forEach(map::remove);
- }
- }
- /**
- * 每5分钟检查一次已结束的拍卖,并移除缓存
- */
- @Scheduled(fixedRate = 5 * 60 * 1000)
- public void delFinishLive() {
- // 遍历所有正在拍卖的拍卖会
- auctionService.live().forEach(auction -> {
- // 根据拍卖ID生成Redis缓存键
- String aucKey = String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, auction.getId());
- // 从Redis缓存中获取对应拍卖的数据Map
- Map<String, Live> auctionMap = redisCache.getCacheMap(aucKey);
- // 遍历所有直播数据
- auctionMap.values().forEach(live -> {
- // 获取拍品的状态
- String status = live.getLot().getStatus();
- // 如果拍品状态为拍品状态流拍 或 已售出并且直播结束时间超过5分钟
- if ((Constants.LOT_STATUS_PASS.equals(status) || Constants.LOT_STATUS_SOLD.equals(status)) &&
- (live.getCurrentEndTime() + 5 * 60 * 1000) < System.currentTimeMillis()) {
- // 取消拍卖,删除redis删除锁
- ruleHandlerHolder.cancelLot(live.getLot());
- //liveCacheMap 中没有aucKey时插入一个ConcurrentHashMap,再remove掉
- liveCacheMap.computeIfAbsent(aucKey, s -> new ConcurrentHashMap<>())
- .remove(live.getLot().getId().toString());
- // 清缓存中记录的竞价记录
- redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, live.getLot().getId()));
- redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, live.getLot().getId()));
- redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_USER_TEMPLATE, live.getLot().getId()));
- }
- });
- });
- }
- /**
- * 每秒更新拍卖会状态
- */
- @Scheduled(fixedRate = 1000)
- public void auction(){
- auctionService.auctionLive();
- }
- /**
- * 每1秒检查一次拍卖状态,并将缓存中未写入数据库的bid数据写入数据库
- */
- @Scheduled(fixedDelay = 1000)
- private void auctionLotCheck(){
- lotService.selectBidding().forEach(lot -> {
- List<Bid> bidList = redisCache.getCacheList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, lot.getId()));
- List<BidRecord> bidRecordList = redisCache.getCacheList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, lot.getId()));
- redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_LIST_PREFIX, lot.getId()));
- redisCache.delList(String.format(Constants.REDIS_MAP_AUC_LOT_BIDRECORD_LIST_PREFIX, lot.getId()));
- if(!Objects.isNull(bidList)&&!bidList.isEmpty()){
- int size = bidList.size();
- for (int i = 0; i < size; i += 100) {
- int endIndex = Math.min(i + 100, size);
- List<Bid> batch = bidList.subList(i, endIndex);
- bidMapper.batchInsertBid(batch);
- }
- bidMapper.clearCurrentBid(lot.getId());
- bidMapper.setCurrentBid(lot.getId());
- }
- if(!Objects.isNull(bidRecordList)&&!bidRecordList.isEmpty()){
- int size = bidRecordList.size();
- for (int i = 0; i < size; i += 100) {
- int endIndex = Math.min(i + 100, size);
- List<BidRecord> batch = bidRecordList.subList(i, endIndex);
- bidRecordMapper.batchInsertBidRecord(batch);
- }
- }
- });
- }
- //拍卖会结束发起退保证金,每五分钟执行一次
- @Scheduled(fixedRate = 1000*60*5)
- public void depositOrderRefund(){
- //查询已支付待退款的保证金订单
- DepositOrder depositOrder = new DepositOrder();
- depositOrder.setStatus(1);
- List<DepositOrder> depositOrderList = depositOrderService.selectDepositOrder(depositOrder);
- for (DepositOrder order : depositOrderList) {
- Long userId = order.getUserId();
- //判断拍卖会是否已结束
- Auction auction = auctionService.selectAuctionById(order.getAuctionId());
- if (!Constants.GROUP_STATUS_FINISH.equals(auction.getStatus())){
- //如果拍卖会未结束,处理下一个保证金订单
- continue;
- }
- if (StringUtils.equals("拍卖会",order.getDepositType())){
- //判断是否有未结束的未设置单独保证金的拍品
- List<Lot> lots = lotMapper.selectNotEndLotList(order.getAuctionId());
- if (!CollectionUtils.isEmpty(lots))
- continue;
- //拍卖会保证金,查询用户在该拍卖会下是否有非单独保证金的拍品中标未支付订单
- List<Order> orderList = orderService.getOrderListByUserAndAuction(order.getAuctionId(), userId);
- if (CollectionUtils.isEmpty(orderList)){
- //todo 没有待支付订单,或没中标,或已支付,执行退保证金操作
- OrderVO orderVO = depositOrderService.refundDepositOrder(order.getOrderNo());
- // todo 更新保证金订单状态
- depositOrderService.lambdaUpdate().eq(DepositOrder::getId,order.getId()).set(DepositOrder::getStatus,orderVO.getStatus());
- continue;
- }
- //有待支付的订单列表,判断订单是否已过期,并更新订单记录表状态,
- for (Order order1 : orderList) {
- if (order1.getFlag()==2){
- //todo 订单待支付已过期,调用扣除保证金接口,更新订单记录表状态
- // todo 更新保证金订单状态
- }
- }
- }else{
- Long lotId = order.getLotId();
- //查询拍品是否已结束,未结束则跳过
- Lot lot = lotMapper.selectLotById(lotId);
- if (StringUtils.equalsAny(lot.getStatus(), Constants.LOT_STATUS_WAITING, Constants.LOT_STATUS_STARTING, Constants.LOT_STATUS_BIDDING))
- continue;
- //已结束,查询用户是否有该拍品的订单,
- LambdaQueryWrapper<Order> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(Order::getLotId,lotId).eq(Order::getUserId,order.getUserId());
- Order order1 = orderService.getOne(queryWrapper);
- if (order1==null){
- //todo 未中标,退拍品保证金
- continue;
- }
- if (order1.getFlag()==2){
- //todo 订单已过期 调用接口扣除保证金
- }
- }
- }
- }
- // endregion
- }
|