type Callback = () => Promise export type AsyncQueue = { push: (task: Callback) => Promise flush: () => Promise size: number } /** * Ensures that each callback pushed onto the queue is executed in series. * Such a quetie 😻 * @param opts.dedupeConcurrent If dedupeConcurrent is `true` it ensures that if multiple * tasks are pushed onto the queue while there is an active task, only the * last one will be executed, once the active task has completed. * e.g. in the below example, only 0 and 3 will be executed. * ``` * const queue = createAsyncQueue({ dedupeConcurrent: true }) * queue.push(async () => console.log(0)) // returns 0 * queue.push(async () => console.log(1)) // returns 3 * queue.push(async () => console.log(2)) // returns 3 * queue.push(async () => console.log(3)) // returns 3 * ``` * */ export function createAsyncQueue(opts = { dedupeConcurrent: false }): AsyncQueue { const { dedupeConcurrent } = opts let queue: Callback[] = [] let running: Promise | undefined let nextPromise = new DeferredPromise() const push = (task: Callback) => { let taskPromise = new DeferredPromise() if (dedupeConcurrent) { queue = [] if (nextPromise.started) nextPromise = new DeferredPromise() taskPromise = nextPromise } queue.push(() => { taskPromise.started = true task().then(taskPromise.resolve).catch(taskPromise.reject) return taskPromise.promise }) if (!running) running = start() return taskPromise.promise } const start = async () => { while (queue.length) { const task = queue.shift()! await task().catch(() => {}) } running = undefined } return { push, flush: () => running || Promise.resolve(), get size() { return queue.length }, } } export const createAsyncQueues = (opts = { dedupeConcurrent: false }) => { const queues: { [queueId: string]: AsyncQueue } = {} const push = (queueId: string, task: Callback) => { if (!queues[queueId]) queues[queueId] = createAsyncQueue(opts) return queues[queueId].push(task) } const flush = (queueId: string) => { if (!queues[queueId]) queues[queueId] = createAsyncQueue(opts) return queues[queueId].flush() } return { push, flush } } class DeferredPromise { started = false resolve: (x: T | PromiseLike) => void = () => {} reject: (x: E) => void = () => {} promise: Promise constructor() { this.promise = new Promise((res, rej) => { this.resolve = res this.reject = rej }) } } // function main() { // const queue = createAsyncQueue() // queue.push(async () => { // console.log(0) // }) // returns 0 // queue.push(async () => { // console.log(1) // return new Promise((resolve, reject) => { // setTimeout(() => { // console.log('12') // resolve() // }, 1000) // }) // }) // returns 3 // queue.push(async () => console.log(2)) // returns 3 // queue.push(async () => console.log(3)) // returns 3 // console.log('hi') // } // main()