import * as angular from 'angular';
import { EventEmitter2, Listener } from 'eventemitter2';
import * as _ from 'lodash';
import { CancellationToken, IAppSettings, ICancellationService } from '.';
import { IHttpUtility, IUriUtility, IUtility } from '../helpers';

/** Optional settings accepted by the job service's request helpers. */
interface JobServiceOptions {
	/** Passed as the `timeout` of the underlying `$http` request. */
	cancellationToken?: CancellationToken;
	/** `status` parameter of the job-change WebSocket request; 'Pending,Running' is used when unset. */
	status?: string;
	/** `limit` parameter of the job-change WebSocket request; 25 is used when unset. */
	limit?: number;
	/** Value of the `fields` URI parameter sent with the request. */
	fields?: string;
}

/** Request data attached to a {@link Job}. */
export interface JobRequest {
	path: string;
	// NOTE(review): untyped here; the admin field filter selects `request.content.(assetIds,ops.op)` — confirm the full shape with the service.
	content?: any;
}

/** Response data attached to a {@link Job}. */
export interface JobResponse {
	statusCode?: number;
	// NOTE(review): untyped here; the WebSocket field filter selects `response.content.(id,metadata,file,kind)` — confirm the full shape with the service.
	content?: any;
}

/** Status values a {@link Job} can carry. */
export type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'canceled';

/** A job as exchanged with the job service. */
export interface Job {
	id: string;
	/** Sent as the `runner` URI parameter when the job is canceled or restarted. */
	runnerId?: string;
	request?: JobRequest;
	response?: JobResponse;
	status: JobStatus;
	/** An ISO8601-formatted date. */
	created: string;
	/** An ISO8601-formatted date. */
	started?: string;
	/** An ISO8601-formatted date. A job without this value is treated as still pending by `awaitJobsCompletionAsync`. */
	finished?: string;
}

/** A list of jobs; used as the response body type of the `jobs/get` request. */
export interface Jobs {
	items?: Array<Job | undefined>;
}

// Registers the `jobService` factory function as 'jobService' on the 'app' module.
angular.module('app').factory('jobService', jobService);

function jobService(
	$http: angular.IHttpService,
	$interval: angular.IIntervalService,
	$q: angular.IQService,
	$timeout: angular.ITimeoutService,
	$log: angular.ILogService,
	uriUtility: IUriUtility,
	httpUtility: IHttpUtility,
	appSettings: IAppSettings,
	cancellationService: ICancellationService,
	utility: IUtility
) {
	// Delays passed to `$timeout` between completion polls; polling starts with the first
	// entry and moves to the next one after `requestsPerStep` waits.
	const jobCompletionPollIntervalSteps = [500, 1000, 5000];
	// Number of waits performed at one poll interval before moving on to the next one.
	const requestsPerStep = 10;
	// Cancellation source of the current job-change WebSocket connection; reset to null
	// once the last status-change handler has been removed.
	let jobChangeCancellationSource;
	// Jobs received over the WebSocket, keyed by status and then by job ID.
	const myRecentJobs: { [status: string]: { [jobId: string]: Job | undefined } | undefined } = {};
	// Name of the event emitted to status-change handlers.
	const jobChangeEventName = 'job.change';
	const events = new EventEmitter2({
		wildcard: false,
	});
	// Incremented on every WebSocket error and reset when a connection opens;
	// `reconnect` gives up once it exceeds 10.
	let errorCount = 0;

	/**
	 * Polls the job service until every job in `jobIds` has finished.
	 *
	 * The returned promise
	 * -- resolves with `{ items }` (one entry per requested ID, in `jobIds` order) when
	 *    no job ended with status 'failed'
	 * -- rejects with the same `{ items }` shape when at least one job ended with status 'failed'
	 * -- rejects with `httpUtility.getErrorReason(...)` when a poll fails
	 * -- notifies with `{ updatedJobs }` each time a poll returns jobs that have finished
	 *
	 * A job counts as finished once the service reports a `finished` value for it. An ID
	 * the service returns no job for is not polled again and yields `undefined` in `items`.
	 *
	 * @param jobIds IDs of the jobs to wait for.
	 * @param options Optional `cancellationToken` and `fields`, forwarded to every poll request.
	 */
	function awaitJobsCompletionAsync(jobIds: ReadonlyArray<string>, options?: JobServiceOptions) {
		const pollOptions: JobServiceOptions = options || {};

		let pollingIntervalStep = 0;
		let intervalRequestCount = 0;

		// Returns the delay before the next poll, moving to the next step once
		// `requestsPerStep` delays have been handed out at the current one.
		function nextPollingInterval() {
			if (
				intervalRequestCount >= requestsPerStep &&
				pollingIntervalStep < jobCompletionPollIntervalSteps.length - 1
			) {
				pollingIntervalStep++;
				intervalRequestCount = 0;
			}

			intervalRequestCount++;
			return jobCompletionPollIntervalSteps[pollingIntervalStep];
		}

		// Latest known state of every job seen so far, across all polls.
		const jobsById: { [jobId: string]: Job | undefined } = {};

		const deferred = $q.defer<{ items: Array<Job | undefined> }>();

		// Polls `pendingJobIds` once and re-schedules itself for the jobs that are still
		// unfinished. The parameter is deliberately not named `jobIds`: the final result has
		// to cover every originally requested job, not only the ones in the last poll.
		function pollAsync(pendingJobIds: ReadonlyArray<string>) {
			return getJobsAsync(pendingJobIds, {
				cancellationToken: pollOptions.cancellationToken,
				fields: pollOptions.fields && `${pollOptions.fields},items.(id,status,finished)`,
			}).then(response => {
				const remainingJobIds: string[] = [];
				const updatedJobs: Job[] = [];

				for (const job of response!.items!.filter(utility.isNotNullish)) {
					const isJobPending = !job.finished;
					const lastStatus = jobsById[job.id] && jobsById[job.id]!.status;
					if (lastStatus !== job.status && !isJobPending) {
						updatedJobs.push(job);
					}

					jobsById[job.id] = job;
					if (isJobPending) {
						remainingJobIds.push(job.id);
					}
				}

				if (updatedJobs.length) {
					deferred.notify({ updatedJobs });
				}

				if (remainingJobIds.length) {
					return $timeout(() => pollAsync(remainingJobIds), nextPollingInterval(), false);
				}

				return {
					items: jobIds.map(id => jobsById[id]),
				};
			});
		}

		pollAsync(jobIds)
			.then(result => {
				if (!_.some(result.items, { status: 'failed' })) {
					deferred.resolve(result);
				} else {
					deferred.reject(result);
				}
			})
			.catch(response => {
				deferred.reject(httpUtility.getErrorReason(response));
			});

		return deferred.promise;
	}

	/**
	 * Requests the given jobs from the `jobs/get` endpoint.
	 *
	 * @param jobIds IDs of the jobs to look up; sent in the request body as `{ items: [{ id }, ...] }`.
	 * @param options Optional `fields` URI parameter and `cancellationToken`; the token is
	 *   passed as the request's `timeout`.
	 * @returns A promise for the response body.
	 */
	function getJobsAsync(jobIds: ReadonlyArray<string>, options?: JobServiceOptions) {
		const settings: JobServiceOptions = options || {};
		const payload = { items: jobIds.map(id => ({ id })) };
		const uri = uriUtility.fromPattern(`${appSettings.assetServiceBaseUri}jobs/get`, {
			fields: settings.fields,
		});

		return $http
			.post<Jobs>(uri, payload, { timeout: settings.cancellationToken })
			.then(({ data }) => data);
	}

	/**
	 * Posts to the `jobs/{id}/cancel` endpoint for a job.
	 *
	 * @param jobId ID substituted into the URI pattern.
	 * @param runnerId Sent as the `runner` URI parameter; may be undefined.
	 * @returns The `$http` promise of the POST request.
	 */
	function cancelJobAsync(jobId: string, runnerId: string | undefined) {
		const cancelUri = uriUtility.fromPattern(
			`${appSettings.assetServiceBaseUri}jobs/{id}/cancel`,
			{ id: jobId, runner: runnerId }
		);

		return $http.post<any>(cancelUri, null);
	}

	/**
	 * Posts to the `jobs/{id}/restart` endpoint for a job.
	 *
	 * @param jobId ID substituted into the URI pattern.
	 * @param runnerId Sent as the `runner` URI parameter.
	 * @returns The `$http` promise of the POST request.
	 */
	function restartJobAsync(jobId: string, runnerId: string) {
		const restartUri = uriUtility.fromPattern(
			`${appSettings.assetServiceBaseUri}jobs/{id}/restart`,
			{ id: jobId, runner: runnerId }
		);

		return $http.post<any>(restartUri, null);
	}

	/**
	 * Requests `jobs/service` and `jobs/servers` (status paused or running, limit 1000)
	 * together.
	 *
	 * @param cancellationToken Passed as the `timeout` of both requests.
	 * @returns A promise for `{ service, servers }`, where `service` is the body of the
	 *   first response and `servers` is the `items` of the second.
	 */
	function adminGetJobServerStatusAsync(cancellationToken: CancellationToken) {
		const serviceRequest = $http.get(`${appSettings.assetServiceBaseUri}jobs/service`, {
			timeout: cancellationToken,
		});
		const serversRequest = $http.get(
			`${appSettings.assetServiceBaseUri}jobs/servers?status=paused,running&limit=1000`,
			{ timeout: cancellationToken }
		);

		return $q
			.all({ service: serviceRequest, servers: serversRequest })
			.then(({ service, servers }) => ({
				service: service.data,
				servers: servers.data.items,
			}));
	}

	/**
	 * Requests up to 1000 jobs with the given status(es) from the `jobs/` endpoint, using a
	 * fixed `fields` selection.
	 *
	 * @param status A status string, or an array of statuses that is joined with commas.
	 * @param cancellationToken Passed as the request's `timeout`.
	 * @returns A promise for the response body.
	 */
	function adminGetJobsAsync(
		status: string | ReadonlyArray<string>,
		cancellationToken: CancellationToken
	) {
		const statusFilter = Array.isArray(status) ? status.join(',') : status;
		const uri = uriUtility.fromPattern(`${appSettings.assetServiceBaseUri}jobs/`, {
			status: statusFilter,
			limit: 1000,
			fields:
				'items.(id,bucketId,status,agent,created,started,serverId,runnerId,request.content.(assetIds,ops.op))',
		});

		return $http.get<any>(uri, { timeout: cancellationToken }).then(({ data }) => data);
	}

	/**
	 * Builds the URI of a single job from the `jobs/{id}` pattern.
	 *
	 * @param jobId ID substituted into the URI pattern.
	 */
	function adminGetJobUri(jobId: string) {
		const pattern = `${appSettings.assetServiceBaseUri}jobs/{id}`;
		return uriUtility.fromPattern(pattern, { id: jobId });
	}

	/**
	 * Posts to the `jobs/servers/{id}/start` endpoint.
	 *
	 * @param serverId ID substituted into the URI pattern.
	 * @returns The `$http` promise of the POST request.
	 */
	function adminStartJobServerAsync(serverId: string) {
		const startUri = uriUtility.fromPattern(
			`${appSettings.assetServiceBaseUri}jobs/servers/{id}/start`,
			{ id: serverId }
		);

		return $http.post<any>(startUri, null);
	}

	/**
	 * Posts to the `jobs/servers/{id}/pause` endpoint.
	 *
	 * @param serverId ID substituted into the URI pattern.
	 * @returns The `$http` promise of the POST request.
	 */
	function adminPauseJobServerAsync(serverId: string) {
		const pauseUri = uriUtility.fromPattern(
			`${appSettings.assetServiceBaseUri}jobs/servers/{id}/pause`,
			{ id: serverId }
		);

		return $http.post<any>(pauseUri, null);
	}

	/**
	 * Posts to the `jobs/servers/{id}/stop` endpoint.
	 *
	 * @param serverId ID substituted into the URI pattern.
	 * @returns The `$http` promise of the POST request.
	 */
	function adminStopJobServerAsync(serverId: string) {
		const stopUri = uriUtility.fromPattern(
			`${appSettings.assetServiceBaseUri}jobs/servers/{id}/stop`,
			{ id: serverId }
		);

		return $http.post<any>(stopUri, null);
	}

	/**
	 * Posts to the `jobs/service/start` endpoint.
	 *
	 * @returns The `$http` promise of the POST request.
	 */
	function adminStartJobServiceAsync() {
		const startUri = `${appSettings.assetServiceBaseUri}jobs/service/start`;
		return $http.post<any>(startUri, null);
	}

	/**
	 * Posts to the `jobs/service/pause` endpoint.
	 *
	 * @returns The `$http` promise of the POST request.
	 */
	function adminPauseJobServiceAsync() {
		const pauseUri = `${appSettings.assetServiceBaseUri}jobs/service/pause`;
		return $http.post<any>(pauseUri, null);
	}

	/**
	 * Subscribes `handler` to job-change events and makes sure a WebSocket connection exists.
	 *
	 * `options` is only used when this call creates the connection; while a connection
	 * with a non-canceled token exists, only the listener is added.
	 *
	 * @param options Options handed to the connection when one is created.
	 * @param handler Listener registered for the job-change event.
	 */
	function addStatusChangedHandler(options: { limit?: number }, handler: Listener) {
		events.addListener(jobChangeEventName, handler);

		const hasLiveConnection =
			jobChangeCancellationSource && !jobChangeCancellationSource.token.isCancelled;
		if (hasLiveConnection) {
			return;
		}

		// First listener (or the previous connection was canceled): open a new connection.
		jobChangeCancellationSource = cancellationService.createCancellationSource();
		createJobChangeWsConnection(options, jobChangeCancellationSource.token);
	}

	/**
	 * Unsubscribes `handler` from job-change events and cancels the connection's
	 * cancellation source once no listener is left.
	 *
	 * @param handler Listener to remove from the job-change event.
	 */
	function removeStatusChangedHandler(handler: Listener) {
		events.removeListener(jobChangeEventName, handler);

		if (!jobChangeCancellationSource) {
			return;
		}

		// Nobody is listening any more: cancel the source and forget it.
		const remainingListeners = events.listeners(jobChangeEventName).length;
		if (remainingListeners === 0) {
			jobChangeCancellationSource.cancel();
			jobChangeCancellationSource = null;
		}
	}

	/**
	 * Opens the job-change WebSocket: requests a WebSocket URL from the proxy endpoint,
	 * connects to it, stores every received job in `myRecentJobs` (keyed by status, then
	 * job ID) and emits the received jobs as a job-change event.
	 *
	 * While open, the socket is sent `'{}'` every 25 seconds. A `whenCanceled` callback
	 * that closes the socket is registered on `cancellationToken`, and `reconnect` is
	 * called whenever the socket closes. A rejected URL request is counted as a connection
	 * error and also goes through `reconnect`, so a failure while reconnecting does not
	 * end the subscription.
	 *
	 * @param options `status`, `limit` and `fields` of the subscription; defaults are used
	 *   for unset values.
	 * @param cancellationToken Token of the current subscription.
	 * @returns A promise that settles once the URL request has been handled.
	 */
	function createJobChangeWsConnection(
		options: JobServiceOptions,
		cancellationToken: CancellationToken
	) {
		const params = {
			status: options.status || 'Pending,Running',
			limit: options.limit || 25,
			fields:
				options.fields ||
				'items.(id,bucketId,status,agent,created,started,finished,runnerId,request.content.(ops,assetIds),response.content.(id,metadata,file,kind))',
		};

		return $http
			.post<any>(uriUtility.fromPattern('/proxy/websockets/files/v1/jobs/me', params), null)
			.then(
				response => {
					// The last listener may have been removed while the URL request was in
					// flight; do not open a socket nobody is waiting for.
					if (cancellationToken.isCancelled) {
						return;
					}

					const url = response.data?.url;
					if (response.status !== 200 || !url) {
						$log.warn('Could not get Job WebSocket URL', response);
						return;
					}

					let pinger: angular.IPromise<any> | undefined;

					const stopPinger = () => {
						if (pinger) {
							$interval.cancel(pinger);
							pinger = undefined;
						}
					};

					const jobChangeWSConnection = new WebSocket(url);
					jobChangeWSConnection.onopen = function () {
						$log.debug('Job WebSocket connection opened');
						errorCount = 0;

						// keep the websocket alive, assuming a 30 second request timeout
						pinger = $interval(() => {
							jobChangeWSConnection.send('{}');
						}, 25000);
					};

					jobChangeWSConnection.onmessage = function (event) {
						let data: any;
						try {
							data = JSON.parse(event.data);
						} catch (error) {
							// A malformed frame must not throw out of the event handler.
							$log.warn('Job WebSocket message could not be parsed', event, error);
							return;
						}

						if (!data || typeof data !== 'object') {
							$log.warn('Job WebSocket message ignored, not an object', event, data);
							return;
						}

						const jobs: Job[] = data.items ? data.items : [data];
						$log.debug('Job WebSocket message received', event, data);

						jobs.forEach(job => {
							// A job is cached under exactly one status: drop it from every
							// bucket before filing it under its current status.
							Object.keys(myRecentJobs).forEach(key => {
								if (myRecentJobs[key] && myRecentJobs[key]![job.id]) {
									delete myRecentJobs[key]![job.id];
								}
							});
							if (!myRecentJobs[job.status]) {
								myRecentJobs[job.status] = {};
							}
							myRecentJobs[job.status]![job.id] = job;
						});

						events.emit(jobChangeEventName, jobs);
					};

					jobChangeWSConnection.onclose = function (event) {
						$log.debug('Job WebSocket connection closed', event);
						stopPinger();
						reconnect(options, cancellationToken);
					};

					jobChangeWSConnection.onerror = function (event) {
						$log.warn('Job WebSocket connection error', event);
						errorCount++;
						stopPinger();
					};

					cancellationToken.whenCanceled(() => {
						jobChangeWSConnection.close();
					});
				},
				reason => {
					// Without this handler a failed URL request would be an unhandled rejection
					// and would silently end the reconnect cycle. `reconnect` checks both the
					// cancellation state and the error limit.
					$log.warn('Could not get Job WebSocket URL', reason);
					errorCount++;
					reconnect(options, cancellationToken);
				}
			);
	}

	/**
	 * Schedules a new job-change WebSocket connection 3 seconds from now.
	 *
	 * Nothing is scheduled when `cancellationToken` is already canceled or when
	 * `errorCount` exceeds 10. The token is checked again when the delay elapses, so a
	 * subscription that was canceled in the meantime does not open another connection.
	 *
	 * @param options Options passed on to `createJobChangeWsConnection`.
	 * @param cancellationToken Token of the current subscription.
	 */
	function reconnect(options: JobServiceOptions, cancellationToken: CancellationToken) {
		// if connection was not cancelled, attempt to reconnect after 3 seconds.
		if (cancellationToken.isCancelled) {
			return;
		}

		if (errorCount > 10) {
			$log.info('Job WebSocket connection not reconnecting, error limit reached.');
			return;
		}

		$log.info('Job WebSocket connection reconnecting in 3 seconds');

		$timeout(
			() => {
				// The last listener may have been removed during the delay.
				if (cancellationToken.isCancelled) {
					return;
				}

				createJobChangeWsConnection(options, cancellationToken);
			},
			3000,
			false
		);
	}

	// Public surface of the service.
	return {
		awaitJobsCompletionAsync,
		getJobsAsync,
		// Returns a deep clone of `myRecentJobs`, so changes made by callers do not reach the cache.
		get myJobs() {
			return _.cloneDeep(myRecentJobs);
		},
		cancelJobAsync,
		restartJobAsync,
		adminGetJobServerStatusAsync,
		adminGetJobsAsync,
		adminGetJobUri,
		adminStartJobServerAsync,
		adminPauseJobServerAsync,
		adminStopJobServerAsync,
		adminStartJobServiceAsync,
		adminPauseJobServiceAsync,
		addStatusChangedHandler,
		removeStatusChangedHandler,
	};
}

/** Type of the object returned by the `jobService` factory. */
export type IJobService = ReturnType<typeof jobService>;
