import type { MemoryDatabaseCollections } from './collections';
import { AppDatabase } from './app-database';
import { Subject, Subscription, merge } from 'rxjs';
import { replicateRxCollection } from 'rxdb-v15/plugins/replication';
import type { RxReplicationState } from 'rxdb-v15/plugins/replication';
import { UiStateService } from '@app/data-access/services/ui-state/ui-state.service';
import type { ReplicationPushChangeRow } from '@app/data-access/services/ui-state/ui-state.service';
import { singleton, inject, delay } from 'tsyringe';
import { SyncronisationSignal } from '../memory/syncronisation.signal';
import type { RxCollection, RxError, RxReplicationPullStreamItem, RxTypeError } from 'rxdb-v15';
import type { UiStateDocumentType } from './types';
import { parseJSON } from '@oms/shared/util';

/** Collections replicated when the service is constructed without an explicit list. */
const REPLICATION_COLLECTIONS: Array<keyof MemoryDatabaseCollections> = ['snapshots'];
/** Milliseconds between periodic `'RESYNC'` emissions on the pull stream (30 seconds). */
const REPLICATION_POLL_INTERVAL = 30 * 1000;

/**
 * Pull-replication checkpoint. The pull handler builds it from the last
 * document of a pulled batch and sends it back (as `id` / `lastUpdatedAt`)
 * on the next pull request.
 */
type CheckpointType = {
  /** Primary-key value of the last pulled document; `''` before the first pull. */
  id: string;
  /** `lastUpdatedAt` of the last pulled document; the Unix epoch as an ISO string before the first pull. */
  updatedAt: string;
};

/**
 * Replicates a set of in-memory RxDB collections against the UI-state backend.
 *
 * `init()` starts one replication per configured collection, optionally emits
 * `'RESYNC'` on the pull stream every `REPLICATION_POLL_INTERVAL` ms, and
 * mirrors progress and errors into `SyncronisationSignal`.
 * `stopReplication()` tears all of that down again so `init()` can be called anew.
 */
@singleton()
export class RxdbReplicationService {
  /**
   * Stream handed to every replication as `pull.stream$`. The service emits
   * `'RESYNC'` on it (poll timer, push conflicts). `undefined` while
   * replication is not initialised.
   */
  public pullStream$: Subject<RxReplicationPullStreamItem<UiStateDocumentType, CheckpointType>> | undefined;
  /** Replication states created by the last `init()`; empty before `init()` and after `stopReplication()`. */
  public replicationStates: RxReplicationState<UiStateDocumentType, CheckpointType>[];
  /** Handle of the periodic `'RESYNC'` timer, if polling is enabled. */
  private poll: ReturnType<typeof setInterval> | undefined;
  /** Collections resolved from the database at construction time. */
  private collections: Array<MemoryDatabaseCollections[keyof MemoryDatabaseCollections]>;
  /** Subscription to the merged `error$` streams of all replication states. */
  private errorSubscription?: Subscription;

  /**
   * @param uiStateService Backend client used by the push/pull handlers.
   * @param db Application database; its `memory` collections are replicated.
   * @param syncronisationSignal Signal that receives the replication status.
   * @param replicationCollections Names of the collections to replicate;
   *   defaults to `REPLICATION_COLLECTIONS`.
   */
  constructor(
    @inject(UiStateService) private uiStateService: UiStateService,
    @inject(delay(() => AppDatabase)) private db: AppDatabase,
    @inject(SyncronisationSignal) private syncronisationSignal: SyncronisationSignal,
    replicationCollections?: (keyof MemoryDatabaseCollections)[]
  ) {
    this.collections = (replicationCollections ?? REPLICATION_COLLECTIONS).map(
      (collectionName) => this.db.memory[collectionName]
    );
    this.replicationStates = [];
  }

  /**
   * Starts replication for all configured collections and resolves once every
   * collection has finished its initial replication.
   *
   * Calling it while replication is already initialised only logs a warning.
   *
   * @param shouldPoll When `true` (default), emit `'RESYNC'` on the pull
   *   stream every `REPLICATION_POLL_INTERVAL` milliseconds.
   * @throws Error if the collections are not available.
   */
  async init(shouldPoll: boolean = true) {
    if (!this.collections) {
      throw new Error('Collections not available');
    }

    if (this.pullStream$) {
      console.warn('Replication cannot initialized again');
      return;
    }

    this.pullStream$ = new Subject();

    if (shouldPoll) {
      if (this.poll) {
        clearInterval(this.poll);
      }
      this.poll = setInterval(() => {
        this.pullStream$?.next('RESYNC');
      }, REPLICATION_POLL_INTERVAL);
    }

    this.syncronisationSignal.signal.set({
      isSyncronising: true,
      isSyncronised: false,
      isSyncronisationError: false,
      syncronisationError: null
    });

    this.replicationStates = this.collections.map((collection) => this.replicate(collection));

    // Subscribe to errors BEFORE waiting for the initial replication: a failing
    // initial sync would otherwise never be reported and the signal would stay
    // at "syncronising" for as long as the initial replication is pending.
    this.errorSubscription?.unsubscribe();
    this.errorSubscription = merge(
      ...this.replicationStates.map((replicationState) => replicationState.error$)
    ).subscribe((error: RxError | RxTypeError) => {
      this.syncronisationSignal.signal.set({
        isSyncronising: false,
        isSyncronised: false,
        isSyncronisationError: true,
        syncronisationError: error.message
      });
    });

    await this.awaitAllCollectionsInSync();

    this.syncronisationSignal.signal.set({
      isSyncronising: false,
      isSyncronised: true,
      isSyncronisationError: false,
      syncronisationError: null
    });
  }

  /** Resolves once every current replication state has completed its initial replication. */
  async awaitAllCollectionsInSync() {
    await Promise.all(
      this.replicationStates.map((replicationState) => replicationState.awaitInitialReplication())
    );
  }

  /**
   * Stops replication: completes the pull stream, clears the poll timer,
   * drops the error subscription and cancels every replication state.
   * Afterwards `init()` may be called again.
   */
  stopReplication() {
    if (this.pullStream$) {
      this.pullStream$.complete();
      this.pullStream$ = undefined;
    }
    if (this.poll) {
      clearInterval(this.poll);
      this.poll = undefined;
    }
    if (this.errorSubscription) {
      this.errorSubscription.unsubscribe();
      this.errorSubscription = undefined;
    }

    // Without cancelling, the replications keep pushing/pulling in the
    // background and a later init() would start duplicates alongside them.
    const activeStates = this.replicationStates;
    this.replicationStates = [];
    for (const replicationState of activeStates) {
      // Fire-and-forget so this method stays synchronous for existing callers.
      void replicationState.cancel().catch((error: unknown) => {
        console.error('Failed to cancel replication', error);
      });
    }
  }

  /**
   * Returns the primary-key value of `document` as a string.
   *
   * Uses the schema's resolved `primaryPath` so that composite primary-key
   * definitions (where `jsonSchema.primaryKey` is an object) also work.
   */
  private getDocumentId(document: UiStateDocumentType, collection: RxCollection) {
    return String(document[String(collection.schema.primaryPath)]);
  }

  /**
   * Creates the replication state for one collection.
   *
   * Push: sends the change rows plus per-document index fields to the backend
   * and returns the conflicting documents it reports; any conflict also
   * triggers a `'RESYNC'` on the pull stream.
   *
   * Pull: requests documents after the given checkpoint, parses them, orders
   * them by `lastUpdatedAt` ascending and derives the next checkpoint from the
   * last document (or keeps the previous checkpoint when nothing was returned).
   *
   * Note: `pull.stream$` is taken from `pullStream$` at call time, so it is
   * `undefined` if this is called before `init()` created the stream.
   */
  replicate(collection: RxCollection) {
    return replicateRxCollection({
      collection,
      replicationIdentifier: `${collection.name}-replication`,
      push: {
        handler: async (changeRows: ReplicationPushChangeRow[]) => {
          const indexableFields = changeRows.map(({ newDocumentState }) => ({
            documentId: this.getDocumentId(newDocumentState, collection),
            documentLastUpdatedAt: newDocumentState.lastUpdatedAt,
            documentDeleted: !!newDocumentState._deleted
          }));
          const result = await this.uiStateService.pushUiState(changeRows, indexableFields, collection.name);
          const conflicts = result.data?.pushUiStates?.documents || [];
          if (conflicts.length > 0) {
            // The backend holds newer states for some documents; pull them.
            this.pullStream$?.next('RESYNC');
          }
          return conflicts.map((document: string) => JSON.parse(document));
        }
      },
      pull: {
        handler: async (
          checkpointOrNull: CheckpointType = { id: '', updatedAt: new Date(0).toISOString() },
          limit: number
        ) => {
          if (!this.uiStateService) {
            throw new Error('uiStateService not available');
          }
          const result = await this.uiStateService.pullUiState(
            {
              id: checkpointOrNull.id,
              collection: collection.name,
              lastUpdatedAt: checkpointOrNull.updatedAt,
              limit
            },
            collection.name
          );

          const documents = (result.data.pullUiStates?.documents ?? [])
            .map((document: string) => {
              const parsedDocument = parseJSON<UiStateDocumentType>(document);

              if (!parsedDocument) {
                throw new Error('Invalid document');
              }

              return parsedDocument;
            })
            // Oldest first, so the last element carries the newest timestamp.
            .sort(
              (a: UiStateDocumentType, b: UiStateDocumentType) =>
                new Date(a.lastUpdatedAt).getTime() - new Date(b.lastUpdatedAt).getTime()
            );

          const newCheckpoint = documents.at(-1);
          return {
            documents,
            checkpoint: newCheckpoint
              ? {
                  id: this.getDocumentId(newCheckpoint, collection),
                  updatedAt: newCheckpoint.lastUpdatedAt
                }
              : checkpointOrNull
          };
        },
        stream$: this.pullStream$?.asObservable()
      }
    });
  }
}
