import { Observable, ErrorObserver } from 'rxjs';
import { defaultIfEmpty, map, scan, tap, timeout } from 'rxjs/operators';

import { IQueryCache } from '../types';
import { KustoSchema } from '../../domain';
import { err, ok, ReadonlyRecord, Result, InterfaceFor, Ok } from '../../common';

import {
    ResultSource,
    QueryExecutionId,
    UQuery,
    KustoQueryResult,
    QueryResultFragment,
    StreamQueryResult,
    QueryHash,
    ExternalKustoQueryResult,
} from '../../domain/query';
import type { UDataSource } from '../domain';
import { ITelemetryService } from '../backend';
import { IScope, QueryMeta } from '../config';
import { APP_STRINGS } from '../../res';

import { applyFragment } from './fragment';
import { QueryScheduler } from './QueryScheduler';

export type IQueryService = InterfaceFor<QueryService>;

/**
 * Provides access to data sources via the own backend service.
 */
export class QueryService implements IQueryService {
    /**
     * Four minutes in milliseconds to match ADX's default timeout
     */
    private static readonly queryTimeout = 4 * 60 * 1000;
    private readonly scheduler: QueryScheduler;
    private readonly scopeRecord: ReadonlyRecord<string, undefined | Ok<IScope>>;

    constructor(
        queryCache: IQueryCache,
        private readonly telemetryService: ITelemetryService,
        public readonly scopes: IScope[]
    ) {
        this.scopeRecord = scopes.reduce<Record<string, Ok<IScope>>>((acc, s) => {
            acc[s.id] = ok(s);
            return acc;
        }, {});

        this.scheduler = new QueryScheduler(queryCache, telemetryService);

        this.executeQuery = this.executeQuery.bind(this);
    }

    dispose() {
        this.scheduler.dispose();
    }

    getScope(scopeId: string): Result<IScope> {
        const scope = this.scopeRecord[scopeId];
        if (scope) {
            return scope;
        }
        const message = `${APP_STRINGS.query.errors.missingScope}: ${scopeId}`;
        this.telemetryService.trackException(new Error(message));
        return err(message);
    }

    /**
     * Observes the given query
     * @param query The query to execute
     * @param previousQueryHash The hash of the previously registered query from this same source. If it differs from `query`
     * the previously registered query will be de-registered
     * @param isPersistent If true, `query` will not be de-registered by a call to `deregisterAllNonPersistentQueries`
     */
    registerAndObserveQuery(
        query: UQuery,
        previousQueryHash: QueryHash | undefined,
        dashboardId: string,
        isPersistent = false
    ): Observable<StreamQueryResult> {
        return this.scheduler.register(query, previousQueryHash, dashboardId, this.executeQuery, isPersistent);
    }

    /**
     * Observes an existing query, identified by the provided `queryHash`
     * If the query cannot be found, the produced observable will wait for it to exist and then transfer the subscription to the data pipeline observable
     * @param queryHash The hash of the query to observe
     */
    observeQuery(queryHash: QueryHash): Observable<StreamQueryResult> {
        return this.scheduler.observe(queryHash);
    }

    /**
     * Cancels a currently streaming query execution
     * @param query The query to cancel
     * @param executionId The unique id for the query execution instance to cancel
     */
    async cancelQuery(query: UQuery, executionId: QueryExecutionId): Promise<void> {
        // Get the dashboard id of the original entry. (id may have changed since it was created)
        const entry = this.scheduler.queue[query.queryHash];

        if (entry) {
            await this.scopeRecord[query.dataSource.scopeId]?.value.cancel(query, {
                dashboardId: entry.dashboardId,
                executionId,
            });
        }
    }

    /**
     * Removes a query from processing.
     * In order to run this query again, it must be re-registered
     * @param queryHash The hash of the query to deregister
     * @returns True if the query was successfully removed. False if the query could not be found or could not be removed
     */
    deregisterQuery(queryHash: QueryHash): boolean {
        return this.scheduler.destroy(queryHash);
    }

    /**
     * Removes all registered queries without the `isPersistent` flag
     */
    deregisterAllNonPersistentQueries(): void {
        return this.scheduler.destroyNonPersistent();
    }

    /**
     * Removes all registered queries
     */
    deregisterAllQueries(): void {
        return this.scheduler.destroyAll();
    }

    /**
     * Requests the latest data from all currently registered queries
     */
    requestAllQueriesUpdate(): void {
        this.scheduler.requestAll('immediate');
    }

    /**
     * Requests the latest data from the given query
     * @param queryHash The hash of the previously registered query to execute
     */
    requestQueryUpdate(queryHash: QueryHash): void {
        this.scheduler.request(queryHash, 'immediate');
    }

    /**
     * See `QueryScheduler.startTimer`
     */
    startTimer(dashboardId: string, interval: number): void {
        this.scheduler.startTimer(dashboardId, interval);
    }

    /**
     * See `QueryScheduler.cancelTimer`
     */
    cancelTimer(): void {
        this.scheduler.cancelTimer();
    }

    /**
     * See `QueryScheduler.setTimerBackground`
     */
    setTimerBackground(background: boolean): void {
        this.scheduler.setTimerBackground(background);
    }

    /**
     * See `IScope.getDataSourceSchema`
     */
    async getDataSourceSchema(dataSource: UDataSource, dashboardId: string): Promise<Result<KustoSchema>> {
        const scope = this.getScope(dataSource.scopeId);
        if (scope.kind === 'ok') {
            return ok(await scope.value.getDataSourceSchema(dataSource, dashboardId));
        }
        return scope;
    }
    /**
     * See `IScope.deserializeColumn`
     */
    deserializeColumn(
        scopeId: string,
        dataType: string,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        values: any[]
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
    ): Result<any[]> {
        const scope = this.getScope(scopeId);

        if (scope.kind === 'ok') {
            return ok(scope.value.deserializeColumn(dataType, values));
        }
        return scope;
    }

    /**
     * Executes a query as a promise
     * @param query The query to execute
     * @param executionId The unique identifier for this query execution instance
     */
    private executeQuery(query: UQuery, meta: QueryMeta): Promise<KustoQueryResult> {
        const emptyResult = QueryService.getEmptyResult();

        const maybeScope = this.getScope(query.dataSource.scopeId);

        if (maybeScope.kind === 'err') {
            return Promise.reject(new Error(maybeScope.err));
        }

        const result = maybeScope.value.observe(query, meta).pipe(
            tap(this.logQueryError(query)),
            scan<QueryResultFragment, ExternalKustoQueryResult>(applyFragment, emptyResult),
            map(
                (queryResult): KustoQueryResult => ({
                    ...queryResult,
                    scopeId: query.dataSource.scopeId,
                })
            ),
            // Return empty result if nothing received from the upstream data source
            // It's necessary to overwrite cached result if necessary
            defaultIfEmpty<KustoQueryResult>({
                ...emptyResult,
                scopeId: query.dataSource.scopeId,
            })
        );

        return result.pipe(timeout(QueryService.queryTimeout)).toPromise();
    }

    private static getEmptyResult(): ExternalKustoQueryResult {
        return {
            rows: [],
            columns: [],
            receivedTime: new Date(),
            source: ResultSource.online,
            isSorted: false,
        };
    }

    private logQueryError(query: UQuery): ErrorObserver<QueryResultFragment> {
        return {
            error: (error) => {
                this.telemetryService.trackException(
                    error,
                    {},
                    {
                        query: query.query,
                        dataSource: query.dataSource,
                    }
                );
            },
        };
    }
}
