import { filter, finalize, map, merge, retry, startWith, Subject, Subscription, tap, timer } from 'rxjs';
import type { Observable } from 'rxjs';
import { Disposable, inject, singleton } from 'tsyringe';
import { MarketData, MarketDataEvent } from './marketdata.common';
import type { IMarketDataService } from './marketdata.common';
import { GetInstrumentDocument } from '@oms/generated/frontend';
import type {
  GetInstrumentQuery,
  GetInstrumentQueryVariables,
  Level1IntegrationEvent
} from '@oms/generated/frontend';
import { FactsetClient } from './factset/factset.client';
import { level1 } from './factset/factset.observables';
import { ProcessState } from '@app/data-access/memory/process-id.subject';
import { MarketDataEventUnion, MarketDataUpdateEvent } from '@app/common/marketdata/marketdata.events';
import { testScoped } from '@app/workspace.registry';
import { BroadcastSubject } from '@oms/shared-frontend/rx-broadcast';
import isEmpty from 'lodash/isEmpty';
import { RxApolloClient } from '@app/data-access/api/rx-apollo-client';

/**
 * How market data connections are distributed across application windows (processes).
 *
 * - Multi-process -> connect to factset for any window that needs market data.
 * - Single-process -> connect to factset on the main (leader) process only, and use broadcast-channel to send
 *   messages to other windows.
 */
export type MarketDataConnectionStrategy = 'multi-process' | 'single-process';
/** Injection token under which the active {@link MarketDataConnectionStrategy} is registered in the container. */
export const MarketDataConnectionStrategyToken = 'MarketDataConnectionStrategyToken';

/** Per-ticker bookkeeping kept by the market data service. */
type MarketDataSubscription = {
  /** IDs of the processes currently subscribed to the ticker. */
  processes: string[];
  /** Subscription to the level 1 feed for the ticker, when this process holds one. */
  subscription?: Subscription;
  /** Most recent level 1 event recorded for the ticker, if any. */
  latest?: Level1IntegrationEvent;
};

/**
 * Distributes level 1 market data to observers, keyed by ticker.
 *
 * All traffic flows through a single `marketData$` subject carrying three kinds of events:
 * `SUBSCRIBE` / `UNSUBSCRIBE` (control events, tagged with the requesting process ID) and
 * `LEVEL_ONE_UPDATE` (data events). With the `multi-process` strategy the subject is a plain
 * in-memory `Subject`; with `single-process` it is a `BroadcastSubject` named after the leader
 * process, and only the leader process opens factset subscriptions.
 */
@testScoped
@singleton()
export class MarketDataService implements IMarketDataService, Disposable {
  protected apolloClient: RxApolloClient;
  protected factsetClient: FactsetClient;
  protected processState: ProcessState;
  protected connectionStrategy: MarketDataConnectionStrategy = 'single-process';
  /** Subscription to the SUBSCRIBE / UNSUBSCRIBE control events, created by {@link start}. */
  protected subUnsubSubscription?: Subscription;
  /** Per-ticker state: subscribed processes, the factset subscription and the latest event. */
  protected tickerSubscribers: Map<string, MarketDataSubscription>;
  protected marketData$: Subject<MarketDataEventUnion>;
  /** Number of observables handed out by {@link observe} per ticker in this process. */
  protected tickerObservers: Map<string, number> = new Map();

  constructor(
    @inject(FactsetClient) factsetClient: FactsetClient,
    @inject(RxApolloClient) apolloClient: RxApolloClient,
    @inject(ProcessState) processState: ProcessState,
    @inject(MarketDataConnectionStrategyToken) connectionStrategy: MarketDataConnectionStrategy
  ) {
    this.connectionStrategy = connectionStrategy;
    this.factsetClient = factsetClient;
    this.apolloClient = apolloClient;
    this.processState = processState;
    this.tickerSubscribers = new Map();
    this.marketData$ =
      this.connectionStrategy === 'multi-process'
        ? new Subject<MarketDataEventUnion>()
        : new BroadcastSubject(`${this.processState.LEADER_PROCESS_ID}-marketdata`);
  }

  /**
   * Connects to factset (when this process is responsible for the connection) and starts
   * handling SUBSCRIBE / UNSUBSCRIBE control events. Safe to call repeatedly: an open control
   * subscription is reused.
   *
   * @param fromPlugin When truthy, a `multi-process` service neither connects to factset nor sets
   *   up the control subscription; a `single-process` service does both only on the leader process.
   */
  public start(fromPlugin?: boolean) {
    const isLeaderProcess = this.processState.processId === this.processState.LEADER_PROCESS_ID;
    const isSingleProcessLeader = isLeaderProcess && this.connectionStrategy === 'single-process';
    const shouldConnectToFactset =
      isSingleProcessLeader || (this.connectionStrategy === 'multi-process' && !fromPlugin);
    const shouldSetupSubscription = isSingleProcessLeader || !fromPlugin;
    const alreadySubscribed = this.subUnsubSubscription && !this.subUnsubSubscription.closed;

    if (shouldConnectToFactset && !this.factsetClient.connected) {
      this.factsetClient.connect();
    }

    if (alreadySubscribed || !shouldSetupSubscription) {
      return;
    }

    this.subUnsubSubscription = this.marketData$
      .pipe(
        // Only control events are handled here. The single-process leader handles every
        // process's requests; any other process handles only its own.
        filter(
          (e) =>
            e.type !== 'LEVEL_ONE_UPDATE' &&
            ((this.connectionStrategy === 'single-process' && isLeaderProcess) ||
              e.payload.processId === this.processState.processId)
        ),
        tap((e) => {
          switch (e.type) {
            case 'SUBSCRIBE': {
              const { tickers, processId } = e.payload;
              tickers.forEach((t) => this.addProcessToTicker(t, processId, isLeaderProcess));
              break;
            }
            case 'UNSUBSCRIBE': {
              const { tickers, processId } = e.payload;
              tickers.forEach((t) => this.removeProcessFromTicker(t, processId));
              break;
            }
          }
        }),
        retry({
          resetOnSuccess: true,
          delay: (err, count) => {
            if (this.processState.processId) {
              console.error(
                `❌ Retrying market data subscription on process ${this.processState.processId}. It has failed ${count} time${count === 1 ? '' : 's'} in a row.`,
                err
              );
            }
            return timer(10);
          }
        }),
        finalize(() => {
          if (this.processState.processId) {
            console.warn(`🔴 Disconnecting subscriptions on process ${this.processState.processId}`);
          }

          // Map#forEach is used on purpose: `values()` returns an iterator, and iterators only
          // have `forEach` on runtimes that ship iterator helpers, so the previous optional call
          // could silently skip releasing every factset subscription.
          this.tickerSubscribers.forEach(({ subscription }) => {
            subscription?.unsubscribe();
          });

          // A non-leader window tells the leader that it no longer needs any of its tickers.
          if (
            this.connectionStrategy === 'single-process' &&
            !isLeaderProcess &&
            this.processState.processId
          ) {
            this.marketData$.next({
              type: 'UNSUBSCRIBE',
              payload: {
                tickers: Array.from(this.tickerSubscribers.keys()),
                processId: this.processState.processId
              }
            });
          }

          this.tickerSubscribers.clear();
          this.tickerObservers.clear();
        })
      )
      .subscribe();
  }

  /**
   * Observes level 1 market data for the given tickers.
   *
   * Starts the service if needed, emits a SUBSCRIBE event for the tickers this process is not
   * tracking yet, and returns a merged stream with one inner stream per distinct ticker. Each
   * inner stream first replays the latest known data (when there is any) and then forwards
   * updates for its ticker. When the last observer of a ticker finalizes, an UNSUBSCRIBE event
   * is emitted; when no observers remain at all, a process that is not the single-process
   * leader disconnects.
   *
   * Note: the observer count is incremented when this method is called, not when the returned
   * observable is subscribed.
   *
   * @param tickers Tickers to observe; duplicates are ignored.
   * @returns Stream of `{ ticker, data }` events for the requested tickers.
   */
  public observe(...tickers: string[]): Observable<MarketDataEvent & { data?: MarketData }> {
    this.start();
    const distinctTickersToSub = Array.from(new Set(tickers));
    const newTickersToSubTo = distinctTickersToSub.filter((ticker) => !this.tickerSubscribers.has(ticker));

    if (this.processState.processId && newTickersToSubTo.length) {
      this.marketData$.next({
        type: 'SUBSCRIBE',
        payload: {
          processId: this.processState.processId,
          tickers: newTickersToSubTo
        }
      });
    }

    return merge(
      ...distinctTickersToSub.map((t) => {
        this.tickerObservers.set(t, (this.tickerObservers.get(t) || 0) + 1);

        return this.marketData$.pipe(
          // Replay the cached value first; an empty object is dropped by the isEmpty filter below.
          startWith({
            type: 'LEVEL_ONE_UPDATE',
            payload: { ticker: t, data: this.tickerSubscribers.get(t)?.latest || {} }
          }),
          filter((e) => e.type === 'LEVEL_ONE_UPDATE' && e.payload.ticker === t),
          map((e) => (e.payload as unknown as MarketDataUpdateEvent['payload']).data),
          filter((e) => !isEmpty(e)),
          map((e) => {
            this.recordLatest(t, e);

            return {
              ticker: t,
              data: new MarketData({ level1: e })
            };
          }),
          retry({
            resetOnSuccess: true,
            delay: (err, count) => {
              if (this.processState.processId) {
                console.error(
                  `❌ Retrying market data subscription for ticker ${t} on process ${this.processState.processId}. It has failed ${count} time${count === 1 ? '' : 's'} in a row.`,
                  err
                );
              }
              return timer(10);
            }
          }),
          finalize(() => {
            // The counters are cleared when the control subscription finalizes; nothing to do then.
            if (!this.tickerObservers.has(t)) {
              return;
            }

            const observerCount = (this.tickerObservers.get(t) || 1) - 1;
            this.tickerObservers.set(t, observerCount);
            if (observerCount !== 0) {
              return;
            }

            if (this.processState.processId) {
              this.marketData$.next({
                type: 'UNSUBSCRIBE',
                payload: {
                  tickers: [t],
                  processId: this.processState.processId
                }
              });
            }
            this.tickerObservers.delete(t);

            const isDisposableProcess =
              this.connectionStrategy === 'multi-process' ||
              (this.processState.processId !== this.processState.LEADER_PROCESS_ID &&
                this.connectionStrategy === 'single-process');

            if (this.tickerObservers.size === 0 && isDisposableProcess) {
              if (this.processState.processId) {
                console.warn(
                  `⚠️ There are no more observers on process ${this.processState.processId}, disconnecting...`
                );
              }
              // Fire-and-forget, but with a real handler: a bare `.catch()` handles nothing and
              // would surface a failed disconnect as an unhandled rejection.
              void this.disconnect().catch((err: unknown) => {
                console.error(
                  `❌ Failed to disconnect market data on process ${this.processState.processId}`,
                  err
                );
              });
            }
          })
        );
      })
    );
  }

  /**
   * @returns `true` when every given ticker is currently tracked by this process.
   */
  public hasAllTickers(...tickers: string[]): boolean {
    return tickers.every((t) => this.tickerSubscribers.has(t));
  }

  /**
   * @returns The latest known market data for `ticker`, or `undefined` when none has been recorded.
   */
  public read(ticker: string): MarketData | undefined {
    const latest = this.tickerSubscribers.get(ticker)?.latest;
    return latest ? new MarketData({ level1: latest }) : undefined;
  }

  /**
   * @returns The result of {@link read} for each ticker, in the order given.
   */
  public readAll(...tickers: string[]): (MarketData | undefined)[] {
    return tickers.map((t) => this.read(t));
  }

  /**
   * Fetches an instrument by ID through the Apollo client.
   *
   * @throws Error when the response contains no instrument; response errors are logged first.
   */
  public async getInstrumentById(id: string) {
    const response = await this.apolloClient.query<GetInstrumentQuery, GetInstrumentQueryVariables>({
      query: GetInstrumentDocument,
      variables: { id }
    });

    if (response.data && response.data.instrument) {
      return response.data.instrument;
    }

    if (response.errors) {
      console.error(response.errors);
    }

    throw new Error('Failed to fetch instrument details');
  }

  /**
   * Disconnects from factset (when this process is responsible for the connection) and stops
   * handling control events. The control subscription is released even if the factset
   * disconnect rejects; the rejection is still propagated to the caller.
   */
  public async disconnect(): Promise<void> {
    const isLeaderProcess = this.processState.LEADER_PROCESS_ID === this.processState.processId;
    const shouldDisconnectFromFactset =
      (isLeaderProcess && this.connectionStrategy === 'single-process') ||
      this.connectionStrategy === 'multi-process';

    try {
      if (shouldDisconnectFromFactset) {
        await this.factsetClient.disconnect();
      }
    } finally {
      this.subUnsubSubscription?.unsubscribe();
    }
  }

  public async dispose(): Promise<void> {
    await this.disconnect();
  }

  /** Stores `latest` as the most recent event for `ticker`, creating the entry when it is missing. */
  protected recordLatest(ticker: string, latest: MarketDataSubscription['latest']): void {
    const entry: MarketDataSubscription = this.tickerSubscribers.get(ticker) ?? {
      latest: undefined,
      subscription: undefined,
      processes: this.processState.processId ? [this.processState.processId] : []
    };

    entry.latest = latest;
    this.tickerSubscribers.set(ticker, entry);
  }

  /** Opens the level 1 feed for `ticker`, caching each event and publishing it as a LEVEL_ONE_UPDATE. */
  protected subscribeToLevel1(ticker: string): Subscription {
    return level1({
      ticker,
      client: this.factsetClient
    })
      .pipe(
        map((data) => {
          this.recordLatest(ticker, data);
          return {
            ticker,
            data
          } as MarketDataEvent;
        })
      )
      .subscribe((event) => {
        if (event.data) {
          this.marketData$.next({
            type: 'LEVEL_ONE_UPDATE',
            payload: { ticker: event.ticker, data: event.data }
          });
        }
      });
  }

  /**
   * Handles a SUBSCRIBE request: registers `processId` for `ticker`, opens the level 1 feed when
   * this process owns factset subscriptions and none exists yet, and (single-process leader
   * only) replays the cached value so a newly joined process does not wait for the next tick.
   */
  protected addProcessToTicker(ticker: string, processId: string, isLeaderProcess: boolean): void {
    const previous = this.tickerSubscribers.get(ticker);
    const processes = [...new Set([...(previous?.processes || []), processId])];
    const newProcessSubscribed = previous?.processes?.length !== processes.length;
    const ownsFactsetSubscriptions =
      (this.connectionStrategy === 'single-process' && isLeaderProcess) ||
      this.connectionStrategy === 'multi-process';
    const needsFactsetSubscription = !previous?.subscription && ownsFactsetSubscriptions;

    // Register the entry before opening the feed so that an event delivered synchronously on
    // subscribe lands on this entry instead of being overwritten right afterwards.
    const entry: MarketDataSubscription = {
      processes,
      subscription: previous?.subscription,
      latest: previous?.latest
    };
    this.tickerSubscribers.set(ticker, entry);

    if (needsFactsetSubscription) {
      entry.subscription = this.subscribeToLevel1(ticker);
    }

    // Replay only for tickers that were already tracked; a brand new ticker has nothing cached
    // beyond what the feed itself has just published.
    if (
      newProcessSubscribed &&
      this.connectionStrategy === 'single-process' &&
      isLeaderProcess &&
      previous &&
      entry.latest
    ) {
      this.marketData$.next({
        type: 'LEVEL_ONE_UPDATE',
        payload: {
          data: entry.latest,
          ticker
        }
      });
    }
  }

  /**
   * Handles an UNSUBSCRIBE request: removes `processId` from `ticker` and closes the level 1
   * feed once no process is left.
   *
   * A request from a process that is not subscribed is ignored while other processes remain.
   * A window can send UNSUBSCRIBE for the same ticker twice (once when its last observer
   * finalizes and again when its control subscription is torn down); without this guard the
   * duplicate would drop another process's registration or close a feed that is still in use.
   */
  protected removeProcessFromTicker(ticker: string, processId: string): void {
    const entry = this.tickerSubscribers.get(ticker);
    if (!entry) {
      return;
    }

    const remaining = entry.processes.filter((p) => p !== processId);
    const wasSubscribed = remaining.length !== entry.processes.length;

    if (!wasSubscribed && remaining.length > 0) {
      return;
    }

    if (remaining.length > 0) {
      this.tickerSubscribers.set(ticker, { ...entry, processes: remaining });
      return;
    }

    if (entry.subscription && this.processState.processId) {
      console.warn(
        `🔴 Removing factset subscription for ${ticker} on process ${this.processState.processId}`
      );
    }
    entry.subscription?.unsubscribe();
    this.tickerSubscribers.delete(ticker);
  }
}
