import { isEqual } from 'lodash-es'
import { call, put, race, select, spawn, take, takeLatest } from 'typed-redux-saga'
import { Dagster } from '~/types'
import { State } from '~/types/state'
import { isRunActive, sleep, supabase, toToastError } from '~/utils'
import { Action, ActionTypes, Actions } from '../actions'

/**
 * Root saga for enrichment runs.
 *
 * Starts the `watchRuns` polling loop via `spawn` and registers `runEnrichment`
 * as the `takeLatest` handler for `RUN_ENRICHMENT` actions.
 */
export function* runSagas() {
	// Start the background poller as its own spawned task.
	yield* spawn(watchRuns)
	// Handle launch requests; takeLatest keeps only the most recent handler running.
	yield* takeLatest<ActionTypes, any>('RUN_ENRICHMENT', runEnrichment)
}

/**
 * Launches an enrichment run by invoking the `runs` function with the
 * action payload as the request body.
 *
 * - If the invocation reports an error, or the response is not a
 *   `LaunchRunSuccess`, an error toast is dispatched. The toast is built from
 *   the first available of: the invocation error, the first entry of the
 *   response's `errors` list, or the raw response object.
 * - Otherwise `runLaunchedOk` is dispatched with the launched run, a
 *   client-side `stats.startTime` (current time, ISO string) and an empty
 *   `runConfig`.
 *
 * @param action The `RUN_ENRICHMENT` action whose payload is forwarded as the request body.
 */
function* runEnrichment(action: Action<'RUN_ENRICHMENT'>) {
	const res = yield* call(async () =>
		supabase.functions.invoke<Dagster.LaunchPipelineExecutionMutation>('runs', {
			body: action.payload
		})
	)
	const pipelineExecution = res.data?.launchPipelineExecution
	if (res.error || pipelineExecution?.__typename !== 'LaunchRunSuccess') {
		// Not every failure variant carries an `errors` list, hence the `in` guard.
		const pipelineError = pipelineExecution && 'errors' in pipelineExecution ? pipelineExecution.errors?.[0] : null
		const toastError = yield* call(toToastError, res.error || pipelineError || pipelineExecution)
		yield* put(Actions.addToast(toastError))
	} else {
		// Delegate with `yield*` like every other effect in this file: typed-redux-saga
		// effect creators return generators, so a plain `yield` would hand the
		// middleware a generator object instead of delegating the effect.
		yield* put(
			Actions.runLaunchedOk({
				...pipelineExecution.run,
				stats: {
					// Client clock at dispatch time, not a value taken from the response.
					startTime: new Date().toISOString()
				},
				runConfig: {} // TODO: use params passed to run
			})
		)
	}
}

/**
 * Polling loop that keeps `state.enrichments.runs` in sync with the `runs` function.
 *
 * Each iteration:
 * 1. Reads the list id (the path segment starting with `pli_`) from
 *    `location.pathname`. If there is none, or the document is hidden, it
 *    waits briefly and checks again without fetching.
 * 2. Fetches the runs for that list. On error it retries up to `maxRetries`
 *    times in a row, pausing between attempts; once retries are exhausted it
 *    dispatches an error toast and stops polling for good.
 * 3. On success it resets the retry budget and dispatches `getRunsOk` only
 *    when the fetched runs differ (deep equality) from the runs in the store.
 * 4. Waits for the next poll: 3s if any fetched run is active according to
 *    `isRunActive`, otherwise 30s. A `RUN_ENRICHMENT` action cuts the wait
 *    short; in that case it pauses 500ms before polling again.
 */
function* watchRuns() {
	const maxRetries = 5
	const idleDelayMs = 500
	const retryDelayMs = 500
	const activePollMs = 3000
	const inactivePollMs = 30000
	const postTriggerDelayMs = 500

	let retries = maxRetries
	while (true) {
		const listId = location.pathname.split('/').find(pt => pt.startsWith('pli_'))
		if (!listId || document.hidden) {
			yield* call(sleep, idleDelayMs)
			continue
		}

		const res = yield* call(async () =>
			supabase.functions.invoke<Dagster.RunFragment[]>('runs', {
				method: 'GET',
				headers: { 'x-list-id': listId }
			})
		)

		if (res.error) {
			if (retries-- > 0) {
				// Pause before retrying: retrying immediately would fire every attempt
				// back-to-back, so a brief outage would exhaust the budget at once
				// and stop the poller.
				yield* call(sleep, retryDelayMs)
				continue
			}
			const toastError = yield* call(toToastError, res.error)
			yield* put(Actions.addToast(toastError))
			break
		}

		if (res.data) {
			// A successful fetch restores the full retry budget.
			retries = maxRetries
			const runs = yield* select((state: State) => state.enrichments.runs)
			// Prevents re-rendering when runs are the same
			if (!isEqual(runs, res.data)) {
				yield* put(Actions.getRunsOk(res.data))
			}
		}

		const hasActiveRuns = res.data?.some(run => isRunActive(run))

		// Poll faster while something is running; a newly triggered run cuts the wait short.
		const { runTriggered } = yield* race({
			runTriggered: take('RUN_ENRICHMENT'),
			wait: call(sleep, hasActiveRuns ? activePollMs : inactivePollMs)
		})
		// Short pause after a trigger before the next fetch.
		if (runTriggered) yield* call(sleep, postTriggerDelayMs)
	}
}
