import { Observable, NextObserver, Subject, NEVER } from 'rxjs';
import { filter, switchMap, finalize } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';

import { filterUndefined, parseException } from '../../common';
import { UQuery, QueryHash, StreamQueryResult, ResultSource, StreamQuerySuccess, KustoQueryResult } from '../../domain';

import { ITelemetryService } from '../backend';
import type { IQueryCache, QueryMeta } from '../../core';

import { QueryStore } from './QueryStore';
import { QueryQueueEntry, QueryStoreFetch, QuerySchedulerStatus } from './types';

/**
 * Schedules promise-style queries
 */
export class QueryScheduler {
    /** Telemetry sink; this class uses it to report exceptions and auto-refresh events */
    private readonly telemetry: ITelemetryService;
    /** Local result cache; read when a query is first registered, written when a fresh successful result is published */
    private readonly cache: IQueryCache;

    /**
     * Contains all queries currently being scheduled, keyed by query hash
     */
    public readonly queue: Record<QueryHash, QueryQueueEntry> = {};
    /** Holds the per-query result subjects, looked up by query hash */
    private readonly queryStore = new QueryStore();

    /** Scheduler event stream; `register` pushes a `create` event on it and `observe` listens for those */
    private readonly schedulerSubject = new Subject<QuerySchedulerStatus>();

    /** Settings from the most recent `startTimer` call; `interval` is multiplied by 1000 before being passed to `setInterval` */
    private configuredRefreshInterval:
        | {
              interval: number;
              dashboardId: string;
          }
        | undefined;
    /** Clears the interval created by `startTimer`; undefined until a timer has been started */
    private cancelIntervalTimer: (() => void) | undefined = undefined;
    /** Last value given to `setTimerBackground`; while true, timer ticks request `defer` fetches instead of `immediate` ones */
    private isRefreshBackgrounded = false;
    /** Removes the `visibilitychange` listener installed by the constructor */
    private cancelVisibilityChangeListener: () => void;

    /**
     * Stores the collaborators and starts tracking document visibility.
     * @param cache Local result cache used to seed and persist query results
     * @param telemetry Sink for exceptions and auto-refresh events
     */
    constructor(cache: IQueryCache, telemetry: ITelemetryService) {
        this.cache = cache;
        this.telemetry = telemetry;

        // Pin `this` on the methods that get handed around as bare callbacks
        this.processQuery = this.processQuery.bind(this);
        this.timerProcessQueue = this.timerProcessQueue.bind(this);
        this.setTimerBackground = this.setTimerBackground.bind(this);

        // Seed the backgrounded flag from the current visibility, then keep it in sync
        this.setTimerBackground(document.hidden);

        const handleVisibilityChange = (): void => {
            this.setTimerBackground(document.hidden);

            if (document.hidden) {
                return;
            }

            // The tab just became visible again: run whatever was deferred while it was hidden
            Object.values(this.queue).forEach((queued) => {
                this.processQuery(queued, true, true);
            });
        };

        document.addEventListener('visibilitychange', handleVisibilityChange);
        this.cancelVisibilityChangeListener = () => {
            document.removeEventListener('visibilitychange', handleVisibilityChange);
        };
    }

    /**
     * Registers `query` with the scheduler and returns an observable of its results.
     *
     * The first registration of a hash creates the result subject (seeded with the previous query's latest value
     * when `previousQueryHash` refers to a different, still-stored query), queues an immediate fetch, asks the
     * local cache for a stored result and starts saving fresh successful results back to the cache.
     * Registering a hash that already has a subject reuses that subject: an existing queue entry gets its
     * registration count incremented, a missing one is recreated with no pending fetch.
     *
     * @param query The query to execute
     * @param previousQueryHash The hash of the previously registered query from this same source. If it differs from
     * `query`'s hash, the previously registered query is de-registered, unless it is persistent and this
     * registration is not
     * @param dashboardId Forwarded to `executor` as part of the query meta
     * @param executor Runs the query and resolves with its result
     * @param isPersistent If true, `query` will not be de-registered by a call to `destroyNonPersistent`
     * @returns The query's result stream. Tearing down a subscription decrements the entry's registration count
     */
    register(
        query: UQuery,
        previousQueryHash: string | undefined,
        dashboardId: string,
        executor: (q: UQuery, meta: QueryMeta) => Promise<KustoQueryResult>,
        isPersistent = false
    ): Observable<StreamQueryResult> {
        const { sourceId: cacheKey, queryHash } = query;

        let subject = this.queryStore.getData(queryHash);

        let entry: QueryQueueEntry | undefined;

        if (!subject) {
            let seed: StreamQueryResult | undefined = undefined;

            if (previousQueryHash !== queryHash) {
                const previousSubject = previousQueryHash ? this.queryStore.getData(previousQueryHash) : undefined;

                // Pre-populate new observable with old query's data
                seed = previousSubject?.value;
            }

            const createdSubject = this.queryStore.create(queryHash, seed);
            subject = createdSubject;

            entry = {
                query,
                executor,
                fetch: 'immediate',
                dashboardId,
                isPersistent,
                knownRegistrations: 1,
            };

            this.queue[queryHash] = entry;

            // Ask the local cache for a previously stored result for this query
            this.getFromCache(queryHash)
                .then((result) => {
                    if (result && createdSubject.value.kind === 'notRun') {
                        // Only push data from cache if subject doesn't currently have data
                        createdSubject.next({
                            ...result,
                            status:
                                createdSubject.value.status?.kind === 'loading'
                                    ? {
                                          // A fetch is already in flight: keep showing it as loading
                                          ...createdSubject.value.status,
                                          lastCompleted: result.result.receivedTime,
                                      }
                                    : result.status,
                        });
                    }
                })
                .catch((err: unknown) => {
                    // A failed cache read must not surface as an unhandled rejection;
                    // the live fetch started below is unaffected by it
                    this.telemetry.trackException(err instanceof Error ? err : new Error(String(err)));
                });

            // Persist every fresh (non-cache), completed, successful result to the local cache
            createdSubject
                .asObservable()
                .pipe(
                    filter(filterUndefined),
                    filter(
                        (r): r is StreamQuerySuccess =>
                            r.kind === 'success' && r.result.source !== ResultSource.cache && r.status.kind === 'done'
                    )
                )
                .subscribe(this.saveToCache(cacheKey, queryHash));

            // Let `observe` callers waiting on this hash know that it now exists
            this.schedulerSubject.next({
                type: 'create',
                hash: queryHash,
            });
        } else {
            // Check that queue entry exists
            entry = this.queue[queryHash];

            if (!entry) {
                // Create one without any fetch
                entry = {
                    query,
                    executor,
                    fetch: 'none',
                    dashboardId,
                    isPersistent,
                    knownRegistrations: 1,
                };

                this.queue[queryHash] = entry;
            } else {
                entry.knownRegistrations += 1;
            }
        }

        if (previousQueryHash && previousQueryHash !== queryHash) {
            const queueEntry = this.queue[previousQueryHash];

            // Only destroy if new entry has the same or higher persistence
            if (queueEntry && (isPersistent || !queueEntry.isPersistent)) {
                this.destroy(previousQueryHash);
            }
        }

        // Process deferred entries, executing them
        this.processQuery(entry, true);

        return subject.asObservable().pipe(
            // Decrementing known registration on finalize is _wrong_! In fact,
            // tracking known registrations via observable finalize is wrong.
            finalize(() => {
                const finalEntry = this.queue[query.queryHash];

                if (!finalEntry) {
                    return;
                }

                finalEntry.knownRegistrations -= 1;

                if (finalEntry.knownRegistrations < 0) {
                    // TODO #9976856
                    // Using "trackException" so we get it logged as an error, because this
                    // indicates a bug, but `asserting` here makes it worse. This can get
                    // tripped 100% by:
                    //   1. Register query
                    //   2. Subscribe to returned observable
                    //   3. Manually destroy registration
                    //   4. Register same query again
                    //   5. Unsubscribe from the observable returned in step 1
                    //
                    //  (Or at least something close to this)
                    //
                    //   Search test file for "TODO: Demonstration of querySchedular bug"
                    //   for demonstration
                    this.telemetry.trackException(
                        new Error('Should not be possible for known registrations to drop below 0')
                    );
                }
            })
        );
    }

    /**
     * Observes an already-registered query by hash without registering it.
     * When no subject exists for `queryHash` yet, the returned observable stays silent until the scheduler
     * announces the creation of that hash and then switches over to the query's result stream.
     * @param queryHash The hash of the query to observe
     * @returns The query's result stream, or a stream that attaches to it once the query is created
     */
    observe(queryHash: string): Observable<StreamQueryResult> {
        const existing = this.queryStore.getData(queryHash);

        if (existing) {
            // Already registered: hand out its stream directly
            return existing.asObservable();
        }

        // Not registered (and may never be): wait for a matching `create` event
        const creationEvents = this.schedulerSubject
            .asObservable()
            .pipe(filter((event) => event.type === 'create' && event.hash === queryHash));

        return creationEvents.pipe(
            switchMap(() => {
                const created = this.queryStore.getData(queryHash);

                // Switch to the stored stream when it exists; otherwise stay silent
                return created ? created.asObservable() : NEVER;
            })
        );
    }

    /**
     * Drops a query from the store and, when the store accepts the removal, from the queue as well.
     * A destroyed query has to be registered again before it can run.
     * @param queryHash The hash of the query to deregister
     * @returns True if the query was removed. False if it is unknown to the store or the store declined to remove it
     */
    destroy(queryHash: string): boolean {
        const isKnown = Boolean(this.queryStore.getData(queryHash));

        if (!isKnown) {
            return false;
        }

        if (!this.queryStore.remove(queryHash)) {
            // The store kept the entry, so the queue entry is left in place too
            return false;
        }

        delete this.queue[queryHash];
        return true;
    }

    /**
     * Removes all registered queries without the `isPersistent` flag.
     * Each removal goes through `destroy`, so an entry the store declines to remove stays registered.
     */
    destroyNonPersistent(): void {
        this.destroyQueries(false);
    }

    /**
     * Removes all registered queries (including those with the `isPersistent` flag).
     * Each removal goes through `destroy`, so an entry the store declines to remove stays registered.
     */
    destroyAll(): void {
        this.destroyQueries(true);
    }

    /**
     * Asks every query currently in the queue for fresh data.
     * @param fetch When the query execution should occur
     * @param skipIfCurrentlyLoading If true, don't fetch a query that is currently being executed
     * @returns A promise that settles once every triggered execution has finished
     */
    async requestAll(fetch: Exclude<QueryStoreFetch, 'none'>, skipIfCurrentlyLoading = false): Promise<void> {
        const pending: Promise<void>[] = [];

        // Work from a snapshot: processing an entry replaces it in the queue
        for (const queued of Object.values(this.queue)) {
            pending.push(this.requestQueueEntry(queued, fetch, skipIfCurrentlyLoading));
        }

        await Promise.all(pending);
    }

    /**
     * Asks a single registered query for fresh data, with immediacy given by `fetch`.
     * Does nothing when `queryHash` has no queue entry.
     * @param queryHash The hash of the previously registered query to execute
     * @param fetch When the query execution should occur
     * @param skipIfCurrentlyLoading If true, don't fetch the query if it's currently being executed
     */
    async request(
        queryHash: string,
        fetch: Exclude<QueryStoreFetch, 'none'>,
        skipIfCurrentlyLoading = false
    ): Promise<void> {
        const queued = this.queue[queryHash];

        if (queued) {
            await this.requestQueueEntry(queued, fetch, skipIfCurrentlyLoading);
        }
    }

    /**
     * Starts the auto-refresh timer, replacing any timer started earlier.
     * @param dashboardId Attached to the telemetry event emitted on every tick
     * @param interval Refresh period; multiplied by 1000 to obtain the `setInterval` delay
     */
    startTimer(dashboardId: string, interval: number) {
        const settings = { interval, dashboardId };
        this.configuredRefreshInterval = settings;

        const delayMs = settings.interval * 1000;
        const handle = window.setInterval(this.timerProcessQueue, delayMs);

        // Stop the previous timer (if any) before remembering how to stop the new one
        this.cancelIntervalTimer?.();
        this.cancelIntervalTimer = () => {
            window.clearInterval(handle);
        };
    }

    /**
     * Stops the auto-refresh timer started by `startTimer`, if there is one.
     * The stored refresh settings are left untouched.
     */
    cancelTimer() {
        this.cancelIntervalTimer?.();
    }

    /**
     * Records whether auto-refresh should be treated as backgrounded.
     * While the flag is true, timer ticks request `defer` fetches instead of `immediate` ones.
     * @param background True to treat refreshes as backgrounded
     */
    setTimerBackground(background: boolean) {
        this.isRefreshBackgrounded = background;
    }

    /* Execution */

    /**
     * Raises the pending fetch of `entry` to `fetch` and processes it. A request that does not outrank the
     * entry's current fetch is ignored.
     * @param entry The queue entry to update
     * @param fetch The requested immediacy
     * @param skipIfCurrentlyLoading If true, don't start an execution while one is already loading
     */
    private async requestQueueEntry(
        entry: QueryQueueEntry,
        fetch: Exclude<QueryStoreFetch, 'none'>,
        skipIfCurrentlyLoading = false
    ): Promise<void> {
        const isElevation = priorityForFetch[fetch] > priorityForFetch[entry.fetch];

        if (!isElevation) {
            return;
        }

        // `immediate` is only honoured while someone is registered; otherwise settle for `defer`
        const hasRegistrations = entry.knownRegistrations > 0;
        const effectiveFetch = fetch === 'immediate' && !hasRegistrations ? 'defer' : fetch;

        const elevated: QueryQueueEntry = { ...entry, fetch: effectiveFetch };
        this.queue[entry.query.queryHash] = elevated;

        // Deferred entries are not executed here (handleDefer = false)
        await this.processQuery(elevated, false, skipIfCurrentlyLoading);
    }

    /**
     * Timer tick: refreshes every queued query and records the tick in telemetry.
     * While backgrounded the refresh is only requested as deferred.
     */
    private timerProcessQueue() {
        const fetchMode = this.isRefreshBackgrounded ? 'defer' : 'immediate';
        this.requestAll(fetchMode, true);

        const eventName = this.isRefreshBackgrounded ? 'AutoRefreshDeferred' : 'AutoRefreshTriggered';
        const dashboardId = this.configuredRefreshInterval?.dashboardId;

        this.telemetry.trackEvent(eventName, { dashboardId });
    }

    /**
     * Executes the pending fetch of a queue entry, if it has one that is due.
     *
     * An `immediate` fetch always runs; a `defer` fetch only runs when `handleDefer` is true. Running a fetch
     * publishes a `loading` status, resets the entry's fetch to `none`, invokes the executor and publishes
     * either the successful result or an error, both with a `done` status.
     *
     * @param queueEntry The entry to process
     * @param handleDefer Whether `defer` fetches should be executed by this call
     * @param skipIfCurrentlyLoading If true and the subject is already `loading`, no new execution is started
     * (the entry's fetch is still reset to `none`)
     * @returns A promise that resolves once the execution (if any) has published its outcome
     */
    private async processQuery(
        queueEntry: QueryQueueEntry,
        handleDefer: boolean,
        skipIfCurrentlyLoading = false
    ): Promise<void> {
        const { query, executor, fetch } = queueEntry;
        if (fetch === 'none') {
            return;
        }

        const subject = this.queryStore.getData(query.queryHash);

        if (!subject) {
            // eslint-disable-next-line no-console
            console.warn(`Cannot find storeItem for query ${query.query}`);

            // Can't find subject, remove from queue
            delete this.queue[query.queryHash];
            return;
        }

        const isDue = fetch === 'immediate' || (fetch === 'defer' && handleDefer);

        if (!isDue) {
            return;
        }

        const previousStatus = subject.value?.status;

        if (skipIfCurrentlyLoading && previousStatus?.kind === 'loading') {
            // Currently loading, skipping
            // Update queue as if we started fetch
            this.queue[query.queryHash] = {
                ...queueEntry,
                fetch: 'none',
            };

            return;
        }

        const executionId = uuid();
        // Captured locally so the `done` status reports this execution's own start time,
        // even if another execution replaces the subject's status in the meantime
        const startTime = new Date();

        subject.next({
            ...subject.value,
            status: {
                executionId,
                kind: 'loading',
                startTime,
                lastCompleted: previousStatus?.kind === 'done' ? previousStatus.endTime : undefined,
            },
        });

        // Fetch executing, clear status
        this.queue[query.queryHash] = {
            ...queueEntry,
            fetch: 'none',
        };

        // The executor call sits inside the `try` so that a synchronous throw is published
        // as an error too, instead of leaving the subject stuck in `loading`
        try {
            const value = await executor(query, { dashboardId: queueEntry.dashboardId, executionId });

            subject.next({
                kind: 'success',
                result: value,
                status: {
                    executionId,
                    kind: 'done',
                    startTime,
                    endTime: new Date(),
                },
            });
        } catch (err) {
            const receivedTime = new Date();

            subject.next({
                kind: 'err',
                didRun: true,
                reason: { title: parseException(err) },
                receivedTime,
                status: {
                    executionId,
                    kind: 'done',
                    startTime,
                    endTime: receivedTime,
                },
            });
        }
    }

    /* Destruction */

    /**
     * Tears the scheduler down: detaches the `visibilitychange` listener, attempts to destroy every
     * registered query (persistent ones included) and stops the refresh timer if one was started.
     */
    public dispose() {
        this.cancelVisibilityChangeListener();
        this.destroyQueries(true);
        this.cancelIntervalTimer?.();
    }

    /**
     * Destroys the queued queries, optionally sparing the persistent ones.
     * @param includingPersistent When false, entries flagged `isPersistent` are left alone
     */
    private destroyQueries(includingPersistent: boolean) {
        // Iterate a snapshot: `destroy` deletes from the queue as it goes
        const snapshot = Object.values(this.queue);

        for (const { isPersistent, query } of snapshot) {
            if (isPersistent && !includingPersistent) {
                continue;
            }

            this.destroy(query.queryHash);
        }
    }

    /* Cache */

    /**
     * Looks up a stored result in the local cache and wraps it as a completed, successful stream result.
     * @param equalityHash The hash the result was stored under
     * @returns The wrapped cached result, or undefined when the cache has nothing for this hash
     */
    private getFromCache(equalityHash: string): Promise<StreamQuerySuccess | undefined> {
        return this.cache.getCachedData(equalityHash).then((cached): StreamQuerySuccess | undefined => {
            if (!cached) {
                return undefined;
            }

            const { receivedTime } = cached;

            return {
                kind: 'success',
                result: cached,
                status: {
                    // The execution that produced this result happened in the past and its ID is unknown
                    executionId: undefined,
                    kind: 'done',
                    startTime: receivedTime,
                    endTime: receivedTime,
                },
            };
        });
    }

    /**
     * Builds an observer that writes each successful result it receives to the local cache,
     * re-labelled as coming from the cache.
     * @param cacheKey First key passed to the cache (the query's `sourceId` at the call site in this file)
     * @param equalityHash The hash the result is stored under
     * @returns An observer with only a `next` handler
     */
    private saveToCache(cacheKey: string, equalityHash: string): NextObserver<StreamQuerySuccess> {
        const persist = (success: StreamQuerySuccess): void => {
            const { result } = success;

            if (!result) {
                return;
            }

            // Mark the stored copy so it is recognised as cached data when read back
            this.cache.saveCachedData(cacheKey, equalityHash, {
                ...result,
                source: ResultSource.cache,
            });
        };

        return { next: persist };
    }
}

/**
 * Relative urgency of each fetch mode; a higher number outranks a lower one.
 * `requestQueueEntry` only updates an entry when the requested mode outranks the entry's current one.
 */
const priorityForFetch: Readonly<Record<QueryStoreFetch, number>> = {
    none: 0,
    defer: 1,
    immediate: 2,
};
