import * as rx from '@proftit/rxjs';

function catchErrorInProcessOff(inProcess$, continueFn) {
  return obs$ => {
    return obs$.pipe(rx.catchError(error => {
      inProcess$.next({
        error,
        inProcess: false
      });
      return continueFn(error);
    }));
  };
}

function shareReplayRefOne() {
  return rx.shareReplay({
    bufferSize: 1,
    refCount: true
  });
}

function combineInProcesses(inProcesses) {
  return rx.pipe(() => rx.obs.combineLatest(...inProcesses), rx.map(list => list.some(i => i.inProcess)), rx.map(inProcess => ({
    inProcess,
    error: null
  })), shareReplayRefOne())(null);
}

function distinctUntilChangedWithState(saveStateFn, compareFn) {
  return obs$ => {
    let last;
    return obs$.pipe(rx.filter(curr => !compareFn(last, curr)), rx.tap(curr => last = saveStateFn(curr)));
  };
}

/*
 * Generate an object with 'value' prop on it that hides
 * behind it access to observable value and action to
 * set that value.
 *
 * This is used maily for ng-model cases where there is
 * a need to give ng-model a regular binding but
 * have reactive listening to that value changes.
 *
 * @param setOp$ - rxjs action that will be trigger when value is changes.
 * @param data$ -  rxjs observable to pull data that will be given to anyone doing 'get' for the value.
 * @param until$ - subject to listen to and clean the subscriptions when it emits.
 *
 * @returns a proxy that has 'value' prop on it to be used in 'nd-model' primarly.
 */

function generateReactiveGetterSetter(setOp$, data$, until$) {
  const inst = {};
  let latestData;
  data$.pipe(rx.tap(data => latestData = data), rx.takeUntil(until$)).subscribe();
  Object.defineProperty(inst, 'value', {
    get() {
      return latestData;
    },

    set(item) {
      setOp$.next(item);
    }

  });
  return inst;
}

function generateInProcessSubject() {
  return new rx.BehaviorSubject({
    inProcess: false,
    error: null
  });
}

function inProcessFilter(inProcess$, predicate) {
  return rx.pipe(rx.filter((...args) => {
    const result = predicate(...args);

    if (!result) {
      inProcess$.next({
        inProcess: false,
        error: null
      });
    }

    return result;
  }));
}

function tapLog(name, ...args) {
  /* eslint-disable no-console */
  // tslint:disable-next-line:no-console
  return rx.tap(x => console.log(name, x, ...args.map(fn => fn())));
  /* eslint-enable no-console */
}

function pipeLog(labelPrefix, ...origOperations) {
  let counter = 0;
  const operations = origOperations.reduce((acc, op) => {
    const label = `${labelPrefix} #${counter}: `;
    const logOp = tapLog(label);
    acc.push(op);
    acc.push(logOp);
    counter += 1;
    return acc;
  }, []); // eslint-disable-next-line @typescript-eslint/ban-ts-ignore
  // @ts-ignore

  return rx.pipe(...operations);
}

function syncOneWay(source$, target$, setFn, compareFn) {
  return rx.pipe(() => source$, rx.distinctUntilChanged(compareFn), rx.withLatestFrom(target$.pipe(rx.startWith(undefined))), rx.filter(([val, target]) => val !== target), rx.tap(([val]) => setFn(val)), rx.map(() => true))(null);
}

function syncTwoWay(obsA$, setValueA, obsB$, setValueB, compareFn) {
  const streamAtoB = syncOneWay(obsA$, obsB$, setValueB, compareFn);
  const streamBtoA = syncOneWay(obsB$, obsA$, setValueA, compareFn);
  return rx.obs.merge(streamAtoB, streamBtoA);
}

function tapFinalizeInProcessOff(inProcess$) {
  return obs$ => {
    return obs$.pipe(rx.tap(() => {
      inProcess$.next({
        inProcess: false,
        error: null
      });
    }), rx.finalize(() => {
      inProcess$.next({
        inProcess: false,
        error: null
      });
    }));
  };
}

function tapInProcessOn(inProcess$) {
  return obs$ => {
    return obs$.pipe(rx.tap(() => {
      inProcess$.next({
        inProcess: true,
        error: null
      });
    }));
  };
}

/**
 * Pattern to use streams inside a component.
 *
 * Subscribe to stream after all streams are constructed. Unsubscribe from it
 * automaticly using `takeUntil` the unsub$ observable emit that the component
 * is ending it's life cycle.
 *
 * @param streams - The streams sources
 * @param usub$ - Observable returning when the component is ending.
 * @return - subscription.
 */

function useStreams(streams, unsub$) {
  return rx.pipe(() => rx.obs.combineLatest(streams), rx.takeUntil(unsub$))(null).subscribe();
}

export { catchErrorInProcessOff, combineInProcesses, distinctUntilChangedWithState, generateInProcessSubject, generateReactiveGetterSetter, inProcessFilter, pipeLog, shareReplayRefOne, syncOneWay, syncTwoWay, tapFinalizeInProcessOff, tapInProcessOn, tapLog, useStreams };
