import Feed, { EventType } from "@dxfeed/api";
import { createSlice } from "@reduxjs/toolkit";
import { createAsyncThunk } from "@reduxjs/toolkit";
import { debounce, DebouncedFunc, throttle } from "lodash";
import { RootState } from "../store";

type dxFeedState = {
  marketData: Record<string, number>;
  error: string | null;
  subscribedSymbols: string[];
};
const initialState: dxFeedState = {
  marketData: {},
  error: null,
  subscribedSymbols: [],
};

let socket;
const unsubscribeMap: Record<string, () => void> = {};
let debounceBatchUpdate: DebouncedFunc<() => void> | null = null;
let eventQueue: Record<string, number> = {};

const connectToSocket = createAsyncThunk(
  "dxFeed/connectSocket",
  async (_, { rejectWithValue }) => {
    try {
      const socketUrl = process.env.REACT_APP_DXFEED_SOCKET_URL;
      if (!socketUrl) {
        throw new Error("Socket URL is not defined.");
      }
      if (!socket) {
        socket = new Feed();
        await socket.connect(socketUrl);
      }
    }
    catch (error) {
      return rejectWithValue(error.message);
    }
  },
);

export const subscribeToDXFeed = createAsyncThunk<
  { symbolsToSubscribe: string[] },
  string[]
>(
  "dxFeed/subscribe",
  async (symbols, { dispatch, getState, rejectWithValue }) => {
    try {
      if (!socket) {
        throw new Error("Socket is not connected.");
      }
      const state = getState() as RootState;

      // outer throttle prevents the inner debounce from being indefinitely postponed by ensuring
      // that the debounce function is triggered at most once every 2 seconds
      if (!debounceBatchUpdate) {
        debounceBatchUpdate = throttle(
          debounce(() => {
            dispatch(updateMarketData(eventQueue));
            eventQueue = {};
          }, 1000),
          2000,
        );
      }

      const symbolsSet = new Set(symbols);
      const previousSymbols = state.dxFeed.subscribedSymbols;
      const previousSymbolsSet = new Set(previousSymbols);

      // unsubscribe symbols that are no longer in the new list
      const symbolsToUnsubscribe = [...previousSymbols].filter(
        symbol => !symbolsSet.has(symbol),
      );

      for (const symbol of symbolsToUnsubscribe) {
        const unsubscribe = unsubscribeMap[symbol];
        if (unsubscribe) {
          unsubscribe();
          delete unsubscribeMap[symbol];
        }
      }

      // Determine the new symbols to subscribe to (avoid resubscribing)
      const symbolsToSubscribe = [...symbols].filter(
        symbol => !previousSymbolsSet.has(symbol),
      );

      if (symbolsToSubscribe.length > 0) {
        for (const symbol of symbolsToSubscribe) {
          if (!unsubscribeMap[symbol]) {
            const callback = (message) => {
              const { close: price, eventSymbol } = message;
              eventQueue[eventSymbol] = price;
              if (debounceBatchUpdate) {
                debounceBatchUpdate();
              }
            };
            const unsubscribe = socket.subscribe(
              [EventType.Candle],
              [symbol],
              callback,
            );
            unsubscribeMap[symbol] = unsubscribe;
          }
        }
      }

      return { symbolsToSubscribe };
    }
    catch (error) {
      return rejectWithValue(error instanceof Error ? error.message : "Unknown error");
    }
  },
);

export const connectToDXFeed = createAsyncThunk<
  { symbolsToSubscribe: string[] },
  string[],
  { rejectValue: string }
>(
  "dxFeed/connect",
  async (symbolsToSubscribe, { dispatch, rejectWithValue }) => {
    try {
      if (!socket) {
        await dispatch(connectToSocket()).unwrap();
      }
      await dispatch(subscribeToDXFeed(symbolsToSubscribe)).unwrap();

      return { symbolsToSubscribe };
    }
    catch (error) {
      return rejectWithValue(error instanceof Error ? error.message : "Error with connecting to DX feed");
    }
  },
);

const dxFeedSlice = createSlice({
  name: "dxFeed",
  initialState,
  reducers: {
    updateMarketData: (state, action) => {
      state.marketData = {
        ...state.marketData,
        ...action.payload,
      };
    },
    disconnectFromSocket: (state) => {
      if (debounceBatchUpdate) {
        debounceBatchUpdate.cancel();
        debounceBatchUpdate = null;
      }
      if (socket) {
        for (const symbol in unsubscribeMap) {
          if (unsubscribeMap[symbol]) {
            unsubscribeMap[symbol]();
            delete unsubscribeMap[symbol];
          }
        }

        try {
          if (socket.connected) {
            socket.disconnect();
          }
        }
        catch (error) {
          console.error("Error during socket disconnection:", error);
        }
        finally {
          socket = null;
        }
        state.marketData = {};
        state.subscribedSymbols = [];
      }
    },
  },
  extraReducers: (builder) => {
    builder
      .addCase(connectToDXFeed.fulfilled, (state, action) => {
        state.error = null;
        const newSymbols = action.payload.symbolsToSubscribe;
        state.subscribedSymbols = Array.from(new Set([...state.subscribedSymbols, ...newSymbols]));
      })
      .addCase(connectToDXFeed.rejected, (state, action) => {
        state.error = action.payload || null;
      });
  },
});

export const { updateMarketData, disconnectFromSocket } = dxFeedSlice.actions;
export default dxFeedSlice.reducer;
