import log from 'loglevel';
import io from 'socket.io-client/socket.io';
import * as rx from '@proftit/rxjs';
import * as _ from '@proftit/lodash';
import { directivesPriorities } from '../constants/directives-priorities';
import { shareReplayRefOne, useStreams, tapLog } from '@proftit/rxjs.adjunct';
import { CurrentPlatformSessionStoreServiceDirectiveController } from './current-platform-session-store-service.directive';
import { getMtFeedPerAssetChannelName } from '../api-tradingcore-steamer/get-mt-feed-per-asset-channel-name';
import {
  Mt4Asset,
  MtAssetFeed,
  MtSecurity,
  Mt4Group,
} from '@proftit/tradingcore.api.models.entities';
import { observeComponentLifecycles } from '@proftit/rxjs.adjunct.ng1';
import { PlatformCode, MT4_PLATFORMS } from '@proftit/crm.api.models.enums';

type SubsHolder = Record<string, any>;

function getMtGroupMtAssetChannelName(
  mtServerId: number,
  groupId: number,
  assetId: number,
) {
  return `{mt_group_update}.{mtserver.${mtServerId}}.{mtgroup.${groupId}}.{mtasset.${assetId}}`;
}

function getMtGroupMtSecurityChannelName(
  mtServerId: number,
  groupId: number,
  securityId: number,
) {
  return `{mt_group_update}.{mtserver.${mtServerId}}.{mtgroup.${groupId}}.{mtsecurities.${securityId}}`;
}

function getMtGroupMtChannelName(mtServerId: number, groupId: number) {
  return `{mt_group_update}.{mtserver.${mtServerId}}.{mtgroup.${groupId}}`;
}

export class CurrentTrcStreamerOfBrandPlatformServiceDirectiveController {
  /* requires */

  prfCurrentPlatformSession: CurrentPlatformSessionStoreServiceDirectiveController;

  /* state */

  lifecycles = observeComponentLifecycles(this);

  addSubsOp$ = new rx.Subject<{ key: string; channel$: rx.Observable<any> }>();

  socket$: rx.Observable<io.Socket>;

  subscriptions$: rx.Observable<SubsHolder>;

  /* @ngInject */
  constructor() {}

  $onInit() {
    this.socket$ = this.streamSocket();
    this.subscriptions$ = this.streamSubscriptions();

    useStreams([this.socket$, this.subscriptions$], this.lifecycles.onDestroy$);
  }

  $onChanges() {}

  $onDestroy() {}

  streamSocket() {
    return rx.pipe(
      () => this.prfCurrentPlatformSession.sessionS.stream$,
      rx.switchMap((sessionInfo) => {
        return new rx.Observable<io.Socket>((sub) => {
          let socket: io.Socket;

          function unsubscribe() {
            if (_.isNil(socket)) {
              return;
            }
            socket.disconnect();
            socket.close();
          }

          if (!sessionInfo.isLoggedIn) {
            sub.complete();
            return unsubscribe;
          }

          /* We currently handel only risk manager steamer for mt4 platform in this service */
          if (!MT4_PLATFORMS.includes(sessionInfo.platform.code)) {
            sub.complete();
            return unsubscribe;
          }

          const url = sessionInfo.session.riskManagerStreamerUrl;
          const query = `token=${sessionInfo.session.token}`;
          const options = {
            transports: ['websocket'],
          };

          socket = io(url, {
            query,
            ...options,
          });

          sub.next(socket);

          return unsubscribe;
        });
      }),
      rx.switchMap((socket) => {
        return rx.pipe(
          () =>
            rx.obs.merge(
              rx.obs
                .fromEventPattern(
                  (handler) => socket.on('connect', handler),
                  (handler) => socket.off('connect', handler),
                )
                .pipe(
                  rx.tap((evt) => log.info('Streamer connected', evt)),
                  rx.map(() => socket),
                ),
              rx.obs
                .fromEventPattern(
                  (handler) => socket.on('connect_error', handler),
                  (handler) => socket.off('connect_error', handler),
                )
                .pipe(
                  rx.tap((evt) => log.info('Streamer connection error', evt)),
                  rx.map(() => null),
                ),
              rx.obs
                .fromEventPattern(
                  (handler) => socket.on('disconnect', handler),
                  (handler) => socket.off('disconnect', handler),
                )
                .pipe(
                  rx.tap((evt) => log.info('Streamer disconnect', evt)),
                  rx.map(() => null),
                ),
              rx.obs
                .fromEventPattern(
                  (handler) => socket.on('error', handler),
                  (handler) => socket.off('error', handler),
                )
                .pipe(
                  rx.tap((evt) => log.info('Streamer error', evt)),
                  rx.map(() => null),
                ),
            ),
          rx.delay(10),
          rx.distinctUntilChanged(),
        )(null);
      }),
      shareReplayRefOne(),
    )(null);
  }

  streamSubscriptionsFromAdd(subsHolder$: rx.Observable<SubsHolder>) {
    return rx.pipe(
      () => this.addSubsOp$,
      rx.withLatestFrom(subsHolder$),
      rx.map(([{ key, channel$ }, subsHolder]) => {
        return {
          ...subsHolder,
          [key]: channel$,
        };
      }),
      shareReplayRefOne(),
    )(null);
  }

  streamSubscriptions() {
    const subs$ = new rx.BehaviorSubject<SubsHolder>({});

    return rx.pipe(
      () => rx.obs.merge(this.streamSubscriptionsFromAdd(subs$)),
      rx.startWith({}),
      rx.tap((subs) => subs$.next(subs)),
      shareReplayRefOne(),
    )(null);
  }

  mapParseJsonSocketUpdate<R>(): rx.OperatorFunction<string, R> {
    return rx.pipe(
      rx.switchMap((updates: string) => {
        return rx.pipe(
          () => rx.obs.of(updates),
          rx.map((updates) => JSON.parse(updates)),
          rx.catchError((e) => {
            log.error('error parsing channel message', e);
            return rx.obs.NEVER;
          }),
        )(null);
      }),
    );
  }

  observeMtRelatedEntity<T>(subName: string, channelName: string) {
    return rx.pipe(
      () => rx.obs.combineLatest(this.socket$),
      rx.withLatestFrom(this.subscriptions$),
      rx.switchMap(([[socket], channels]) => {
        if (_.isNil(socket)) {
          return rx.obs.NEVER;
        }

        const existingChannel$ = channels[subName];
        if (existingChannel$) {
          return existingChannel$;
        }

        const channel$ = observeChannel<string>(socket, channelName);

        this.addSubsOp$.next({
          channel$,
          key: subName,
        });

        return channel$;
      }),
      this.mapParseJsonSocketUpdate<T>(),
      shareReplayRefOne(),
    )(null);
  }

  observeMtFeedPerAsset(assetSymbol: string) {
    const SUBS_ID = `mt_feed_per_asset_${assetSymbol}`;

    return rx.pipe(
      () => this.prfCurrentPlatformSession.mtServerId$,
      rx.switchMap((mtServerId) => {
        if (_.isNil(mtServerId)) {
          return rx.obs.NEVER;
        }

        const channelName = getMtFeedPerAssetChannelName(
          mtServerId,
          assetSymbol,
        );

        return this.observeMtRelatedEntity<MtAssetFeed>(SUBS_ID, channelName);
      }),
    )(null);
  }

  observeMtGroupMtAsset(groupId: number, assetId: number) {
    const SUBS_ID = `mt_group_mt_asset_${assetId}`;

    return rx.pipe(
      () => this.prfCurrentPlatformSession.mtServerId$,
      rx.switchMap((mtServerId) => {
        if (_.isNil(mtServerId)) {
          return rx.obs.NEVER;
        }

        const channelName = getMtGroupMtAssetChannelName(
          mtServerId,
          groupId,
          assetId,
        );

        return this.observeMtRelatedEntity<Mt4Asset>(SUBS_ID, channelName);
      }),
    )(null);
  }

  observeMtGroup(groupId: number) {
    const SUBS_ID = `mt_group_${groupId}`;

    return rx.pipe(
      () => this.prfCurrentPlatformSession.mtServerId$,
      rx.switchMap((mtServerId) => {
        if (_.isNil(mtServerId)) {
          return rx.obs.NEVER;
        }

        const channelName = getMtGroupMtChannelName(mtServerId, groupId);

        return this.observeMtRelatedEntity<Mt4Group>(SUBS_ID, channelName);
      }),
    )(null);
  }

  observeMtGroupMtSecurity(groupId: number, securityId: number) {
    const SUBS_ID = `mt_group_mt_security_{securityId}`;

    return rx.pipe(
      () => this.prfCurrentPlatformSession.mtServerId$,
      rx.switchMap((mtServerId) => {
        if (_.isNil(mtServerId)) {
          return rx.obs.NEVER;
        }

        const channelName = getMtGroupMtSecurityChannelName(
          mtServerId,
          groupId,
          securityId,
        );

        return this.observeMtRelatedEntity<MtSecurity>(SUBS_ID, channelName);
      }),
    )(null);
  }
}

function observeChannel<T>(socket: io.Socket, channelName: string) {
  return new rx.Observable((s) => {
    socket.emit('subscribe', channelName, (resp) => {
      if (resp) {
        log.info(`socket - subscribed to channel: ${channelName}`, resp);
        s.next(true);
        return;
      }
      log.info(`socket - error subscribe to channel: ${channelName}`, resp);
    });

    return () => {
      socket.emit('unsubscribe', channelName, (resp) => {
        if (resp) {
          log.info(`socket - unsubscribed to channel: ${channelName}`, resp);
          return;
        }

        log.info(
          `socket - error unsubscribed to channel: ${channelName}`,
          resp,
        );
      });
    };
  }).pipe(
    rx.switchMap(() =>
      rx.obs.fromEventPattern(
        (handler) => socket.addEventListener<T>(channelName, handler),
        (handler) => socket.removeListener(channelName, handler),
      ),
    ),
    shareReplayRefOne(),
  );
}

export const currentTrcStreamerOfBrandPlatformServiceDirective = () => {
  return {
    priority: directivesPriorities.serviceDirective,
    restrict: 'A',
    require: {
      prfCurrentPlatformSession: '^',
    },
    bindToController: true,
    controller: CurrentTrcStreamerOfBrandPlatformServiceDirectiveController,
  };
};

currentTrcStreamerOfBrandPlatformServiceDirective.$inject = [] as string[];
