import * as rx from '@proftit/rxjs';
import * as _ from '@proftit/lodash';
import {
  buildOrderPnlChannel,
  Mt4OrderPnlSocketService,
} from '~/source/common/services/mt4-order-pnl-socket.service';
import { observeChannel } from '~/source/common/utilities/observe-channel';
import {
  buildCfdPlatformPositionPayoutChannel,
  CfdPlatformPositionPayoutSocketService,
} from '~/source/common/api-cfd-platform/cfd-platform-position-payout-socket.service';
import { Mt4OrderPnlUpdates } from '../mt4-order-pnl-updates';
import { CustomerPosition } from '@proftit/crm.api.models.entities';
import {
  PlatformCode,
  CustomerPositionStatusesCode,
  MT4_PLATFORMS,
} from '@proftit/crm.api.models.enums';
import { CurrentPlatformSessionStoreServiceDirectiveController } from '~/source/common/service-directives/current-platform-session-store-service.directive';
import { calcProfitFromCfdUpdate } from '../calc-profit-from-cfd-update';
import { CfdPositionStreamerUpdate } from '../cfd-position-streamer-update';

/**
 * Stream for all sockets subscribtions per position.
 *
 * @return observable stream
 */
export function streamSubscribeToPositionSocketsUpdates(
  positions$: rx.Observable<CustomerPosition[]>,
  OPEN_PNL_UPDATE_INTERVAL: number,
  mt4OrderPnlSocketService: () => Mt4OrderPnlSocketService,
  cfdPlatformPositionPayoutSocketService: () => CfdPlatformPositionPayoutSocketService,
  onEntityUpdate: (props: Object) => void,
  prfCurrentPlatformSession: CurrentPlatformSessionStoreServiceDirectiveController,
  propertiesToUpdate$: rx.Subject<any>,
) {
  return rx.pipe(
    () =>
      rx.obs.combineLatest(
        positions$,
        prfCurrentPlatformSession.sessionS.stream$,
      ),
    rx.switchMap(([positions, sessionInfo]) => {
      if (!sessionInfo.isLoggedIn) {
        return rx.obs.NEVER;
      }

      const listeners = positions.reduce((acc, position) => {
        const accountWatcher = observeAccountPerPlatform(
          position,
          mt4OrderPnlSocketService,
          cfdPlatformPositionPayoutSocketService,
          OPEN_PNL_UPDATE_INTERVAL,
          sessionInfo.session.token,
          sessionInfo.session.streamerUrl,
          propertiesToUpdate$,
        );

        if (!_.isNil(accountWatcher)) {
          acc.push(accountWatcher);
        }

        return acc;
      }, []);

      return rx.obs.merge(rx.obs.NEVER, ...listeners);
    }),
    rx.tap((props: Object) => {
      onEntityUpdate({
        ...props,
      });
    }),
  )(null);
}

function observeAccountPerPlatform(
  position: CustomerPosition,
  mt4OrderPnlSocketService: () => Mt4OrderPnlSocketService,
  cfdPlatformPositionPayoutSocketService: () => CfdPlatformPositionPayoutSocketService,
  OPEN_PNL_UPDATE_INTERVAL: number,
  token: string,
  streamerUrl: string,
  propertiesToUpdate$: rx.Subject<any>,
) {
  if (MT4_PLATFORMS.includes(position.tradingAccount.platform.code)) {
    const mt4$ = buildMt4UpdatesObserver(
      position,
      mt4OrderPnlSocketService,
      token,
      streamerUrl,
    ).pipe(
      rx.throttleTime(OPEN_PNL_UPDATE_INTERVAL),
      rx.map((updates: Mt4OrderPnlUpdates) => ({
        id: position.id,
        syncRemoteId: updates.order,
        profit: updates.pnl,
      })),
      rx.takeUntil(
        propertiesToUpdate$.pipe(
          rx.filter(
            (crmStreamerUpdate: any) => crmStreamerUpdate.id === position.id,
          ),
          rx.filter(
            (crmStreamerUpdate: any) =>
              crmStreamerUpdate.positionStatusCode ===
              CustomerPositionStatusesCode.ForexClose,
          ),
        ),
      ),
    );

    return mt4$;
  }

  if (
    [PlatformCode.Cfd, PlatformCode.Bundle].includes(
      position.tradingAccount.platform.code,
    )
  ) {
    if (
      position.positionStatusCode !== CustomerPositionStatusesCode.ForexOpen
    ) {
      return null;
    }

    const cfd$ = buildCfdUpdatesObserver(
      position,
      cfdPlatformPositionPayoutSocketService,
      token,
      streamerUrl,
    ).pipe(
      rx.throttleTime(OPEN_PNL_UPDATE_INTERVAL),
      rx.map((cfdProps: any) => ({
        id: position.id,
        syncRemoteId: cfdProps.positionId,
        payout: cfdProps.payout,
      })),
      rx.map((props: CfdPositionStreamerUpdate) => ({
        ...props,
        profit: calcProfitFromCfdUpdate(position, props),
      })),
      rx.takeUntil(
        propertiesToUpdate$.pipe(
          rx.filter(
            (crmStreamerUpdate: any) => crmStreamerUpdate.id === position.id,
          ),
          rx.filter(
            (crmStreamerUpdate: any) =>
              crmStreamerUpdate.positionStatusCode ===
              CustomerPositionStatusesCode.ForexClose,
          ),
        ),
      ),
    );

    return cfd$;
  }
}

/**
 * Build mt4 update socket observer
 *
 * @param position
 * @return observer thet register for mt4 updates per position.
 */
function buildMt4UpdatesObserver(
  position: CustomerPosition,
  mt4OrderPnlSocketService: () => Mt4OrderPnlSocketService,
  token: string,
  streamerUrl: string,
) {
  const serviceInstance = mt4OrderPnlSocketService();
  // serviceInstance.setToken(token);
  serviceInstance.setStreamerUrl(streamerUrl);
  const channel = buildOrderPnlChannel(position.syncRemoteId);

  return observeChannel<Mt4OrderPnlUpdates>(serviceInstance, channel);
}

/**
 * Build CFD update socket observer
 */
function buildCfdUpdatesObserver(
  position: CustomerPosition,
  cfdPlatformPositionPayoutSocketService: () => CfdPlatformPositionPayoutSocketService,
  token: string,
  streamerUrl: string,
) {
  const serviceInstance = cfdPlatformPositionPayoutSocketService();
  serviceInstance.setToken(token);
  serviceInstance.setStreamerUrl(streamerUrl);

  const channel = buildCfdPlatformPositionPayoutChannel(
    position.tradingAccount.syncRemoteId,
    position.syncRemoteId,
  );

  return observeChannel(serviceInstance, channel);
}
