import {
  map,
  of,
  type Observable,
  combineLatest,
  startWith,
  filter,
  switchMap,
  distinct,
  mergeMap,
  distinctUntilChanged,
  merge,
  scan
} from 'rxjs';
import { inject, singleton } from 'tsyringe';
import type { FetchPolicy } from '@apollo/client';
import type { Optional } from '@oms/shared/util-types';
import { Logger, cleanMaybe, compactMap } from '@oms/shared/util';
import {
  type AwaitGQLResultType,
  type DataSourceCommon,
  FilterSubject,
  type GQLResultType
} from '@oms/frontend-foundation';
import { asObservableDataSource } from '@oms/frontend-foundation';
import {
  LookupBySymbolSimpleDocument,
  type LookupBySymbolSimpleQueryVariables,
  LookupBySymbolDocument,
  type LookupBySymbolQueryVariables,
  MontageFirmAccountsDocument,
  type MontageFirmAccountsQuery,
  type MontageFirmAccountsQueryVariables,
  type MontageUnboundTradingOrderFragment,
  MontageUnboundTradingOrdersDocument,
  type MontageUnboundTradingOrdersQuery,
  type MontageUnboundTradingOrdersQueryVariables,
  type MontageUnboundTradingOrderConnectionFragment,
  type GetMontageUserPreferencesQuery,
  type GetMontageUserPreferencesQueryVariables,
  GetMontageUserPreferencesDocument,
  GetMontageInstrumentDocument,
  type GetMontageInstrumentQuery,
  type GetMontageInstrumentQueryVariables,
  type MontageInstrumentFragment,
  type MontagePreferencesFlagsFragment,
  type MontageInvestorAccountFragment,
  type GetMontageInvestorAccountQuery,
  type GetMontageInvestorAccountQueryVariables,
  GetMontageInvestorAccountDocument,
  type GetMontagePositionQuantityQuery,
  type GetMontagePositionQuantityQueryVariables,
  GetMontagePositionQuantityDocument,
  type OnMontagePositionUpdatedSubscription,
  type OnMontagePositionUpdatedSubscriptionVariables,
  OnMontagePositionUpdatedDocument,
  type OnMontageTradingOrderAddedSubscription,
  type OnMontageTradingOrderAddedSubscriptionVariables,
  type OnMontageTradingOrderUpdatedSubscription,
  type OnMontageTradingOrderUpdatedSubscriptionVariables,
  type MontageFirmAccountFragment,
  type CancelMontageUnboundTradingOrdersMutation,
  type CancelMontageUnboundTradingOrdersMutationVariables,
  CancelMontageUnboundTradingOrdersDocument,
  OnMontageTradingOrderAddedDocument,
  OnMontageTradingOrderUpdatedDocument,
  type InstrumentDetails,
  type ToggleMarketMakerStatusMutation,
  type ToggleMarketMakerStatusMutationVariables,
  ToggleMarketMakerStatusDocument,
  TradingOrderCategory
} from '@oms/generated/frontend';
import { ApolloClientRPC } from '@app/data-access/api/apollo-client-rpc';
import { GQLResponse } from '@app/data-access/api/graphql/graphql-response';
import { OfflineDb } from '@app/data-access/offline/offline-database';
import type { MontageCollection } from '@app/data-access/offline/collections/montage/montage.collection.types';
import { AuthService } from '../../system/auth/auth.service';
import { Level2WebsocketService } from '../../marketdata/level2-websocket.service';
import convert from './utils/convert.utils';
import type { MontageFilter, MontageItem } from './montage.types';
import type {
  MontageItemServiceLevel2QuoteOptions,
  MontageItemServiceTradingOrderOptions,
  UnboundTradingOrderQueryOptions
} from './utils/utils.types.internal';
import { montagePriceColorComparator, montageSortOrderComparator } from './utils/montage.utils';
import {
  addTargetingIndices,
  applyMontageFilterToOrder,
  mergeMontageItemFeeds,
  sort,
  uniqueItemsBy,
  updatePricePartitionMap
} from './utils/montage.operators.util';
import PricePartitionMapSubject from './utils/price-partition-subject.class';
import MontageTargetState from './utils/montage-target-state.class';

@singleton()
export class MontageService {
  protected fetchPolicy: FetchPolicy = 'cache-first';
  protected logger: Logger;
  protected name: string = 'MontageService';

  // 🏗️ Constructor ----------------------------------------------------------------- /

  constructor(
    @inject(ApolloClientRPC) protected apolloClient: ApolloClientRPC,
    @inject(GQLResponse) protected gqlResponse: GQLResponse,
    @inject(OfflineDb) protected offlineDb: OfflineDb,
    @inject(AuthService) protected authService: AuthService,
    @inject(Level2WebsocketService) protected websocketService: Level2WebsocketService
  ) {
    this.logger = Logger.labeled(this.name);
  }

  // 📢 Public ----------------------------------------------------------------- /

  /**
   * Get a filter subject of merged Montage items including unbound trading orders and level 2 quotes
   *
   * @param scopedActorId - Current actor ID
   * @param options.orderType - Level 2 quotes only: Pass an order type. Will default to `OrderType.Limit` for L2 orders if omitted.
   * @param options.orderBy - Trading orders only: Optionally, pass an array of order props to order by, defaulting to updated timestamp
   * @param options.pollInterval - Trading orders only: Optionally, tell the service poll for trading orders every given ms
   * @param options.roundingType - If rounding is needed, round down, up or nearest
   * @param options.fallbackVolume - Trading orders only: Pass a fallback volume in case `quantity` is not specified in the trading order. Will default to zero if omitted.
   * @param options.fallbackLotSize - Trading orders only: Pass a fallback lot size. If omitted, the constant default (100) will be used.
   * @param options.fallbackLimitPrice - Pass a fallback price in case `limitPrice` is not specified in the trading order. Will default to zero if omitted. Does not apply to Market orders.
   * @param options.fallbackCounterPartyId - Trading orders only: Pass a fallback counter-party ID in case destination venue is not specified in the trading order. Will default to empty string if omitted.
   * @param options.fallbackType - Pass a fallback type in case not specified. Will default to `ask` if omitted.
   * @param options.targetingIndex - Trading orders only: Pass a manual targeting index. Will init as undefined if omitted.
   * @param options.isMarketMaker - Optionally, pass if order is market maker. Defaults to `undefined`
   * @param options.isMyTeam - Optionally, pass if order is team order. Defaults to `undefined`
   * @returns An filter subject with an object containing all unmerged montage streams (level 2 data and unbound trading orders)
   */
  public mergeMontageItems(
    scopedActorId: string,
    options: Omit<MontageItemServiceTradingOrderOptions & MontageItemServiceLevel2QuoteOptions, 'filter'> = {}
  ): {
    filter$: FilterSubject<MontageFilter, DataSourceCommon<MontageItem>>;
    targetState: MontageTargetState;
    pricePartitionMap$: PricePartitionMapSubject;
  } {
    const {
      orderBy,
      pollInterval,
      fallbackVolume,
      fallbackLotSize,
      fallbackCounterPartyId,
      targetingIndex,
      orderType,
      ...commonOptions
    } = options;
    const targetState = new MontageTargetState((e) => {
      this.logger.scope('mergedMontageItems').error(e);
    });
    const pricePartitionMap$ = new PricePartitionMapSubject();
    const filter$ = FilterSubject.from((filter?: MontageFilter) => {
      const unboundTradingOrders$ = this.montageUnboundTradingOrders$(scopedActorId, {
        filter,
        orderBy,
        pollInterval,
        fallbackVolume,
        fallbackLotSize,
        fallbackCounterPartyId,
        targetingIndex,
        ...commonOptions
      }).pipe(startWith([] as MontageItem[]));
      const combined = combineLatest([
        unboundTradingOrders$,
        this.montageLevel2Quotes$(scopedActorId, { filter, orderType, ...commonOptions }).pipe(
          startWith([] as MontageItem[])
        )
      ]).pipe(
        mergeMontageItemFeeds('id'),
        sort(montageSortOrderComparator),
        updatePricePartitionMap(pricePartitionMap$, {
          comparator: montagePriceColorComparator
        }),
        asObservableDataSource()
      );
      targetState.init(unboundTradingOrders$, { scopedActorId, filter, collection: this.montageCollection });
      return combined;
    });
    return { filter$, targetState, pricePartitionMap$ };
  }

  /**
   * Get all level 2 orders for the Montage widget depth grid
   *
   * @param scopedActorId - Current actor ID
   * @param options.filter - Apply a montage filter to filter by ask/buy
   * @param options.orderType - Pass an order type. Will default to `OrderType.Limit` for L2 orders if omitted.
   * @param options.roundingType - If rounding is needed, round down, up or nearest
   * @param options.fallbackLimitPrice - Pass a fallback price in case `limitPrice` is not specified in the trading order. Will default to zero if omitted. Does not apply to Market orders.
   * @param options.fallbackType - Pass a fallback type in case not specified. Will default to `ask` if omitted.
   * @param options.isMarketMaker - Optionally, pass if order is market maker. Defaults to `undefined`
   * @param options.isMyTeam - Optionally, pass if order is team order. Defaults to `undefined`
   * @returns An observable with an datasource object containing all level 2 quotes
   */
  public watchAllMontageLevel2Quotes$(
    scopedActorId: string,
    options?: MontageItemServiceLevel2QuoteOptions
  ): Observable<DataSourceCommon<MontageItem>> {
    return this.montageLevel2Quotes$(scopedActorId, options).pipe(asObservableDataSource());
  }

  /**
   * Get all unbound trading orders for the Montage widget depth grid
   *
   * @param scopedActorId - Current actor ID
   * @param options.filter - Apply a montage filter to filter by ask/buy
   * @param options.orderBy - Optionally, pass an array of order props to order by, defaulting to updated timestamp
   * @param options.pollInterval - Optionally, tell the service poll for trading orders every given ms
   * @param options.roundingType - If rounding is needed, round down, up or nearest
   * @param options.fallbackVolume - Pass a fallback volume in case `quantity` is not specified in the trading order. Will default to zero if omitted.
   * @param options.fallbackLotSize - Pass a fallback lot size. If omitted, the constant default (100) will be used.
   * @param options.fallbackLimitPrice - Pass a fallback price in case `limitPrice` is not specified in the trading order. Will default to zero if omitted. Does not apply to Market orders.
   * @param options.fallbackCounterPartyId - Pass a fallback counter-party ID in case destination venue is not specified in the trading order. Will default to empty string if omitted.
   * @param options.fallbackType - Pass a fallback type in case not specified. Will default to `ask` if omitted.
   * @param options.targetingIndex - Pass a manual targeting index. Will init as undefined if omitted.
   * @param options.isMarketMaker - Optionally, pass if order is market maker. Defaults to `undefined`
   * @param options.isMyTeam - Optionally, pass if order is team order. Defaults to `undefined`
   * @returns An observable with an datasource object containing all unbound trading orders
   */
  public watchAllMontageUnboundTradingOrders$(
    scopedActorId: string,
    options?: MontageItemServiceTradingOrderOptions
  ): Observable<DataSourceCommon<MontageItem>> {
    return this.montageUnboundTradingOrders$(scopedActorId, options).pipe(asObservableDataSource());
  }

  /**
   * Watch current instrument details.
   *
   * @param scopedActorId - Current actor ID
   * @returns An observable following the instrument details for the given window.
   */
  public watchInstrumentDetails$(scopedActorId: string): Observable<Optional<MontageInstrumentFragment>> {
    return this.getInstrumentIdFromOfflineDb(scopedActorId).pipe(
      mergeMap((instrumentId) => this.watchQuery_GetMontageInstrumentQuery$(instrumentId))
    );
  }

  /**
   * Watch current investor account details.
   *
   * @param scopedActorId - Current actor ID
   * @returns An observable following the investor account details for the given window.
   */
  public watchInvestorAccountDetails$(
    scopedActorId: string
  ): Observable<Optional<MontageInvestorAccountFragment>> {
    return this.getInvestorAccountId$(scopedActorId).pipe(
      mergeMap((investorAccountId) => this.watchQuery_GetMontageInvestorAccountQuery$(investorAccountId))
    );
  }

  /**
   * Use to pass the instrument type-ahead query to get a list of matching instruments.
   *
   * @param symbol - Pass the symbol query to look up, such as 'AAPL' or 'TSLA'
   * @returns A promise with an array of matching instruments
   */
  public async lookupInstrumentBySymbol(
    symbol: string,
    isSimple: boolean = true
  ): Promise<InstrumentDetails[]> {
    // Choose the appropriate query document, variables type based on isSimple flag
    const queryDocument = isSimple ? LookupBySymbolSimpleDocument : LookupBySymbolDocument;
    const variables: LookupBySymbolSimpleQueryVariables | LookupBySymbolQueryVariables = { symbol };

    const response = await this.gqlResponse
      .wrapQuery({
        query: queryDocument,
        variables,
        fetchPolicy: this.fetchPolicy
      })
      .exec();

    return response.mapTo(
      ({ data }) => compactMap(cleanMaybe(data.instrumentBySymbol, [])),
      (errors) => {
        errors.forEach((e) => this.logger.scope('lookupInstrumentBySymbol').error(e));
        return [];
      }
    );
  }

  public watchPositionsQuantityUpdated$(scopedActorId: string) {
    return combineLatest([
      this.getInstrumentIdFromOfflineDb(scopedActorId),
      this.getInvestorAccountId$(scopedActorId)
    ]).pipe(
      filter(
        ([accountId, instrumentId]) => typeof accountId === 'string' && typeof instrumentId === 'string'
      ),
      map(([accountId, instrumentId]) => ({
        accountId: accountId as string,
        instrumentId: instrumentId as string
      })),
      switchMap(({ accountId, instrumentId }) =>
        merge(
          this._watchQuery_getMontagePositionQuantity$(accountId, instrumentId),
          this._subscribe_OnMontagePositionUpdatedSubscription$(accountId, instrumentId)
        )
      )
    );
  }

  public watchMontageFirmAccount$(scopedActorId: string): Observable<Optional<MontageFirmAccountFragment>> {
    return this.getInstrumentIdFromOfflineDb(scopedActorId).pipe(
      mergeMap((instrumentId) =>
        this._watchQuery_getMontageFirmAccounts$(!!instrumentId, instrumentId).pipe(
          map((data) => {
            const userDefaultsFirmAccount = data.getUserDefaults?.firmAccount;
            const instrumentCoverageFirmAccount = data.getInstrumentCoverage?.defaultFirmAccount;

            return cleanMaybe(userDefaultsFirmAccount || instrumentCoverageFirmAccount);
          })
        )
      )
    );
  }

  public cancelMontageUnboundTradingOrders$(
    scopedActorId: string
  ): Observable<GQLResultType<CancelMontageUnboundTradingOrdersMutation>> {
    return this.getInstrumentIdFromOfflineDb(scopedActorId).pipe(
      filter((instrumentId) => typeof instrumentId === 'string'),
      mergeMap((instrumentId) => this._mutation_CancelMontageUnboundTradingOrders$(instrumentId as string))
    );
  }

  public async toggleMarketMakerStatus({
    instrumentId,
    mmStatus,
    dryRun
  }: {
    instrumentId: string;
    mmStatus: boolean;
    dryRun: boolean;
  }): AwaitGQLResultType<ToggleMarketMakerStatusMutation> {
    const mutation = this.gqlResponse.wrapMutate<
      ToggleMarketMakerStatusMutation,
      ToggleMarketMakerStatusMutationVariables
    >({
      mutation: ToggleMarketMakerStatusDocument,
      variables: { instrumentId, mmStatus, dryRun }
    });

    return await mutation.exec();
  }

  // 🔒 Protected / private --------------------------------------------------------------- /

  // 🗃️ RxDB state -------------------- /

  public get montageCollection(): MontageCollection {
    return this.offlineDb.collections.montage;
  }

  protected getCurrentUserId$(): Observable<Optional<string>> {
    return this.authService.currentUser$.pipe(
      distinct((user) => user?.id),
      map((user) => user?.id)
    );
  }

  protected getInstrumentIdFromOfflineDb(scopedActorId: string): Observable<Optional<string>> {
    return this.offlineDb.collections.montage.findOne(scopedActorId).$.pipe(
      map((document) => document?.instrumentId),
      distinctUntilChanged()
    );
  }

  protected getInvestorAccountId$(scopedActorId: string): Observable<Optional<string>> {
    return this.offlineDb.collections.montage.findOne(scopedActorId).$.pipe(
      map((document) => document?.investorAccountId),
      distinctUntilChanged()
    );
  }

  protected getLatestUserIdAndInstrumentId$(
    scopedActorId: string
  ): Observable<{ userId: string; instrumentId: string }> {
    return combineLatest([this.getCurrentUserId$(), this.getInstrumentIdFromOfflineDb(scopedActorId)]).pipe(
      map(([userId, instrumentId]) => {
        if (typeof userId === 'string' && typeof instrumentId === 'string') {
          return {
            userId,
            instrumentId
          };
        } else {
          return null;
        }
      }),
      filter(Boolean)
    );
  }

  // Processed observables -------------------- /

  protected montageUnboundTradingOrders$(
    scopedActorId: string,
    options?: MontageItemServiceTradingOrderOptions
  ): Observable<MontageItem[]> {
    return this.convertedUnboundTradingOrders$(scopedActorId, options).pipe(
      uniqueItemsBy('id'),
      sort(montageSortOrderComparator),
      addTargetingIndices()
    );
  }

  protected montageLevel2Quotes$(
    scopedActorId: string,
    options?: MontageItemServiceLevel2QuoteOptions
  ): Observable<MontageItem[]> {
    return this.convertedLevel2Quotes$(scopedActorId, options).pipe(uniqueItemsBy('id'));
  }

  // Conversions to `MontageItem` -------------------- /

  protected convertedUnboundTradingOrders$(
    scopedActorId: string,
    options?: MontageItemServiceTradingOrderOptions
  ): Observable<MontageItem[]> {
    const {
      orderBy,
      pollInterval,
      filter,
      targetingIndex: tradingOrderIndex,
      fallbackLimitPrice,
      fallbackVolume,
      fallbackCounterPartyId,
      ...commonOptions
    } = options || {};
    return combineLatest([
      merge(
        this.watchQuery_UnboundTradingOrders$(scopedActorId, {
          orderBy,
          pollInterval
        }),
        this.watchMontageTradingOrders$(scopedActorId, TradingOrderCategory.Montage)
      ).pipe(applyMontageFilterToOrder(filter)),
      this.watchQuery_GetUserMontagePreferencesQuery$(scopedActorId)
    ]).pipe(
      map(([unboundTradingOrders, preferences]) =>
        unboundTradingOrders.map((unboundTradingOrder) =>
          convert.fromUnboundTradingOrder(unboundTradingOrder).toMontageItem({
            preferences,
            targetingIndex: tradingOrderIndex,
            fallbackLimitPrice,
            fallbackVolume,
            fallbackCounterPartyId,
            ...commonOptions
          })
        )
      ),
      startWith([])
    );
  }

  protected convertedLevel2Quotes$(
    scopedActorId: string,
    options?: MontageItemServiceLevel2QuoteOptions
  ): Observable<MontageItem[]> {
    const { filter, orderType, ...commonOptions } = options || {};
    const { type } = filter ?? {};
    return this.getLatestUserIdAndInstrumentId$(scopedActorId).pipe(
      switchMap(({ userId: ownerId, instrumentId }) =>
        combineLatest([
          this.websocketService.level2Quotes$(filter),
          this.watchQuery_GetUserMontagePreferencesQuery$(ownerId),
          this.watchQuery_GetMontageInstrumentQuery$(instrumentId)
        ]).pipe(
          map(([level2Quotes, preferences, instrument]) =>
            level2Quotes.map((quotePage) =>
              convert.fromLevel2QuotePage(quotePage).toMontageItem(type, {
                preferences,
                instrument,
                orderType,
                ...commonOptions
              })
            )
          ),
          startWith([])
        )
      )
    );
  }

  // Base GQL queries -------------------- /

  protected watchQuery_UnboundTradingOrders$(
    scopedActorId: string,
    options?: UnboundTradingOrderQueryOptions
  ): Observable<MontageUnboundTradingOrderFragment[]> {
    return this.getLatestUserIdAndInstrumentId$(scopedActorId).pipe(
      switchMap(({ userId: ownerId, instrumentId }) =>
        this.watchQuery_MontageUnboundTradingOrdersQuery$(ownerId, instrumentId, options).pipe(
          map(({ nodes }) => cleanMaybe(nodes as MontageUnboundTradingOrderFragment[], [])),
          map((unboundTradingOrders) => compactMap(unboundTradingOrders)),
          startWith([])
        )
      )
    );
  }

  protected watchQuery_MontageUnboundTradingOrdersQuery$(
    ownerId: string,
    instrumentId: string,
    options?: UnboundTradingOrderQueryOptions
  ): Observable<MontageUnboundTradingOrderConnectionFragment> {
    const { orderBy, pollInterval } = options ?? {};
    return this.apolloClient
      .watchQuery<MontageUnboundTradingOrdersQuery, MontageUnboundTradingOrdersQueryVariables>({
        query: MontageUnboundTradingOrdersDocument,
        variables: {
          ownerId,
          instrumentId,
          orderBy
        },
        fetchPolicy: 'no-cache', // TODO: Temporary. Change this back to 'cache-first' when we have the subscription working.
        pollInterval
      })
      .pipe(
        map(({ data }) =>
          cleanMaybe(data.visibleTradingOrders, {
            __typename: 'VisibleTradingOrdersConnection',
            nodes: [],
            totalCount: 0
          })
        )
      );
  }

  protected watchQuery_GetMontageInstrumentQuery$(
    instrumentId?: string
  ): Observable<Optional<MontageInstrumentFragment>> {
    if (!instrumentId) return of(undefined);
    return this.apolloClient
      .watchQuery<GetMontageInstrumentQuery, GetMontageInstrumentQueryVariables>({
        query: GetMontageInstrumentDocument,
        variables: {
          instrumentId
        },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(map(({ data }) => cleanMaybe(data.instrument)));
  }

  protected watchQuery_GetMontageInvestorAccountQuery$(
    investorAccountId?: string
  ): Observable<Optional<MontageInvestorAccountFragment>> {
    if (!investorAccountId) return of(undefined);
    return this.apolloClient
      .watchQuery<GetMontageInvestorAccountQuery, GetMontageInvestorAccountQueryVariables>({
        query: GetMontageInvestorAccountDocument,
        variables: {
          investorAccountId
        },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(map(({ data }) => cleanMaybe(data.getInvestorAccount)));
  }

  protected watchQuery_GetUserMontagePreferencesQuery$(
    ownerId?: string
  ): Observable<MontagePreferencesFlagsFragment> {
    return this.apolloClient
      .watchQuery<GetMontageUserPreferencesQuery, GetMontageUserPreferencesQueryVariables>({
        query: GetMontageUserPreferencesDocument,
        variables: {
          userId: ownerId
        },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(
        map(({ data }) =>
          cleanMaybe(data.getUserPreferences?.montagePreferences, { __typename: 'MontagePreferences' })
        )
      );
  }

  protected _watchQuery_getMontagePositionQuantity$(
    instrumentId: string,
    accountId: string
  ): Observable<number> {
    return this.apolloClient
      .watchQuery<GetMontagePositionQuantityQuery, GetMontagePositionQuantityQueryVariables>({
        query: GetMontagePositionQuantityDocument,
        variables: { accountId, instrumentId },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(map(({ data }) => cleanMaybe(data?.getPositionByAccountIdInstrumentId?.quantity, 0)));
  }

  protected _watchQuery_getMontageFirmAccounts$(
    hasInstrumentId: boolean,
    instrumentId?: string
  ): Observable<MontageFirmAccountsQuery> {
    return this.apolloClient
      .watchQuery<MontageFirmAccountsQuery, MontageFirmAccountsQueryVariables>({
        query: MontageFirmAccountsDocument,
        variables: { hasInstrumentId, instrumentId },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(map(({ data }) => cleanMaybe(data, {})));
  }

  // Base GQL subscriptions -------------------- /

  protected _subscribe_OnMontagePositionUpdatedSubscription$(
    instrumentId: string,
    accountId: string
  ): Observable<number> {
    return this.apolloClient
      .subscribe<OnMontagePositionUpdatedSubscription, OnMontagePositionUpdatedSubscriptionVariables>({
        query: OnMontagePositionUpdatedDocument,
        variables: { accountId, instrumentId },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(map(({ data }) => cleanMaybe(data?.positionDataUpdated?.position?.quantity, 0)));
  }

  protected watchMontageTradingOrders$(
    scopedActorId: string,
    category: string
  ): Observable<MontageUnboundTradingOrderFragment[]> {
    return combineLatest([this.getInstrumentIdFromOfflineDb(scopedActorId)]).pipe(
      map(([instrumentId]) => {
        if (typeof instrumentId !== 'string') {
          return null;
        }

        return {
          instrumentId,
          category
        };
      }),
      filter(Boolean),
      switchMap(({ instrumentId, category }) =>
        merge(
          this._subscribe_OnMontageTradingOrderAddedSubscription$({ instrumentId, category }).pipe(
            map((montageTradingOrder) => montageTradingOrder),
            filter(Boolean)
          ),
          this._subscribe_OnMontageTradingOrderUpdatedSubscription$({
            instrumentId,
            category
          }).pipe(
            map((montageTradingOrder) => montageTradingOrder),
            filter(Boolean)
          )
        ).pipe(
          scan(
            (
              accumulatedOrders: MontageUnboundTradingOrderFragment[],
              order: MontageUnboundTradingOrderFragment
            ) => {
              return accumulatedOrders.concat(order);
            },
            []
          )
        )
      )
    );
  }

  protected _subscribe_OnMontageTradingOrderAddedSubscription$({
    instrumentId,
    category
  }: {
    instrumentId: string;
    category: string;
  }): Observable<Optional<MontageUnboundTradingOrderFragment>> {
    return this.apolloClient
      .subscribe<OnMontageTradingOrderAddedSubscription, OnMontageTradingOrderAddedSubscriptionVariables>({
        query: OnMontageTradingOrderAddedDocument,
        variables: { instrumentId, category },
        fetchPolicy: this.fetchPolicy
      })
      .pipe(
        filter((e) => !!e.data?.tradingOrderAdded),
        map((e) => cleanMaybe(e.data?.tradingOrderAdded?.tradingOrder))
      );
  }

  protected _subscribe_OnMontageTradingOrderUpdatedSubscription$({
    instrumentId,
    category
  }: {
    instrumentId: string;
    category: string;
  }): Observable<Optional<MontageUnboundTradingOrderFragment>> {
    return this.apolloClient
      .subscribe<OnMontageTradingOrderUpdatedSubscription, OnMontageTradingOrderUpdatedSubscriptionVariables>(
        {
          query: OnMontageTradingOrderUpdatedDocument,
          variables: { instrumentId, category },
          fetchPolicy: this.fetchPolicy
        }
      )
      .pipe(
        filter((e) => !!e.data?.tradingOrderUpdated),
        map((e) => cleanMaybe(e.data?.tradingOrderUpdated?.tradingOrder))
      );
  }

  // Base GQL mutations -------------------- /

  protected _mutation_CancelMontageUnboundTradingOrders$(
    instrumentId: string
  ): AwaitGQLResultType<CancelMontageUnboundTradingOrdersMutation> {
    const mutation = this.gqlResponse.wrapMutate<
      CancelMontageUnboundTradingOrdersMutation,
      CancelMontageUnboundTradingOrdersMutationVariables
    >({
      mutation: CancelMontageUnboundTradingOrdersDocument,
      variables: { instrumentId }
    });

    return mutation.awaitAsyncResponse().exec();
  }
}
