Home Reference Source

lib/endpoints/Queue.js

// TODO: Product category at least has an hourly request quota that is
// not tracked anywhere currently.

// TODO: so, it turns out that the throttle headers don't come through on throttled requests
// only on the request that was last successful.
// So, we need to fix mws-simple to give headers (does it already do that?) for non-throttled
// requests, so that we can keep track of actual throttle in Queue.
// We need to do this because we can not guarantee that our Queue instance is the only
// thing in existence that is potentially affecting Quota (previous instances may have, or
// completely external accesses from other tools), so we need to make sure that our data is
// adjusted to compensate for what Amazon tells us regarding quota information.

/**
 * A single pending request together with the callbacks needed to settle it.
 *
 * The item only performs the request and reports the outcome; whether a failure
 * is retried or rejected is decided by whoever supplied `onFailure`.
 */
class QueueItem {
    /**
     * @param {object} config
     * @param {object} config.api - object exposing `doRequest(params, options)`
     * @param {*} config.category - stored on the item as-is
     * @param {*} config.action - stored on the item as-is
     * @param {*} config.params - first argument handed to `api.doRequest`
     * @param {*} config.options - second argument handed to `api.doRequest`
     * @param {Function} config.resolver - called with the request result on success
     * @param {Function} config.rejecter - stored for the owner to call; not called by `run`
     * @param {Function} config.onComplete - called with no arguments after `resolver`
     * @param {Function} config.onFailure - called as `onFailure(err, item)` on failure
     */
    constructor(config) {
        const {
            api,
            category,
            action,
            params,
            options,
            resolver,
            rejecter,
            onComplete,
            onFailure,
        } = config;

        Object.assign(this, {
            api,
            category,
            action,
            params,
            options,
            resolver,
            rejecter,
            onComplete,
            onFailure,
        });

        // Allow `run` to be handed around as a bare callback.
        this.run = this.run.bind(this);
    }

    /**
     * Performs the request. On success the result goes to `resolver`, followed by
     * `onComplete`. Anything thrown inside the try block (including by the request
     * itself) is forwarded to `onFailure` along with this item.
     *
     * @returns {Promise<void>}
     */
    async run() {
        const { api, params, options } = this;
        try {
            const response = await api.doRequest(params, options);
            this.resolver(response);
            this.onComplete();
        } catch (failure) {
            // The owner decides between retrying and rejecting.
            this.onFailure(failure, this);
        }
    }
}

/**
 * Request queue for a single api/category/action combination.
 *
 * Requests are wrapped in QueueItems and started by `drainQueue`. When a
 * throttle response (ServerError with code 503) is seen, or when half of
 * `maxInFlight` is in use after a drain, the queue pauses on a timer derived
 * from `restoreRate` and then resumes, one item per drain while `singleDrain`
 * is set.
 */
class Queue {
    /**
     * @param {object} config
     * @param {object} config.api - handed to every QueueItem; `fail` also reads
     *     `api.mws.ServerError` to recognise throttle errors
     * @param {*} config.category - stored and passed on to each QueueItem
     * @param {*} config.action - stored and passed on to each QueueItem
     * @param {number} [config.maxInFlight=20] - upper bound on concurrently running items
     * @param {number} [config.restoreRate=0] - used as `60 / restoreRate` seconds
     *     between restores; 0 (or any non-positive / non-finite value) falls back
     *     to 1 second
     */
    constructor({
        api,
        category,
        action,
        maxInFlight,
        restoreRate,
    }) {
        this.api = api;
        this.category = category;
        this.action = action;
        this.inFlight = 0;
        this.maxInFlight = maxInFlight || 20;
        this.restoreRate = restoreRate || 0;
        this.queue = [];
        this.queueTimer = null;
        // handle of the timer that clears singleDrain again (see complete())
        this.resetDrainTimeout = null;
        // toggle this to true when we hit a throttle, toggle false when queue inFlight === 0
        // TODO: remove this when we can keep track of throttling from headers
        this.singleDrain = false;

        this.throttle = this.throttle.bind(this);
        this.setThrottleTimer = this.setThrottleTimer.bind(this);
        this.onQueueTimer = this.onQueueTimer.bind(this);
        this.drainQueue = this.drainQueue.bind(this);
        this.complete = this.complete.bind(this);
        this.fail = this.fail.bind(this);
        this.runQueue = this.runQueue.bind(this);
        this.request = this.request.bind(this);
        this.throttleCalls = 0;
    }

    /**
     * Seconds to wait for one restore, i.e. `60 / restoreRate`.
     *
     * `60 / 0` is Infinity, which is truthy, so a plain `|| 1` fallback never
     * triggers and the timers would be armed with an infinite delay. Anything
     * that is not a positive finite number therefore falls back to 1 second.
     *
     * @returns {number}
     */
    restoreSeconds() {
        const seconds = 60 / this.restoreRate;
        return Number.isFinite(seconds) && seconds > 0 ? seconds : 1;
    }

    /**
     * Puts the queue into throttled mode: counts the call, switches to
     * single-item draining and (re)starts the throttle timer.
     */
    throttle() {
        this.throttleCalls += 1;
        this.singleDrain = true;
        this.setThrottleTimer();
    }

    /**
     * (Re)starts the throttle timer. While it is pending, `drainQueue` and
     * `runQueue` do nothing; when it fires, `onQueueTimer` resumes draining.
     */
    setThrottleTimer() {
        if (this.queueTimer) {
            clearTimeout(this.queueTimer);
        }
        // one restore interval plus a 250ms margin
        const time = (this.restoreSeconds() * 1000) + 250;
        this.queueTimer = setTimeout(this.onQueueTimer, time);
    }

    /** Throttle timer callback: clears the timer handle and drains again. */
    onQueueTimer() {
        this.queueTimer = null;
        this.drainQueue();
    }

    /**
     * Starts queued items unless the throttle timer is pending or the queue is
     * empty. Outside single-drain mode, with more than one item waiting, items
     * are started until one short of `maxInFlight`; otherwise a single item is
     * started. Afterwards the queue throttles itself if at least half of
     * `maxInFlight` is in use.
     */
    drainQueue() {
        if (this.queueTimer) {
            return;
        }
        if (!this.queue.length) {
            return;
        }

        // TODO: this should schedule staggered runs, so that if we get another
        // throttle response, we get halted.

        if (!this.singleDrain && this.queue.length > 1) {
            // Never let the limit drop below 1: with maxInFlight === 1 the bound
            // `maxInFlight - 1` is 0, nothing would ever start, and the queue
            // would stall because nothing else schedules another drain.
            const limit = Math.max(1, this.maxInFlight - 1);
            while (this.queue.length && this.inFlight < limit) {
                this.runQueue();
            }
        } else {
            this.runQueue();
        }

        if (this.inFlight >= this.maxInFlight * 0.5) {
            this.throttle();
        }
    }

    /**
     * Bookkeeping after an item has finished (resolved or rejected): releases
     * its in-flight slot and schedules another drain. Once nothing is in flight
     * while in single-drain mode, a timer is armed that switches single-drain
     * mode off again.
     */
    complete() {
        this.inFlight -= 1;
        if (this.inFlight < 1) {
            this.inFlight = 0;
            if (this.singleDrain) {
                // assume quota was blown, enforce a max restoreRate timeout before clearing singleDrain
                const time = (this.restoreSeconds() * 1000 * this.maxInFlight) + 250;
                if (this.resetDrainTimeout) {
                    clearTimeout(this.resetDrainTimeout);
                }
                this.resetDrainTimeout = setTimeout(() => { this.singleDrain = false; }, time);
            }
        }
        setImmediate(this.drainQueue);
    }

    /**
     * Failure handler passed to every QueueItem. A ServerError with code 503 is
     * treated as a throttle hit: the queue throttles and the item is put back at
     * the front for a retry. Every other failure rejects the item's promise.
     *
     * @param {*} err - value the request failed with; its `error` property is inspected
     * @param {QueueItem} failedItem - the item whose request failed
     */
    fail(err, failedItem) {
        // tolerate rejections that carry no value at all
        const { error } = err || {};
        if (error instanceof this.api.mws.ServerError) {
            if (error.code === 503) {
                console.warn('* retry -- throttle hit for', failedItem.category, failedItem.action);
                this.throttle();
                this.queue.unshift(failedItem);
                this.inFlight -= 1;
                return;
            }
        }
        failedItem.rejecter(err);
        this.complete();
    }

    /**
     * Starts the item at the head of the queue, unless the throttle timer is
     * pending or `maxInFlight` has already been reached.
     */
    runQueue() {
        if (this.queueTimer) {
            return;
        }
        if (this.inFlight >= this.maxInFlight) {
            console.warn('* RUNQUEUE ERROR, INFLIGHT  >= MAXINFLIGHT');
            return;
        }
        const item = this.queue.shift();
        if (item) {
            this.inFlight += 1;
            item.run();
        }
    }

    /**
     * Enqueues a request and schedules a drain.
     *
     * @param {*} params - forwarded to `api.doRequest` by the QueueItem
     * @param {*} options - forwarded to `api.doRequest` by the QueueItem
     * @returns {Promise<*>} settles with the outcome of the request
     */
    request(params, options) {
        return new Promise((resolve, reject) => {
            const action = new QueueItem({
                api: this.api,
                category: this.category,
                action: this.action,
                params,
                options,
                resolver: resolve,
                rejecter: reject,
                onComplete: this.complete,
                onFailure: this.fail,
            });
            this.queue.push(action);
            setImmediate(this.drainQueue);
        });
    }
}

// Registry of queues, keyed by "seller/category/action".
const queueMap = new Map();

/**
 * Builds the registry key for a seller/category/action combination.
 *
 * @param {*} category
 * @param {*} action
 * @param {*} seller
 * @returns {string}
 */
const makeLookupKey = (category, action, seller) => `${seller}/${category}/${action}`;

/**
 * Looks up a previously registered queue.
 *
 * @param {*} category
 * @param {*} action
 * @param {*} seller
 * @returns {*} the registered queue, or undefined when none was registered
 */
const getQueue = (category, action, seller) => queueMap.get(makeLookupKey(category, action, seller));

/**
 * Stores a queue in the registry, replacing any queue already stored under the
 * same seller/category/action.
 *
 * @param {*} q - the queue to store
 * @param {*} category
 * @param {*} action
 * @param {*} seller
 */
const registerQueue = (q, category, action, seller) => {
    queueMap.set(makeLookupKey(category, action, seller), q);
};

// Public API of this module: the Queue class plus the getQueue / registerQueue
// helpers that read from and write to the module-level queueMap.
module.exports = {
    Queue,
    getQueue,
    registerQueue,
};