import config from "@config";
import { SSEMessage, parseSSEMessage } from "@models";
import { ChangeSuggestionRequestBody, SuggestionRequestBody } from "@types";
import { captureSentryException } from "@utils";
import { authHelper } from "src/stores/helpers";

/**
 * Registry of in-flight SSE streams, keyed by element id.
 *
 * Each value is the AbortController whose signal is attached to that stream's
 * request, so calling `abort()` on it cancels the stream.
 */
export const activeStreams = new Map<string, AbortController>();

/**
 * Blank line that terminates one SSE event. Accepts LF and CRLF line endings;
 * for LF-only streams this is equivalent to splitting on "\n\n".
 */
const SSE_EVENT_DELIMITER = /\r?\n\r?\n/;

/** Builds the synthetic "error" message that SSEStream yields instead of throwing. */
function sseErrorMessage(content: string): SSEMessage {
  return {
    event: "error",
    id: "error",
    data: {
      content,
      metadata: {},
    },
  };
}

/**
 * Best-effort description of a caught value. Prefers a non-empty string
 * `message` property and falls back to a JSON / string rendering, so that
 * `null`, `undefined` and non-Error throwables do not cause a second failure.
 */
function describeError(error: unknown): string {
  const message = (error as { message?: unknown } | null | undefined)?.message;
  if (typeof message === "string" && message) return message;
  // JSON.stringify returns undefined at runtime for undefined/functions/symbols.
  return JSON.stringify(error) ?? String(error);
}

/**
 * POSTs `body` to the assistant backend at `path` and yields the Server-Sent
 * Events of the response as they arrive.
 *
 * Only one stream per element (`body.step.id`) may run at a time: while a
 * stream is registered in `activeStreams`, a second call for the same element
 * yields a single "error" message and finishes. The registration is removed
 * when the generator finishes, however it finishes.
 *
 * Failures are never thrown to the consumer. HTTP errors, a missing response
 * body, network errors and per-event parse errors are reported to Sentry and
 * surfaced as messages with `event: "error"`. An abort triggered through the
 * registered AbortController is treated as a deliberate stop: the generator
 * simply ends, without an error message and without a Sentry report.
 *
 * @param path - Path appended to the assistant backend base URL.
 * @param body - Request payload, sent as JSON; `body.step.id` identifies the stream.
 * @returns An async generator of parsed SSE messages (including synthetic error messages).
 */
export async function* SSEStream(
  path: string,
  body: ChangeSuggestionRequestBody | SuggestionRequestBody
): AsyncGenerator<SSEMessage> {
  const elementId = body.step.id;

  // Reject if there's already an active stream for this element.
  if (activeStreams.has(elementId)) {
    yield sseErrorMessage(
      `A stream is already in progress for element ${elementId}. Please wait for it to complete.`
    );
    return;
  }

  // Register a controller so the stream can be cancelled from outside.
  const streamController = new AbortController();
  activeStreams.set(elementId, streamController);

  try {
    const url = config.backends.assistant.baseUrl + path;
    const accessToken = await authHelper.getAccessTokenSilently()();

    const response = await fetch(url, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream",
        Authorization: `Bearer ${accessToken}`,
      },
      body: JSON.stringify(body),
      signal: streamController.signal,
    });

    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    const reader = response.body?.getReader();
    if (!reader) {
      throw new Error("Could not fetch suggestions. Reader is null.");
    }

    try {
      const decoder = new TextDecoder("utf-8");
      let buffer = "";

      // Stop reading as soon as an abort has been requested.
      while (!streamController.signal.aborted) {
        const { done, value } = await reader.read();
        if (done) break;

        // Decode and accumulate the chunk; `stream: true` keeps multi-byte
        // characters that straddle chunk boundaries intact.
        buffer += decoder.decode(value, { stream: true });

        // Everything before the last delimiter is complete; the tail may be a
        // partial event and stays in the buffer for the next chunk.
        const parts = buffer.split(SSE_EVENT_DELIMITER);
        buffer = parts.pop() ?? "";

        for (const part of parts) {
          // Skip any empty parts.
          if (!part.trim()) continue;

          try {
            yield parseSSEMessage(part);
          } catch (parseError: unknown) {
            captureSentryException(parseError);
            yield sseErrorMessage(`Parsing error: ${describeError(parseError)}`);
          }
        }
      }

      // Flush a trailing event that was not followed by a delimiter. Skipped
      // after an abort: the leftover is then most likely a truncated event.
      if (!streamController.signal.aborted && buffer.trim() !== "") {
        try {
          yield parseSSEMessage(buffer);
        } catch (parseError: unknown) {
          captureSentryException(parseError);
          yield sseErrorMessage(
            `Parsing error on final flush: ${describeError(parseError)}`
          );
        }
      }
    } finally {
      // Runs on normal completion, on errors, and when the consumer stops
      // iterating early; cancelling releases the underlying response stream.
      try {
        await reader.cancel();
      } catch {
        // The stream was already closed or errored; nothing left to release.
      }
    }
  } catch (error: unknown) {
    // A requested abort is an expected way to end the stream, not a failure.
    if (streamController.signal.aborted) return;

    captureSentryException(error);
    yield sseErrorMessage(describeError(error));
  } finally {
    // Clean up the controller reference when done.
    activeStreams.delete(elementId);
  }
}
