/**
 * Copyright © 2021, AMN Healthcare, Inc. All rights reserved.
 */

import { webSocket } from 'rxjs/webSocket';
import {
  switchMap,
  mergeMap,
  retry,
  takeUntil,
  repeat,
  tap,
} from 'rxjs/operators';
import {
  EMPTY,
  NEVER,
  merge,
  BehaviorSubject,
  Subject,
  connectable,
  timer,
  of,
} from 'rxjs';
import type { Observable } from 'rxjs';
import { parseWebsocketMessage } from './protectedApiWebsocketMessages';
import { backoff, type BackoffOptions } from '@/utils/backoff';
import { debugLog } from '@/utils/debugLog';
import { z } from 'zod';
import { noticeError } from '@/utils/noticeError';
import * as R from 'remeda';

export type SessionWebsocketMessages = ReturnType<typeof parseWebsocketMessage>;

/** Message payload treated as a keep-alive; it is consumed here and never forwarded to subscribers. */
const HEARTBEAT = '--heartbeat--';
/**
 * Multiplier applied to HEARTBEAT_INTERVAL to obtain the silence timeout
 * after which the socket is considered dead and is recycled.
 */
const HEARTBEAT_MAX_MISS = 2;
/** Heartbeat period in milliseconds (it is fed to an RxJS `timer`). */
const HEARTBEAT_INTERVAL = 6 * 1_000;

/**
 * Settings handed to `backoff()` by `retryDelay` when spacing out reconnect
 * attempts. `retryInterval` is additionally used by `retryDelay` as the
 * fallback delay when `backoff()` returns null/undefined.
 *
 * NOTE(review): the exact meaning of each field is defined by
 * `@/utils/backoff`, which is not visible here — the inline notes below are
 * assumptions to confirm against that module.
 */
const retryOptions = {
  maxAttempts: undefined, // presumably "no attempt limit" — confirm
  retryInterval: 1_000, // presumably the base delay in ms — confirm
  factor: { amount: 2.5, cappedDelay: 30_000 }, // presumably growth factor with a 30s ceiling — confirm
  jitter: true,
  minJitterDelay: 1_000,
} satisfies BackoffOptions;

/**
 * Produces the one-shot timer used to delay the next (re)connection attempt.
 *
 * The wait is whatever `backoff()` computes for `retryCount` under
 * `retryOptions`; if it yields null/undefined, the plain
 * `retryOptions.retryInterval` is used instead.
 *
 * @param retryCount Attempt counter supplied by the `repeat`/`retry` operators.
 * @returns An observable that emits once after the computed delay.
 */
const retryDelay = (retryCount: number) =>
  timer(backoff(retryCount, retryOptions) ?? retryOptions.retryInterval);

/** Commands understood by the connection pipeline: open a socket at `url`, or close the current one. */
type SessionControl = { type: 'open'; url: string } | { type: 'close' };
/** Connection state published on the status stream. */
type SessionStatus = { type: 'connected' } | { type: 'disconnected' };

// Internal command channel: `openSessionSocket` / `closeSessionSocket` push onto it,
// and the connection pipeline reacts to each command.
const socket$ = new Subject<SessionControl>();
// Internal status channel: written by the websocket's open/close/closing observers.
const status$ = new Subject<SessionStatus>();

/**
 * Connection status of the session websocket.
 *
 * Backed by a `BehaviorSubject`, so a new subscriber immediately receives the
 * most recent status; until the socket reports otherwise that status is
 * `disconnected`. A fresh `BehaviorSubject` is created if the connection is
 * ever torn down and connected again.
 */
export const sessionWebsocketStatus$ = connectable(status$, {
  resetOnDisconnect: true,
  connector: () => {
    const initialStatus: SessionStatus = { type: 'disconnected' };
    return new BehaviorSubject<SessionStatus>(initialStatus);
  },
});

/**
 * Hot stream of parsed session websocket messages.
 *
 * Driven by the commands pushed onto `socket$`:
 * - `open`  — opens a websocket to the given url and forwards every message
 *   that `parseWebsocketMessage` accepts. Because of `switchMap`, a socket
 *   opened by an earlier command is torn down first.
 * - `close` — tears down the current socket and emits nothing.
 *
 * Liveness and reconnection:
 * - Heartbeat payloads (`HEARTBEAT`) are consumed here and never forwarded.
 * - If neither the open event nor a heartbeat is seen for
 *   `HEARTBEAT_INTERVAL * HEARTBEAT_MAX_MISS` ms, the socket is dropped and
 *   re-opened after `retryDelay`.
 * - A clean close of the socket also completes the inner stream and is
 *   followed by a reconnect (`repeat`); an error is followed by a reconnect
 *   as well (`retry`).
 *
 * Messages that fail parsing are logged and dropped; Zod validation failures
 * are additionally reported through `noticeError` with the message's
 * `action_type` (when it can be extracted) as the only context.
 */
export const sessionWebsocketConnection$ = connectable(
  socket$.pipe(
    switchMap((message) => {
      if (message.type === 'open') {
        // Per-connection signals fed by the websocket lifecycle observers below.
        const open$ = new Subject<Event>();
        const close$ = new Subject<CloseEvent>();
        const closing$ = new Subject<void>();
        const heartbeat$ = new Subject<void>();

        // Watchdog: (re)armed by the open event and by every heartbeat; fires
        // when the socket has been silent for too long. It is disarmed once
        // the socket is closing/closed, so a dead socket cannot trigger it.
        const reset$ = merge(open$, heartbeat$).pipe(
          switchMap(() => timer(HEARTBEAT_INTERVAL * HEARTBEAT_MAX_MISS)),
          takeUntil(merge(close$, closing$))
        );

        return webSocket({
          url: message.url,
          openObserver: {
            next: (event: Event) => {
              open$.next(event);
              status$.next({ type: 'connected' });
            },
          },
          closeObserver: {
            next: (event: CloseEvent) => {
              close$.next(event);
              status$.next({ type: 'disconnected' });
            },
          },
          closingObserver: {
            next: () => {
              closing$.next();
              status$.next({ type: 'disconnected' });
            },
          },
          // Keep the raw MessageEvent: heartbeats are plain strings, not JSON,
          // so the default JSON deserializer must not run on them.
          deserializer: (messageEvent) => messageEvent,
        }).pipe(
          tap((event) => {
            if (event.data === HEARTBEAT) {
              heartbeat$.next();
            }
          }),
          mergeMap((event): Observable<SessionWebsocketMessages> => {
            // Dropped messages must map to EMPTY, not NEVER: a never-completing
            // inner observable stays subscribed inside mergeMap (one leaked per
            // heartbeat) and prevents mergeMap from completing when the socket
            // closes cleanly, which would stop `repeat` from reconnecting.
            if (event.data === HEARTBEAT) {
              return EMPTY;
            }

            try {
              const parsed = parseWebsocketMessage(event.data);

              return of(parsed);
            } catch (err) {
              debugLog('[sessionsocket] Unhandled Message', err, event.data);

              if (err instanceof z.ZodError) {
                // Extract the action type from the websocket message.
                // We could record the whole message, but it's likely
                // better to record less info in newrelic.
                let actionType: string | undefined;

                try {
                  const msg = JSON.parse(event.data);
                  if (msg && R.isString(msg.action_type)) {
                    actionType = msg.action_type;
                  }
                } catch (_err) {
                  // Best effort only: the payload could not be read as JSON,
                  // so the error is reported without an action type.
                }

                noticeError(err, { actionType });

                console.error('[sessionsocket] ZodError', err, event.data);
              }

              return EMPTY;
            }
          }),
          // Heartbeat timeout: complete this connection so `repeat` re-opens it.
          takeUntil(reset$),
          // Completion (timeout or clean close) -> reconnect after a backoff delay.
          repeat({
            count: Infinity,
            delay: (retryCount) => retryDelay(retryCount),
          }),
          // Error (e.g. unclean close) -> reconnect after a backoff delay.
          retry({
            count: Infinity,
            delay: (_, retryCount) => retryDelay(retryCount),
          })
        );
      }

      // 'close': switching to an empty stream unsubscribes from the current socket.
      return EMPTY;
    })
  ),
  { connector: () => new Subject(), resetOnDisconnect: true }
);

// Connect both streams as soon as this module is evaluated, so status updates
// and socket commands are processed even before anything subscribes.
// The returned subscriptions are discarded: this module never disconnects them.
sessionWebsocketStatus$.connect();
sessionWebsocketConnection$.connect();

/**
 * Requests a session websocket connection to `url`.
 *
 * The request is queued on the internal command stream; a socket opened by an
 * earlier call is torn down when the pipeline switches to the new request.
 *
 * @param url Websocket endpoint to connect to.
 */
export function openSessionSocket(url: string): void {
  socket$.next({ type: 'open', url });
}

/**
 * Requests that the current session websocket (if any) be closed.
 *
 * Pushes a `close` command onto the internal command stream; no reconnect is
 * attempted until `openSessionSocket` is called again.
 */
export function closeSessionSocket(): void {
  socket$.next({ type: 'close' });
}
