import * as rx from '@proftit/rxjs';
import { shareReplayRefOne, useStreams } from '@proftit/rxjs.adjunct';
import * as _ from '@proftit/lodash';

import { CommunicationsSubjectsService } from '~/source/contact/common/services/communications-subjects.service';
import { observeChannel } from '~/source/common/utilities/observe-channel';
import { CommunicationSubjectsChangeSocketService } from '~/source/common/services/communication-subjects-change-socket-service';
import { CommunicationTypesService } from '~/source/common/services/communication-types.service';
import { CommunicationType } from '~/source/common/models/communication-type';
import { WarmRedisCacheService } from '~/source/common/services/warm-redis-cache-service';

const customCommunicationSubjectsCacheKey = 'prf.custom.communicationSubjects';

export class CommunicationSubjectsStoreService {
  onInitAction = new rx.Subject<void>();
  onCacheWriteAction = new rx.Subject<void>();

  cachedValue$ = this.streamCachedValue();
  lastChangeDate$ = this.streamLastChangeDate();
  communicationSubjects$ = this.streamCommunicationSubjects();

  constructor(
    readonly communicationsSubjectsService: CommunicationsSubjectsService,
    readonly communicationSubjectsChangeSocketService: CommunicationSubjectsChangeSocketService,
    readonly communicationTypesService: CommunicationTypesService,
    readonly warmRedisCacheService: WarmRedisCacheService,
  ) {
    useStreams(
      [this.lastChangeDate$, this.communicationSubjects$, this.cachedValue$],
      rx.obs.NEVER,
    );
    this.onInitAction.next();
  }

  streamLastChangeDate(): rx.Observable<number> {
    const previousLastChangeDate = new rx.BehaviorSubject<number>(0);
    return rx.pipe(
      () =>
        rx.obs.merge(
          this.streamLastChangeDateFromSuccessfulStreamerMessage(),
          this.streamLastChangeDateFromUnreceivedStreamerMessage(),
        ),
      rx.withLatestFrom(previousLastChangeDate),
      rx.map(
        ([value, savedLastChangeDate]: [
          { lastChangeDate: number },
          number,
        ]) => {
          const { lastChangeDate } = value;
          return {
            lastChangeDate,
            savedLastChangeDate,
          };
        },
      ),
      rx.filter(
        ({ lastChangeDate, savedLastChangeDate }) =>
          lastChangeDate > savedLastChangeDate,
      ),
      rx.map(({ lastChangeDate }) => lastChangeDate),
      rx.tap((lastChangeDate) => {
        previousLastChangeDate.next(lastChangeDate);
      }),
      shareReplayRefOne(),
    )(null);
  }

  streamLastChangeDateFromUnreceivedStreamerMessage() {
    return rx.pipe(
      () =>
        observeChannel(
          this.communicationSubjectsChangeSocketService,
          this.communicationSubjectsChangeSocketService.channelRoot,
        ).pipe(
          rx.take(1),
          rx.timeout(
            this.communicationSubjectsChangeSocketService
              .SOCKET_TIMEOUT_INTERVAL,
          ),
          rx.catchError(() => {
            return rx.obs.from(
              this.warmRedisCacheService
                .getResourceRoute(
                  this.communicationSubjectsChangeSocketService.entityName,
                )
                .customPutWithQuery({})
                .then((res) => res.plain()),
            );
          }),
        ),
      shareReplayRefOne(),
    )(null);
  }

  streamLastChangeDateFromSuccessfulStreamerMessage() {
    return rx.pipe(
      () =>
        observeChannel(
          this.communicationSubjectsChangeSocketService,
          this.communicationSubjectsChangeSocketService.channelRoot,
        ),
      shareReplayRefOne(),
    )(null);
  }

  streamCommunicationSubjects() {
    return rx.pipe(
      () =>
        rx.obs.merge(this.streamTypesFromCache(), this.streamTypesFromAPI()),
      shareReplayRefOne(),
    )(null);
  }

  streamTypesFromAPI() {
    return rx.pipe(
      () => this.lastChangeDate$,
      rx.withLatestFrom(this.cachedValue$),
      rx.filter(
        ([lastChangeDate, storedValue]) =>
          _.isNil(storedValue) || storedValue.createdAt < lastChangeDate,
      ),
      rx.switchMap((a) => {
        return rx.obs
          .from(this.communicationsSubjectsService.getListWithQuery())
          .pipe(rx.catchError(() => rx.obs.NEVER));
      }),
      rx.tap((value) => {
        this.setCachedValue(value);
      }),
      shareReplayRefOne(),
    )(null);
  }

  streamTypesFromCache() {
    return rx.pipe(
      () => this.lastChangeDate$,
      rx.withLatestFrom(this.cachedValue$),
      rx.filter(
        ([lastChangeDate, storedValue]) =>
          !_.isNil(storedValue) && storedValue.createdAt >= lastChangeDate,
      ),
      rx.map(([lastChangeDate, storedValue]) => storedValue.value),
      shareReplayRefOne(),
    )(null);
  }

  streamCachedValue() {
    return rx.pipe(
      () => rx.obs.merge(this.onInitAction, this.onCacheWriteAction),
      rx.map(() => this.getCachedValue()),
      shareReplayRefOne(),
    )(null);
  }

  getCachedValue() {
    let value = null;
    try {
      value = JSON.parse(
        localStorage.getItem(customCommunicationSubjectsCacheKey),
      );
    } catch (e) {}
    return value;
  }

  setCachedValue(value) {
    const itemToSave = {
      value,
      createdAt: Math.floor(Date.now() / 1000),
    };
    localStorage.setItem(
      customCommunicationSubjectsCacheKey,
      JSON.stringify(itemToSave),
    );
    this.onCacheWriteAction.next();
  }

  streamSubjectsForCommunication(commType: CommunicationType) {
    if (_.isNil(commType)) {
      return rx.obs.of([]);
    }

    return rx.pipe(
      () => this.communicationSubjects$,
      rx.map((types) => types.filter((type) => type.typeId === commType.id)),
      shareReplayRefOne(),
    )(null);
  }
}
