import io from 'socket.io-client/socket.io';
import _ from 'underscore';
import s from 'underscore.string';
import log from 'loglevel';
import Promise from 'bluebird';
import * as _l from '@proftit/lodash';

import BaseService from '~/source/common/services/baseService';
import TokensService from '~/source/auth/services/tokens';
import { JSONObject } from '~/source/types/JsonObject';
import { SocketRegistryService } from './socket-registry.service';
import { ClientGeneralPubsub } from '~/source/common/services/client-general-pubsub';
import { shareReplayRefOne, useStreams } from '@proftit/rxjs.adjunct';
import * as rx from '@proftit/rxjs';
import {
  USER_LOGOUT,
  USER_UPDATE,
} from '~/source/common/constants/general-pubsub-keys';

export type SocketListener = (msg: string) => void;

export abstract class SocketService extends BaseService {
  static $inject = [
    'appConfig',
    'tokensService',
    'prfSocketRegistryService',
    'prfClientGeneralPubsub',
  ];

  appConfig: any;
  tokensService: TokensService;
  prfSocketRegistryService: SocketRegistryService;
  prfClientGeneralPubsub: ClientGeneralPubsub;

  onDestroy$ = new rx.Subject();
  isSocketReady$ = new rx.BehaviorSubject<boolean>(false);

  constructor(...args) {
    super(...args);
    useStreams(
      [
        this.streamOnUserUpdate(),
        this.streamOnUserLogout(),
        this.isSocketReady$,
      ],
      this.onDestroy$,
    );
  }

  abstract get channelRoot(): string; // defined by subclasses

  get socketRegistryKey() {
    return 'crmSocket';
  }

  /**
   *  Subscribe for update on channel
   *
   *  @param {string} channel
   *  @param {function} callback
   *  @private
   */
  subscribe(channel: string, callback: SocketListener) {
    // if there is no listeners emit subscribe to server
    if (this.socket.hasListeners(channel) === false) {
      // subscribe
      this.socket.emit('subscribe', channel, (res) => {
        if (!res) {
          log.warn('Unable to subscribe to', channel);
        } else {
          log.info(
            `socket ${this.socketRegistryKey}, subscribed to ${channel}`,
          );
        }
      });
    }

    this.socket.addEventListener(channel, callback);
    return { channel, callback };
  }

  /**
   * Unsubscribe from update on channel
   *
   * @param {string} channel
   * @param {function} callback
   * @private
   */
  unsubscribe(channel: string, callback: SocketListener) {
    // first, remove the listener
    this.socket.removeListener(channel, callback);
    // if no listeners remain on channel, emit unsubscribe
    if (this.socket.hasListeners(channel) === false) {
      // unsubscribe
      this.socket.emit('unsubscribe', channel, (res) => {
        if (!res) {
          log.warn('Unable to unsubscribe', channel);
        } else {
          log.info(
            `socket ${this.socketRegistryKey}, unsubscribed to ${channel}`,
          );
        }
      });
    }
  }

  /**
   * Wraps a listener (callback) by a function which does the following flow:
   *
   * 1. Parses the data as JSON
   * 2. Filter and parse the received data (using the 'filter' and 'parse' methods)
   * 3. Runs callback with the parsed object - only if real change has occurred
   * 4. If $scope was passed, runs $scope.$apply (to digest the event)
   * 5. Returns the callback result (as promise)
   *
   * @param {function} cb - Callback to run with parsed data
   * @param {angular.$scope} [$scope] - If passed, will run $scope.$apply before returning
   * @return {function(*=)} wrapped listener function
   */
  wrapListener(
    cb: (obj: any) => void,
    $scope?: angular.IScope,
  ): SocketListener {
    let lastMessage = '';
    return (message) => {
      if (message === lastMessage) {
        log.warn('Ignoring duplicate streamer notification');
        return Promise.resolve({});
      }
      const data = JSON.parse(message);
      const ret = this.filter(data).then((parsedData) => {
        const cbResult = cb(parsedData);

        if ($scope && $scope.$apply) {
          $scope.$apply(); // this is needed since the streamer event is outside angular
        }

        return cbResult;
      });

      lastMessage = message;

      return ret;
    };
  }

  /**
   * Return only white listed properties of object
   *
   * @param {Object} data
   * @returns {Promise}
   */
  filter(data: JSONObject): Promise<JSONObject> {
    // all properties are white listed. return parsed data without picking.
    if (_.isEmpty(this.whiteList)) {
      return Promise.resolve(this.parse(data));
    }

    // get only whitelisted fields
    return this.parse(_.pick(data, ...this.whiteList, 'id'));
  }

  /**
   * dynamic parse for streamer data fields
   * @param {object} data
   * @returns {Promise}
   */
  parse(data: JSONObject): Promise<JSONObject> {
    const promises = [];

    return this.generalParse(data).then((parsedData) => {
      // parse each field of the data object
      _.each(data, (fieldValue, fieldName) => {
        const parseFn = `parse${s.capitalize(fieldName)}`;
        if (_.isFunction(this[parseFn])) {
          promises.push(this[parseFn](data));
        }
      });

      return Promise.all(promises).then(() => data);
    });
  }

  /**
   *
   */
  generalParse(data: any): Promise<any> {
    return Promise.resolve(data);
  }

  /**
   * White list of properties that would passed by filter method.
   * Defined on child classes
   *
   * @returns {Array}
   */
  get whiteList(): string[] {
    return [];
  }

  /**
   * Getter for socket.io instance.
   *
   * Creates instance if it does not exist
   *
   * @returns {Socket} - used socket.io instance
   */
  get socket(): io.Socket {
    const socket = this.prfSocketRegistryService.getSocket(
      this.socketRegistryKey,
    );

    if (_.isUndefined(socket)) {
      return this.createSocket();
    }

    return socket;
  }

  /**
   * Allow to provide additional options for the socket construction.
   * @overide
   */
  getSocketOptions() {
    return {};
  }

  /**
   * Create socket.io instance and sets global "socket" var.
   */
  createSocket(): void {
    const url = this.getStreamerUrl();
    let socket = this.prfSocketRegistryService.getSocket(
      this.socketRegistryKey,
    );

    if (_l.isNil(socket)) {
      const query = this.buildSocketQuery();
      const socketOptions = this.getSocketOptions();
      /*
       * because of internal implementation of socket.io the connection
       * would be made once
       */
      socket = io(url, {
        query,
        ...socketOptions,
      });

      socket.on('connect', () => {
        log.info('Streamer connected');
      });

      socket.on('connect_error', (e) => {
        log.error('Streamer connection error %s', e);
      });

      socket.on('disconnect', (reason) => {
        log.error('Streamer disconnected: %s', reason);
      });

      socket.on('error', (e) => {
        log.error('Streamer error: %s', e);
      });

      this.isSocketReady$.next(true);

      this.prfSocketRegistryService.addSocket(this.socketRegistryKey, socket);
    }

    return socket;
  }

  getStreamerUrl() {
    return this.appConfig.connections.streamer;
  }

  buildSocketQuery() {
    const user = this.tokensService.getCachedUser();
    const query = `token=${user.jwt}&type=user`;
    return query;
  }

  streamOnUserUpdate() {
    return rx.pipe(
      () => this.prfClientGeneralPubsub.getObservable(),
      rx.withLatestFrom(this.isSocketReady$),
      rx.filter(
        ([pubsubPayload, isSocketReady]) =>
          isSocketReady && pubsubPayload.key === USER_UPDATE,
      ),
      rx.tap(([pubsubPayload, isSocketReady]) => {
        this.socket.emit(
          'refreshToken',
          (pubsubPayload as any).payload.jwt,
          (res) => {
            if (!res) {
              log.warn('Unable to update streamer token');
            }
          },
        );
      }),
      shareReplayRefOne(),
    )(null);
  }

  streamOnUserLogout() {
    return rx.pipe(
      () => this.prfClientGeneralPubsub.getObservable(),
      rx.withLatestFrom(this.isSocketReady$),
      rx.filter(
        ([pubsubPayload, isSocketReady]) =>
          isSocketReady && pubsubPayload.key === USER_LOGOUT,
      ),
      rx.tap(([pubsubPayload, isSocketReady]) => {
        this.socket.emit('refreshToken', '', (res) => {
          if (!res) {
            log.warn('Unable to remove streamer token');
          }
        });
      }),
      shareReplayRefOne(),
    )(null);
  }
}

export default SocketService;
