import EventEmitter from 'events' /** * ## default options */ let defaults = { // start unpaused ? active: true, // requests per `ratePer` ms rate: 40, // ms per `rate` requests ratePer: 40000, // max concurrent requests concurrent: 20 } /** * ## Throttle * The throttle object. * * @class * @param {object} options - key value options */ class Throttle extends EventEmitter { constructor (options) { super() // instance properties this._options({ _requestTimes: [0], _current: 0, _buffer: [], _serials: {}, _timeout: false }) this._options(defaults) this._options(options) } /** * ## _options * updates options on instance * * @method * @param {Object} options - key value object * @returns null */ _options (options) { for (let property in options) { if (options.hasOwnProperty(property)) { this[property] = options[property] } } } /** * ## options * thin wrapper for _options * * * calls `this.cycle()` * * adds alternate syntax * * alternate syntax: * throttle.options('active', true) * throttle.options({active: true}) * * @method * @param {Object} options - either key value object or keyname * @param {Mixed} [value] - value for key * @returns null */ options (options, value) { if ( (typeof options === 'string') && (value) ) { options = { options: value } } this._options(options) this.cycle() } /** * ## next * checks whether instance has available capacity and calls throttle.send() * * @returns {Boolean} */ next () { let throttle = this // make requestTimes `throttle.rate` long. Oldest request will be 0th index throttle._requestTimes = throttle._requestTimes.slice(throttle.rate * -1) if ( // paused !(throttle.active) || // at concurrency limit (throttle._current >= throttle.concurrent) || // less than `ratePer` throttle._isRateBound() || // something waiting in the throttle !(throttle._buffer.length) ) { return false } let idx = throttle._buffer.findIndex((request) => { return !request.serial || !throttle._serials[request.serial] }) if (idx === -1) { throttle._isSerialBound = true return false } throttle.send(throttle._buffer.splice(idx, 1)[0]) return true } /** * ## serial * updates throttle.\_serials and throttle.\_isRateBound * * serial subthrottles allow some requests to be serialised, whilst maintaining * their place in the queue. The _serials structure keeps track of what serial * queues are waiting for a response. * * ``` * throttle._serials = { * 'example.com/end/point': true, * 'example.com/another': false * } * ``` * * @param {Request} request superagent request * @param {Boolean} state new state for serial */ serial (request, state) { let serials = this._serials let throttle = this if (request.serial === false) { return } if (state === undefined) { return serials[request.serial] } if (state === false) { throttle._isSerialBound = false } serials[request.serial] = state } /** * ## _isRateBound * returns true if throttle is bound by rate * * @returns {Boolean} */ _isRateBound () { let throttle = this return ( ((Date.now() - throttle._requestTimes[0]) < throttle.ratePer) && (throttle._buffer.length > 0) ) } /** * ## cycle * an iterator of sorts. Should be called when * * - something added to throttle (check if it can be sent immediately) * - `ratePer` ms have elapsed since nth last call where n is `rate` (may have * available rate) * - some request has ended (may have available concurrency) * * @param {Request} request the superagent request * @returns null */ cycle (request) { let throttle = this if (request) { throttle._buffer.push(request) } clearTimeout(throttle._timeout) // fire requests // throttle.next will return false if there's no capacity or throttle is // drained while (throttle.next()) {} // if bound by rate, set timeout to reassess later. if (throttle._isRateBound()) { let timeout // defined rate timeout = throttle.ratePer // less ms elapsed since oldest request timeout -= (Date.now() - throttle._requestTimes[0]) // plus 1 ms to ensure you don't fire a request exactly ratePer ms later timeout += 1 throttle._timeout = setTimeout(function () { throttle.cycle() }, timeout) } } /** * ## send * * sends a queued request. * * @param {Request} request superagent request * @returns null */ send (request) { let throttle = this throttle.serial(request, true) // declare callback within this enclosure, for access to throttle & request function cleanup (err, response) { throttle._current -= 1 if (err && EventEmitter.listenerCount(throttle, 'error')) { throttle.emit('error', response) } throttle.emit('received', request) if ( (!throttle._buffer.length) && (!throttle._current) ) { throttle.emit('drained') } throttle.serial(request, false) throttle.cycle() // original `callback` was stored at `request._maskedCallback` request._maskedCallback(err, response) } // original `request.end` was stored at `request._maskedEnd` request._maskedEnd(cleanup) throttle._requestTimes.push(Date.now()) throttle._current += 1 this.emit('sent', request) } /** * ## plugin * * `superagent` `use` function should refer to this plugin method a la * `.use(throttle.plugin())` * * mask the original `.end` and store the callback passed in * * @method * @param {string} serial any string is ok, it's just a namespace * @returns null */ plugin (serial) { let throttle = this // let patch = function(request) { return (request) => { request.throttle = throttle request.serial = serial || false // replace request.end request._maskedEnd = request.end request.end = function (callback) { // when superagent receives a redirect header it essentially resets // the original request and calls `end` again with the new target url // that means throttle hasn't cleaned up yet, and still considers the // request to be in flight. // Therefore, when called by a redirect, just pass through to maskedEnd if (request._redirects > 0) return request._maskedEnd(callback) request._maskedCallback = callback || function () {} // place this request in the queue request.throttle.cycle(request) return request } return request } } } module.exports = Throttle