import { Observable, Subscription } from 'rxjs';
import { take, shareReplay, takeUntil } from 'rxjs/operators';

import { Action, ActionType } from '@ngxs/store';

import { clearStreamAction$ } from '@shared/decorators/clear-stream.decorator';

/**
 * Method decorator factory: registers the decorated method through `@Action`
 * and, when that method returns an `Observable`, keeps the stream subscribed
 * in the background after the first value has been delivered to the caller.
 *
 * Behaviour of the wrapped handler:
 * - A non-Observable return value is passed through untouched.
 * - An Observable is terminated whenever `clearStreamAction$` emits.
 * - Streams are tracked per serialized action (`JSON.stringify(action)`);
 *   invoking the handler again with an identically serialized action
 *   unsubscribes the previous background stream before starting a new one.
 * - Errors of the background stream are logged with `console.error`.
 * - The returned observable emits the first value of the stream and completes.
 *
 * The source observable is subscribed exactly once per invocation: the
 * background subscription and the returned observable share one upstream
 * subscription, so side effects in the handler's pipeline are not duplicated.
 *
 * NOTE(review): the key is built with `JSON.stringify`, which throws on
 * circular structures — confirm actions are always plain serializable data.
 *
 * @param actions Action type(s) forwarded to `@Action`.
 * @returns A method decorator that replaces `descriptor.value` with the
 *          stream-aware wrapper and returns the descriptor.
 */
export function StreamAction(actions: ActionType | ActionType[]) {
  return (target: any, name: string, descriptor: TypedPropertyDescriptor<any>) => {
    Action(actions)(target, name, descriptor);

    const originalMethod: (...args: unknown[]) => unknown = descriptor.value;
    // Live background subscriptions of this handler, keyed by serialized action.
    const streams = new Map<string, Subscription>();

    descriptor.value = function (this: unknown, ctx: unknown, action: unknown) {
      const result = originalMethod.apply(this, [ctx, action]);

      if (!(result instanceof Observable)) {
        return result;
      }

      // Build the shared pipeline BEFORE subscribing, so that the background
      // subscription below and the observable returned to the caller reuse a
      // single upstream subscription instead of subscribing to the source twice.
      const shared: Observable<unknown> = result.pipe(
        takeUntil(clearStreamAction$),
        shareReplay({ refCount: true, bufferSize: 1 }),
      );

      const key = JSON.stringify(action);

      // Only one background stream per serialized action: drop the previous one.
      const previous = streams.get(key);
      if (previous) {
        previous.unsubscribe();
      }

      const subscription = shared.subscribe({
        error(error: unknown) {
          console.error(error);
        },
      });
      streams.set(key, subscription);

      // Forget the entry once the stream ends (complete, error, clear or
      // replacement) so the map does not grow with closed subscriptions.
      // If the stream already finished synchronously, the teardown runs
      // immediately. The identity check keeps a newer stream's entry intact.
      subscription.add(() => {
        if (streams.get(key) === subscription) {
          streams.delete(key);
        }
      });

      // The replay buffer delivers a value that was emitted synchronously
      // during the background subscription above.
      return shared.pipe(take(1));
    };

    return descriptor;
  };
}
