/*
 * Decompiled with CFR 0.152.
 */
package com.lop.open.api.sdk.internal.msg;

import com.lop.open.api.sdk.internal.fastjson.JSON;
import com.lop.open.api.sdk.internal.msg.LopMsgClient;
import com.lop.open.api.sdk.internal.msg.MessageProcessor;
import com.lop.open.api.sdk.internal.msg.pojo.LopMessage;
import com.lop.open.api.sdk.internal.msg.pojo.LopMsgStatus;
import com.lop.open.api.sdk.internal.msg.utils.MessageUtils;
import com.lop.open.api.sdk.internal.msg.utils.WebsocketUtils;
import com.lop.open.api.sdk.internal.msg.wsclient.OpeningHandshakeException;
import com.lop.open.api.sdk.internal.msg.wsclient.WebSocket;
import com.lop.open.api.sdk.internal.msg.wsclient.WebSocketAdapter;
import com.lop.open.api.sdk.internal.msg.wsclient.WebSocketException;
import com.lop.open.api.sdk.internal.msg.wsclient.WebSocketFrame;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LopWSAdapter
extends WebSocketAdapter {
    private MessageProcessor messageProcessor;
    private volatile boolean stopped = false;
    private int queueSize = 2000;
    private int threadCount = Runtime.getRuntime().availableProcessors() * 10;
    private int fetchPeriod = 15;
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(this.threadCount, this.threadCount, this.fetchPeriod * 2, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(this.queueSize));
    private LopMsgClient lopMsgClient;
    private int rowCount = 0;
    private AtomicLong reconnectTimes = new AtomicLong(1L);
    private static final Logger log = LoggerFactory.getLogger(LopWSAdapter.class);
    private static final int MAX_INTERVAL_TIME = 300000;
    private static final int INTERVAL_TIME = 500;

    public LopWSAdapter(LopMsgClient lopMsgClient) {
        this.lopMsgClient = lopMsgClient;
    }

    public LopWSAdapter setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
        return this;
    }

    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    public void close() {
        this.stopped = true;
    }

    @Override
    public void onTextMessage(WebSocket websocket, String text) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("onTextMessage text:{}", (Object)text);
        }
        this.handleMessage(websocket, text);
    }

    private void handleMessage(WebSocket websocket, String rawMsg) {
        final LopMessage msg = MessageUtils.parse(rawMsg);
        final WebSocket wsk = websocket;
        while (!this.stopped) {
            try {
                this.threadPool.submit(new Runnable(){

                    @Override
                    public void run() {
                        LopMsgStatus msgStatus = new LopMsgStatus(msg.getMsgId());
                        try {
                            LopWSAdapter.this.messageProcessor.onMessage(msg, msgStatus);
                            if (!msgStatus.isFail()) {
                                LopWSAdapter.this.confirm(wsk, msgStatus);
                            }
                        }
                        catch (Exception e) {
                            log.error("consume or confirm msg error, msgId:" + msg.getMsgId(), (Throwable)e);
                        }
                    }
                });
                break;
            }
            catch (RejectedExecutionException e) {
                log.error("all lop message worker threads are busy currently, msgId:" + msg.getMsgId(), (Throwable)e);
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void confirm(WebSocket websocket, LopMsgStatus status) {
        if (log.isDebugEnabled()) {
            log.debug("confirm status:{}", (Object)JSON.toJSONString(status));
        }
        websocket.sendText(MessageUtils.genConfirmMsg(status));
    }

    @Override
    public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
        super.onDisconnected(websocket, serverCloseFrame, clientCloseFrame, closedByServer);
        while (!this.stopped) {
            try {
                websocket = websocket.recreate().connect();
                this.lopMsgClient.setWebsocket(websocket);
                this.reconnectTimes.set(1L);
                if (!log.isDebugEnabled()) break;
                log.debug("reconnect success...");
                break;
            }
            catch (OpeningHandshakeException ohsException) {
                String uri = websocket.getURI().toString();
                log.error("onDisconnected OpeningHandshakeException, uri:" + uri, (Throwable)ohsException);
                if (uri == null || !uri.contains("?")) continue;
                try {
                    this.reconnectClient(uri.split("\\?")[0]);
                    this.reconnectTimes.set(1L);
                    if (!log.isDebugEnabled()) break;
                    log.debug("reconnect success......");
                    break;
                }
                catch (Exception e) {
                    long times = this.reconnectTimes.getAndAdd(1L);
                    log.error("reconnect error with new timestamp, totalTimes:" + times, (Throwable)e);
                    Thread.sleep(Math.min(300000L, 500L * times));
                }
            }
            catch (Exception e) {
                long times = this.reconnectTimes.getAndAdd(1L);
                log.error("reconnect error, totalTimes:" + times, (Throwable)e);
                Thread.sleep(Math.min(300000L, 500L * times));
            }
        }
    }

    private WebSocket reconnectClient(String uri) throws Exception {
        this.lopMsgClient.setTimestamp(WebsocketUtils.genTimestamp(new Date()));
        this.lopMsgClient.connect(uri);
        return this.lopMsgClient.getWebsocket();
    }

    @Override
    public void onPingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
        super.onPingFrame(websocket, frame);
        if (log.isDebugEnabled()) {
            log.debug("ping payload:{}", (Object)frame.getPayloadText());
        }
    }

    @Override
    public void onPongFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
        super.onPongFrame(websocket, frame);
        if (log.isDebugEnabled()) {
            log.debug("pong payload:{}", (Object)frame.getPayloadText());
        }
    }

    @Override
    public void onError(WebSocket websocket, WebSocketException cause) throws Exception {
        super.onError(websocket, cause);
        log.error("onError", (Throwable)cause);
    }

    @Override
    public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
        super.onConnected(websocket, headers);
        if (log.isInfoEnabled()) {
            log.info("connect to server success");
        }
    }
}

