import { Injectable } from '@angular/core';
import {
  GetPresignedUploadUrlQuery,
  GetPresignedUploadUrlQueryService,
} from '../graphql/get-presigned-upload-url.onelife.generated';
import {
  StartRecordingMutation,
  StartRecordingMutationService,
} from '../graphql/start-recording.onelife.generated';
import {
  StopRecordingMutation,
  StopRecordingMutationService,
} from '../graphql/stop-recording.onelife.generated';
import {
  TranscribeRecordingMutation,
  TranscribeRecordingMutationService,
} from '../graphql/transcribe-recording.onelife.generated';
import { ApolloQueryResult } from '@apollo/client';
import { firstValueFrom, lastValueFrom, Subject } from 'rxjs';
import { CreateRecordingMutationService } from '../graphql/create-recording.onelife.generated';
import { MutationResult } from 'apollo-angular';
import {
  StartStreamingSessionMutation,
  StartStreamingSessionMutationService,
} from '@app/features/healthscribe/graphql/start-streaming-session.onelife.generated';
import { QueueAudioChunkMutationService } from '@app/features/healthscribe/graphql/queue-audio-chunk.onelife.generated';

/**
 * One boolean flag per clinical-note field.
 *
 * Within this file the object is only consumed by
 * `HealthscribeRecordingUploaderService.stopRecording`, which serializes it with
 * `JSON.stringify` and sends it as the mutation's `defaultFields` input.
 *
 * NOTE(review): presumably `true` means the field is shown/selected by default — confirm
 * against the caller and against whatever parses `defaultFields` on the backend.
 */
export interface ViewableFieldsType {
  chiefComplaint: boolean;
  subjective: boolean;
  hpi: boolean;
  ros: boolean;
  pmh: boolean;
  assessment: boolean;
  plan: boolean;
  physicalExam: boolean;
  assessmentAndPlan: boolean;
  afterVisitSummary: boolean;
}

/**
 * Two-tier gate for async callbacks.
 *
 * - `runHigherPriority` callbacks always start immediately; any number may be in flight at once.
 * - `runLowerPriority` callbacks do not START while at least one higher-priority callback is in
 *   flight. Once none are in flight, any number of lower-priority callbacks may run at once.
 *
 * Note that the gate only delays the start of lower-priority callbacks: a higher-priority
 * callback that is submitted while a lower-priority callback is already running is NOT held
 * back, so the two can overlap in that direction.
 *
 * TODO: Refactor code below to use Semaphore from `async-mutex` library. Delete this class!
 *
 * Example:
 *
 * ```ts
 * const semaphore = new AsyncSemaphore();
 *
 * // Invoked without waiting on the gate: no higher-priority callback is in flight.
 * const withoutResource = semaphore.runLowerPriority(async () => {});
 *
 * // Invoked immediately.
 * const withResource = semaphore.runHigherPriority(() => new Promise((x) => setTimeout(x, 5000)));
 *
 * // DOES NOT RUN YET. `withResource` has not finished!
 * const withoutResourceTwo = semaphore.runLowerPriority(async () => {});
 *
 * await withResource;
 *
 * // Does not wait on the gate because `withResource` is done now. Its ordering relative to
 * // `withoutResourceTwo` depends on microtask scheduling; do not rely on it.
 * const withoutResourceThree = semaphore.runLowerPriority(async () => {});
 * ```
 */
export class AsyncSemaphore {
  /** Emits each time the number of in-flight higher-priority callbacks drops to zero. */
  private readonly finished = new Subject<'emit'>();

  /** Number of `runHigherPriority` callbacks currently in flight. */
  countRunning = 0;

  /** Registers one more in-flight higher-priority callback. */
  private start(): void {
    this.countRunning += 1;
  }

  /** Unregisters a higher-priority callback and, if it was the last one, wakes the waiters. */
  private end(): void {
    this.countRunning -= 1;

    if (this.countRunning === 0) {
      // We don't want to emit immediately: Subject delivers synchronously to its subscribers.
      // Deferring with queueMicrotask means the waiting lower-priority callers are only woken
      // after the current microtask has finished executing.
      queueMicrotask(() => this.finished.next('emit'));
    }
  }

  /**
   * Invokes `callback` immediately and blocks the start of every `runLowerPriority` callback
   * until it has settled.
   *
   * @param callback Async work to run.
   * @returns Whatever `callback` resolves with; a rejection from `callback` propagates unchanged.
   */
  async runHigherPriority<T>(callback: () => Promise<T>): Promise<T> {
    this.start();
    try {
      return await callback();
    } finally {
      // Always release the slot, even when the callback throws, so waiters are never stranded.
      this.end();
    }
  }

  /**
   * Resolves once no higher-priority callback is in flight.
   *
   * Re-checks after every wake-up because a new higher-priority callback may have started
   * between the moment the counter hit zero and the (deferred) emission of `finished`.
   */
  private async wait(): Promise<void> {
    if (this.countRunning === 0) {
      return;
    }
    await firstValueFrom(this.finished);
    await this.wait();
  }

  /**
   * Invokes `callback` once no `runHigherPriority` callback is in flight.
   * If there are ANY higher-priority callbacks in flight, including ones started after this
   * method was called, `callback` is not invoked until all of them have settled.
   *
   * NOTE: After the last higher-priority callback settles, waiters are woken on a LATER
   * microtask, not the current one.
   *
   * @param callback Async work to run once the gate is open.
   * @returns Whatever `callback` resolves with; a rejection from `callback` propagates unchanged.
   */
  async runLowerPriority<T>(callback: () => Promise<T>): Promise<T> {
    await this.wait();

    return await callback();
  }
}

/**
 * Root-provided service that wraps the appointment-recording GraphQL operations and uploads
 * recorded audio chunks, either via a presigned URL or via the chunk-queueing mutation.
 */
@Injectable({
  providedIn: 'root',
})
export class HealthscribeRecordingUploaderService {
  constructor(
    private readonly getPresignedUploadUrlService: GetPresignedUploadUrlQueryService,
    private readonly startRecordingService: StartRecordingMutationService,
    private readonly stopRecordingService: StopRecordingMutationService,
    private readonly transcribeRecordingService: TranscribeRecordingMutationService,
    private readonly createRecordingService: CreateRecordingMutationService,
    private readonly startStreamingSessionService: StartStreamingSessionMutationService,
    private readonly queueAudioChunkService: QueueAudioChunkMutationService,
  ) {}

  // We want `stopRecording` and `transcribeRecording` to wait for any chunk uploads that are
  // still in flight, so they are gated behind a semaphore.
  //
  // Why a semaphore? `onChunk` and `stopRecording` / `transcribeRecording` are invoked from
  // completely separate async call chains, so `stopRecording` cannot simply await the `onChunk`
  // promise unless that promise is stored somewhere (e.g. on this service). On top of that,
  // several `onChunk` calls can be in flight when the recording ends (e.g. a 2 min 1 s recording
  // uploads two chunks at around the same time), so ALL of them have to be awaited.
  // `AsyncSemaphore` does exactly that: `onChunk` runs as a higher-priority callback, while
  // `stopRecording` and `transcribeRecording` run as lower-priority callbacks that only start
  // once every in-flight `onChunk` has settled.
  private readonly semaphore = new AsyncSemaphore();

  /**
   * Runs the presigned-upload-URL query for a recording, always hitting the network
   * (`fetchPolicy: 'network-only'`).
   *
   * @param appointmentRecordingId Id of the appointment recording.
   * @returns The last result emitted by the query.
   */
  getNextPresignedUrl(
    appointmentRecordingId: string,
  ): Promise<ApolloQueryResult<GetPresignedUploadUrlQuery>> {
    return lastValueFrom(
      this.getPresignedUploadUrlService.fetch(
        {
          id: appointmentRecordingId,
        },
        {
          fetchPolicy: 'network-only',
        },
      ),
    );
  }

  /**
   * Runs the create-recording mutation for an appointment.
   *
   * @param appointmentId Id of the appointment.
   * @returns The id of the created recording, or `undefined` when the response does not contain one.
   */
  async createRecording(appointmentId: string): Promise<string | undefined> {
    const result = await lastValueFrom(
      this.createRecordingService.mutate({
        input: {
          appointmentId: appointmentId,
        },
      }),
    );
    return result.data?.createAppointmentRecording?.recording?.id;
  }

  /**
   * Runs the start-recording mutation.
   *
   * @param appointmentRecordingId Id of the appointment recording.
   * @returns The mutation result.
   */
  startRecording(
    appointmentRecordingId: string,
  ): Promise<MutationResult<StartRecordingMutation>> {
    return lastValueFrom(
      this.startRecordingService.mutate({
        input: { appointmentRecordingId: appointmentRecordingId },
      }),
    );
  }

  /**
   * Runs the start-streaming-session mutation.
   *
   * @param appointmentRecordingId Id of the appointment recording.
   * @returns The mutation result.
   */
  startStreamingSession(
    appointmentRecordingId: string,
  ): Promise<MutationResult<StartStreamingSessionMutation>> {
    return lastValueFrom(
      this.startStreamingSessionService.mutate({
        input: { appointmentRecordingId: appointmentRecordingId },
      }),
    );
  }

  /**
   * Runs the stop-recording mutation once every in-flight `onChunk` call has settled.
   *
   * @param appointmentRecordingId Id of the appointment recording.
   * @param viewableFields Field flags; sent JSON-serialized as the `defaultFields` input.
   * @returns The mutation result.
   */
  async stopRecording(
    appointmentRecordingId: string,
    viewableFields: ViewableFieldsType,
  ): Promise<MutationResult<StopRecordingMutation>> {
    // Lower priority: does not start while any onChunk call is still in flight.
    return await this.semaphore.runLowerPriority(() => {
      return lastValueFrom(
        this.stopRecordingService.mutate({
          input: {
            appointmentRecordingId: appointmentRecordingId,
            defaultFields: JSON.stringify(viewableFields),
          },
        }),
      );
    });
  }

  /**
   * Runs the transcribe-recording mutation once every in-flight `onChunk` call has settled.
   *
   * @param appointmentRecordingId Id of the appointment recording.
   * @returns The mutation result.
   */
  async transcribeRecording(
    appointmentRecordingId: string,
  ): Promise<MutationResult<TranscribeRecordingMutation>> {
    // Lower priority: does not start while any onChunk call is still in flight.
    return await this.semaphore.runLowerPriority(() => {
      return lastValueFrom(
        this.transcribeRecordingService.mutate({
          input: { appointmentRecordingId: appointmentRecordingId },
        }),
      );
    });
  }

  /**
   * Sends one recorded audio chunk to the backend.
   *
   * Runs as a higher-priority semaphore callback so that `stopRecording` and
   * `transcribeRecording` wait for it to settle before they start.
   *
   * @param appointmentRecordingId Id of the appointment recording the chunk belongs to.
   * @param blob The audio chunk.
   * @param scribeStreamingInChartEnabled When `true` the chunk is base64-encoded and sent through
   *   the queue-audio-chunk mutation; otherwise it is PUT to a freshly requested presigned URL.
   * @throws Error when no presigned URL is returned, when the PUT responds with a non-2xx
   *   status, or when the chunk cannot be read as a data URL. Network and GraphQL failures
   *   propagate as thrown by `fetch` / Apollo.
   */
  async onChunk(
    appointmentRecordingId: string,
    blob: Blob,
    scribeStreamingInChartEnabled: boolean,
  ): Promise<void> {
    return await this.semaphore.runHigherPriority(async () => {
      if (scribeStreamingInChartEnabled) {
        await this.queueChunkForStreaming(appointmentRecordingId, blob);
      } else {
        await this.uploadChunkToPresignedUrl(appointmentRecordingId, blob);
      }
    });
  }

  /** Requests a presigned URL and PUTs the chunk to it as a `File`. */
  private async uploadChunkToPresignedUrl(
    appointmentRecordingId: string,
    blob: Blob,
  ): Promise<void> {
    const result = await this.getNextPresignedUrl(appointmentRecordingId);
    const url: unknown = result.data?.appointmentRecording?.presignedUploadUrl;
    // Real runtime guard instead of an `as string` assertion: rejects null/undefined/''.
    if (typeof url !== 'string' || !url) {
      throw new Error('no url');
    }

    const response = await fetch(url, {
      method: 'PUT',
      body: createFileFromBlob(blob),
    });
    // fetch only rejects on network failure; an HTTP error status (e.g. an expired presigned
    // URL) still resolves, so it has to be checked explicitly or the chunk is lost silently.
    if (!response.ok) {
      throw new Error(
        `chunk upload failed: ${response.status} ${response.statusText}`,
      );
    }
  }

  /** Base64-encodes the chunk and sends it through the queue-audio-chunk mutation. */
  private async queueChunkForStreaming(
    appointmentRecordingId: string,
    blob: Blob,
  ): Promise<void> {
    const dataUrl: unknown = await blobToBase64(blob);
    // A non-string result means the blob could not be read; surface it instead of silently
    // dropping the chunk.
    if (typeof dataUrl !== 'string') {
      throw new Error('could not read audio chunk as a data URL');
    }

    // A data URL looks like `data:<mime>;base64,<payload>`; only the payload is sent.
    const base64String = dataUrl.split(',')[1];
    await lastValueFrom(
      this.queueAudioChunkService.mutate({
        input: {
          appointmentRecordingId: appointmentRecordingId,
          audioChunk: base64String,
        },
      }),
    );
  }
}

/**
 * Reads a blob with `FileReader.readAsDataURL`.
 *
 * @param blob The blob to read.
 * @returns The data URL string produced by the reader (`data:<mime>;base64,<payload>`).
 * @throws Rejects with the reader's error (or a generic `Error`) when the read does not
 *   produce a string.
 */
function blobToBase64(blob: Blob): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const reader = new FileReader();
    // `loadend` fires after both success and failure, so the promise always settles here.
    reader.onloadend = () => {
      const { result } = reader;
      if (typeof result === 'string') {
        resolve(result);
      } else {
        reject(reader.error ?? new Error('failed to read blob as a data URL'));
      }
    };
    reader.readAsDataURL(blob);
  });
}

/**
 * Wraps an audio blob in a `File` named `blob.webm` with MIME type `audio/webm`.
 *
 * @param blob The blob whose bytes become the file's content.
 * @returns The newly created file.
 */
function createFileFromBlob(blob: Blob): File {
  const fileName = 'blob.webm';
  const fileOptions = { type: 'audio/webm' };
  const parts = [blob];

  return new File(parts, fileName, fileOptions);
}
