Sfoglia il codice sorgente

详情页建立长连接

hr~ 3 settimane fa
parent
commit
40d8e9cd11

+ 9 - 0
bid/pom.xml

@@ -25,6 +25,15 @@
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-web</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-websocket</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
 		<!-- actuator -->
 		<dependency>
 			<groupId>org.springframework.boot</groupId>

+ 3 - 1
bid/src/main/java/cn/hobbystocks/auc/config/SecurityConfig.java

@@ -37,6 +37,7 @@ public class SecurityConfig {
             "/bid/lot/notice/list",
             "/bid/lot/notice/detail/**",
             "/bid/bidding/addPrice",
+            "/bid/ws/lot/**",
             "/auction/banner/list",
             "/lot/hot/list",
             "/lot/list/search",
@@ -44,7 +45,8 @@ public class SecurityConfig {
             "/auction/details",
             "/lot/detail/**",
             "/lot/notice/list",
-            "/lot/notice/detail/**"
+            "/lot/notice/detail/**",
+            "/ws/lot/**"
     };
 
     public SecurityConfig(AuthenticationFilter authenticationFilter) {

+ 25 - 0
bid/src/main/java/cn/hobbystocks/auc/websocket/LotRealtimeMessage.java

@@ -0,0 +1,25 @@
+package cn.hobbystocks.auc.websocket;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+@Builder
+public class LotRealtimeMessage {
+
+    public static final String TYPE = "LOT_DETAIL_REALTIME";
+
+    private String type;
+    private Long lotId;
+    private Long auctionId;
+    private String status;
+    private Long currentEndTime;
+    private Long timestamp;
+    private Long serverTime;
+    private BigDecimal currentPrice;
+    private Long bidCount;
+    private Long bidPersionCount;
+    private Boolean delayChanged;
+}

+ 56 - 0
bid/src/main/java/cn/hobbystocks/auc/websocket/LotRealtimeWebSocketPublisher.java

@@ -0,0 +1,56 @@
+package cn.hobbystocks.auc.websocket;
+
+import cn.hobbystocks.auc.domain.Lot;
+import cn.hobbystocks.auc.handle.context.Live;
+import cn.hobbystocks.auc.handle.context.tradition.TraditionLive;
+import cn.hobbystocks.auc.realtime.LotRealtimePublisher;
+import com.alibaba.fastjson.JSON;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+
+@Component
+public class LotRealtimeWebSocketPublisher implements LotRealtimePublisher {
+
+    private final LotWebSocketSessionRegistry sessionRegistry;
+
+    @Autowired
+    public LotRealtimeWebSocketPublisher(LotWebSocketSessionRegistry sessionRegistry) {
+        this.sessionRegistry = sessionRegistry;
+    }
+
+    @Override
+    public void publish(Live live, Long previousEndTime) {
+        if (live == null || live.getLot() == null || live.getLot().getId() == null) {
+            return;
+        }
+        Lot lot = live.getLot();
+        Long currentEndTime = live.getCurrentEndTime();
+        Long serverTime = System.currentTimeMillis();
+        LotRealtimeMessage message = LotRealtimeMessage.builder()
+                .type(LotRealtimeMessage.TYPE)
+                .lotId(lot.getId())
+                .auctionId(lot.getAuctionId())
+                .status(lot.getStatus())
+                .currentEndTime(currentEndTime)
+                .timestamp(currentEndTime == null ? null : currentEndTime - serverTime)
+                .serverTime(serverTime)
+                .currentPrice(currentPrice(live))
+                .bidCount(lot.getBidCount())
+                .bidPersionCount(lot.getBidPersionCount())
+                .delayChanged(previousEndTime != null
+                        && currentEndTime != null
+                        && !Objects.equals(previousEndTime, currentEndTime))
+                .build();
+        sessionRegistry.broadcast(lot.getId(), JSON.toJSONString(message));
+    }
+
+    private BigDecimal currentPrice(Live live) {
+        if (live instanceof TraditionLive) {
+            return ((TraditionLive) live).getCurrentPrice();
+        }
+        return null;
+    }
+}

+ 25 - 0
bid/src/main/java/cn/hobbystocks/auc/websocket/LotWebSocketConfig.java

@@ -0,0 +1,25 @@
+package cn.hobbystocks.auc.websocket;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+@Configuration
+@EnableWebSocket
+public class LotWebSocketConfig implements WebSocketConfigurer {
+
+    private final LotWebSocketHandler lotWebSocketHandler;
+
+    @Autowired
+    public LotWebSocketConfig(LotWebSocketHandler lotWebSocketHandler) {
+        this.lotWebSocketHandler = lotWebSocketHandler;
+    }
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        registry.addHandler(lotWebSocketHandler, "/ws/lot/{lotId}", "/bid/ws/lot/{lotId}")
+                .setAllowedOriginPatterns("*");
+    }
+}

+ 59 - 0
bid/src/main/java/cn/hobbystocks/auc/websocket/LotWebSocketHandler.java

@@ -0,0 +1,59 @@
+package cn.hobbystocks.auc.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.net.URI;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Component
+@Slf4j
+public class LotWebSocketHandler extends TextWebSocketHandler {
+
+    private static final Pattern LOT_PATH_PATTERN = Pattern.compile(".*/(?:bid/)?ws/lot/(\\d+)$");
+
+    private final LotWebSocketSessionRegistry sessionRegistry;
+    private final ConcurrentMap<String, Long> sessionLots = new ConcurrentHashMap<>();
+
+    @Autowired
+    public LotWebSocketHandler(LotWebSocketSessionRegistry sessionRegistry) {
+        this.sessionRegistry = sessionRegistry;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        Long lotId = resolveLotId(session.getUri());
+        if (lotId == null) {
+            session.close(CloseStatus.BAD_DATA);
+            return;
+        }
+        sessionLots.put(session.getId(), lotId);
+        sessionRegistry.register(lotId, session);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
+        Long lotId = sessionLots.remove(session.getId());
+        if (lotId != null) {
+            sessionRegistry.unregister(lotId, session);
+        }
+    }
+
+    static Long resolveLotId(URI uri) {
+        if (uri == null) {
+            return null;
+        }
+        Matcher matcher = LOT_PATH_PATTERN.matcher(uri.getPath());
+        if (!matcher.matches()) {
+            return null;
+        }
+        return Long.valueOf(matcher.group(1));
+    }
+}

+ 68 - 0
bid/src/main/java/cn/hobbystocks/auc/websocket/LotWebSocketSessionRegistry.java

@@ -0,0 +1,68 @@
+package cn.hobbystocks.auc.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+@Component
+@Slf4j
+public class LotWebSocketSessionRegistry {
+
+    private final ConcurrentMap<Long, Set<WebSocketSession>> sessionsByLotId = new ConcurrentHashMap<>();
+
+    public void register(Long lotId, WebSocketSession session) {
+        sessionsByLotId.computeIfAbsent(lotId, key -> ConcurrentHashMap.newKeySet()).add(session);
+    }
+
+    public void unregister(Long lotId, WebSocketSession session) {
+        Set<WebSocketSession> sessions = sessionsByLotId.get(lotId);
+        if (sessions == null) {
+            return;
+        }
+        sessions.remove(session);
+        if (sessions.isEmpty()) {
+            sessionsByLotId.remove(lotId, sessions);
+        }
+    }
+
+    public void broadcast(Long lotId, String payload) {
+        Set<WebSocketSession> sessions = sessionsByLotId.get(lotId);
+        if (sessions == null || sessions.isEmpty()) {
+            return;
+        }
+        TextMessage message = new TextMessage(payload);
+        for (WebSocketSession session : sessions) {
+            if (!sendIfOpen(session, message)) {
+                sessions.remove(session);
+            }
+        }
+        if (sessions.isEmpty()) {
+            sessionsByLotId.remove(lotId, sessions);
+        }
+    }
+
+    int sessionCount(Long lotId) {
+        Set<WebSocketSession> sessions = sessionsByLotId.get(lotId);
+        return sessions == null ? 0 : sessions.size();
+    }
+
+    private boolean sendIfOpen(WebSocketSession session, TextMessage message) {
+        if (!session.isOpen()) {
+            return false;
+        }
+        try {
+            synchronized (session) {
+                session.sendMessage(message);
+            }
+            return true;
+        } catch (Exception e) {
+            log.warn("send lot websocket message failed, sessionId={}", session.getId(), e);
+            return false;
+        }
+    }
+}

+ 8 - 0
lot/src/main/java/cn/hobbystocks/auc/realtime/LotRealtimePublisher.java

@@ -0,0 +1,8 @@
+package cn.hobbystocks.auc.realtime;
+
+import cn.hobbystocks.auc.handle.context.Live;
+
+public interface LotRealtimePublisher {
+
+    void publish(Live live, Long previousEndTime);
+}

+ 11 - 0
lot/src/main/java/cn/hobbystocks/auc/service/impl/BidServiceImpl.java

@@ -18,6 +18,7 @@ import cn.hobbystocks.auc.handle.context.LiveContext;
 import cn.hobbystocks.auc.handle.context.tradition.TraditionLive;
 import cn.hobbystocks.auc.mapper.BidMapper;
 import cn.hobbystocks.auc.mapper.LotMapper;
+import cn.hobbystocks.auc.realtime.LotRealtimePublisher;
 import cn.hobbystocks.auc.service.IBidService;
 import cn.hobbystocks.auc.vo.BidVO;
 import lombok.extern.slf4j.Slf4j;
@@ -50,6 +51,8 @@ public class BidServiceImpl implements IBidService
     public EventPublisher eventPublisher;
     @Autowired(required = false)
     private CacheMap cacheMap;
+    @Autowired(required = false)
+    private LotRealtimePublisher lotRealtimePublisher;
 
     @Override
     public Bid selectBidById(Long id)
@@ -73,6 +76,7 @@ public class BidServiceImpl implements IBidService
         locker.tryLock(Constants.REDIS_LOCK_SYNC_LOT_TEMPLATE, () -> {
             locker.tryLock(String.format(Constants.REDIS_LOCK_LOT_TEMPLATE, bid.getLotId()), () ->{
                 Live live = redisCache.getCacheMapValue(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, bid.getAuctionId()), bid.getLotId().toString());
+                Long previousEndTime = live.getCurrentEndTime();
                 // 插入新的出价
                 insertCurrBid(bid, live.getRound());
                 // 执行addPrice方法之前已进行了拍卖状态的判断
@@ -90,6 +94,7 @@ public class BidServiceImpl implements IBidService
                         redisCache.setList(String.format(Constants.REDIS_MAP_AUC_LOT_BID_TEMPLATE, live.getLot().getId()), bidList);
                         live.getLot().setProperties(null);
                         redisCache.setCacheMapValue(String.format(Constants.REDIS_MAP_AUC_LOT_TEMPLATE, live.getLot().getAuctionId()), live.getLot().getId().toString(), live);
+                        publishRealtime(live, previousEndTime);
                         // 这里发送 状态变更消息 其中同步内存中的缓存和发送IM消息给APP
                         //TODO 异常问题
 //                        eventPublisher.publishChangeEvent(new ChangeEvent(live, null, bidList));
@@ -155,6 +160,12 @@ public class BidServiceImpl implements IBidService
         return result.get();
     }
 
+    private void publishRealtime(Live live, Long previousEndTime) {
+        if (Objects.nonNull(lotRealtimePublisher)) {
+            lotRealtimePublisher.publish(live, previousEndTime);
+        }
+    }
+
     private void insertCurrBid(BidVO bid, Long round) {
         bidMapper.clearCurrentBid(bid.getLotId());
         bid.setDelFlag(Constants.DEL_FLAG_NO_DELETE);