0% found this document useful (0 votes)
38 views6 pages

Pool

Uploaded by

Prakash Shah
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
38 views6 pages

Pool

Uploaded by

Prakash Shah
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
You are on page 1/ 6

"use strict";

Object.defineProperty(exports, "__esModule", { value: true });


exports.Pool = exports.RequestError = exports.ServiceNotAvailableError = void 0;
const exponential_1 = require("./backoff/exponential");
const host_1 = require("./host");
const http = require("http");
const https = require("https");
const querystring = require("querystring");
/**
* Network error codes (matched against `err.code`) that will cause a host
* to be marked as 'failed', and the request to be resubmitted, if we get
* them from a request to Influx.
* @type {Array<string>}
*/
// Compared against `err.code` by Pool#_handleRequestError: a match disables
// the failing host and lets the request be retried.
const resubmitErrorCodes = [
    "ETIMEDOUT", "ESOCKETTIMEDOUT",
    "ECONNRESET", "ECONNREFUSED",
    "EHOSTUNREACH",
];
/**
* A ServiceNotAvailableError is returned as an error from requests that
* result in a >= 500 status code, that time out, or that are made while no
* host is available.
*/
class ServiceNotAvailableError extends Error {
    /**
     * @param {string} message Human-readable description of the failure.
     */
    constructor(message) {
        super();
        // Re-pin the prototype so `instanceof ServiceNotAvailableError` holds
        // even when Error subclassing is down-levelled by a transpiler.
        Object.setPrototypeOf(this, ServiceNotAvailableError.prototype);
        // Assigned (rather than passed to super) so `message` stays an own,
        // enumerable property exactly as before.
        this.message = message;
    }
}
exports.ServiceNotAvailableError = ServiceNotAvailableError;
/**
* A RequestError is returned as an error from requests that
* result in a 300 <= status code < 500.
*/
class RequestError extends Error {
    /**
     * @param {Object} req The request that produced the error response.
     * @param {Object} res The response; its `statusCode` and `statusMessage`
     *   are embedded in the error message.
     * @param {string} body The response body text, appended to the message.
     */
    constructor(req, res, body) {
        super();
        this.req = req;
        this.res = res;
        // The interpolation must stay on one line: splitting `$` from `{body}`
        // turns it into literal text and the body never reaches the message.
        this.message = `A ${res.statusCode} ${res.statusMessage} error occurred: ${body}`;
        Object.setPrototypeOf(this, RequestError.prototype);
    }
    /**
     * Buffers the whole response body, then invokes `callback` with a
     * RequestError built from it.
     * @param {Object} req The originating request.
     * @param {Object} res Response emitter providing "data" and "end" events.
     * @param {function(RequestError): void} callback Called once on "end".
     */
    static Create(req, res, callback) {
        let body = "";
        res.on("data", (str) => {
            body += str.toString();
        });
        res.on("end", () => callback(new RequestError(req, res, body)));
    }
}
exports.RequestError = RequestError;
/**
* Creates a function generator that returns wrappers which, between them,
* only allow through the first call of any function it has wrapped.
*/
function doOnce() {
    // One flag shared by every wrapper produced by this generator: whichever
    // wrapper is invoked first wins, all later invocations are dropped.
    let alreadyFired = false;
    return (fn) => (arg) => {
        if (!alreadyFired) {
            alreadyFired = true;
            fn(arg);
        }
    };
}
/**
 * Copies the values visited by `itemSet.forEach` into a new array, in
 * iteration order.
 * @param {{forEach: Function}} itemSet Collection to copy (a Set here).
 * @return {Array} A fresh array holding the visited values.
 */
function setToArray(itemSet) {
    const collected = [];
    itemSet.forEach((entry) => {
        collected[collected.length] = entry;
    });
    return collected;
}
/**
 * Issues a request through the `https` module when `options.protocol` is
 * exactly "https:", and through `http` otherwise.
 * @param {Object} options Options forwarded unchanged to `.request()`.
 * @param {Function} callback Response callback forwarded unchanged.
 * @return {*} Whatever the chosen module's `request()` returns.
 */
const request = (options, callback) => {
    const transport = options.protocol === "https:" ? https : http;
    return transport.request(options, callback);
};
/**
*
* The Pool maintains a list of available Influx hosts and dispatches requests
* to them. If there are errors connecting to a host, it will disable that
* host for a period of time.
*/
class Pool {
    /**
     * Creates a new Pool instance.
     *
     * Unless overridden by `options`, the pool uses an ExponentialBackoff
     * (initial 300, max 10 * 1000, random 1), `maxRetries` of 2 and a
     * `requestTimeout` of 30 * 1000.
     * @param {IPoolOptions} options
     */
    constructor(options) {
        this._options = Object.assign({
            backoff: new exponential_1.ExponentialBackoff({
                initial: 300,
                max: 10 * 1000,
                random: 1,
            }),
            maxRetries: 2,
            requestTimeout: 30 * 1000,
        }, options);
        // Position of the next host handed out by _getHost().
        this._index = 0;
        this._hostsAvailable = new Set();
        this._hostsDisabled = new Set();
        this._timeout = this._options.requestTimeout;
    }
    /**
     * Returns a list of currently active hosts.
     * @return {Host[]}
     */
    getHostsAvailable() {
        return setToArray(this._hostsAvailable);
    }
    /**
     * Returns a list of hosts that are currently disabled due to network
     * errors.
     * @return {Host[]}
     */
    getHostsDisabled() {
        return setToArray(this._hostsDisabled);
    }
    /**
     * Inserts a new host into the pool; it starts out as available.
     * @param {string} url Address of the host.
     * @param {Object} [options] Extra request options merged into every
     *   request sent to this host.
     * @return {Host} The newly created host.
     */
    addHost(url, options = {}) {
        const host = new host_1.Host(url, this._options.backoff.reset(), options);
        this._hostsAvailable.add(host);
        return host;
    }
    /**
     * Returns true if there's any host available to be queried.
     * @return {Boolean}
     */
    hostIsAvailable() {
        return this._hostsAvailable.size > 0;
    }
    /**
     * Makes a request and resolves with the response, parsed as JSON.
     * Rejects on a non-2xx status code or on a parsing exception.
     * @param {Object} options Request options, see stream().
     * @return {Promise<*>}
     */
    json(options) {
        return this.text(options).then((res) => JSON.parse(res));
    }
    /**
     * Makes a request and resolves with the plain text response,
     * if possible. Rejects on a non-2xx status code or when the response
     * stream itself reports an error.
     * @param {Object} options Request options, see stream().
     * @return {Promise<string>}
     */
    text(options) {
        return new Promise((resolve, reject) => {
            this.stream(options, (err, res) => {
                if (err) {
                    return reject(err);
                }
                let output = "";
                res.on("data", (str) => {
                    output += str.toString();
                });
                // Without this the promise would never settle if the body
                // breaks off mid-transfer.
                res.on("error", reject);
                res.on("end", () => resolve(output));
            });
        });
    }
    /**
     * Makes a request and discards any response body it receives.
     * Rejects on a non-2xx status code or when the response stream itself
     * reports an error.
     * @param {Object} options Request options, see stream().
     * @return {Promise<void>}
     */
    discard(options) {
        return new Promise((resolve, reject) => {
            this.stream(options, (err, res) => {
                if (err) {
                    return reject(err);
                }
                res.on("data", () => {
                    /* ignore */
                });
                // Without this the promise would never settle if the body
                // breaks off mid-transfer.
                res.on("error", reject);
                res.on("end", () => resolve());
            });
        });
    }
    /**
     * Ping sends out a request to all Influx servers known to the pool
     * (available and disabled alike), reporting on their response time and
     * version number. The returned promise never rejects: unreachable hosts
     * are reported with `online: false` and `rtt: Infinity`.
     * @param {number} timeout Request timeout passed to the HTTP client.
     * @param {string} [path] Path to request on every host.
     * @param {string} [auth] Credentials sent base64-encoded as HTTP Basic
     *   auth when provided.
     * @return {Promise<Object[]>} One result object per host.
     */
    ping(timeout, path = "/ping", auth = undefined) {
        const hosts = setToArray(this._hostsAvailable).concat(setToArray(this._hostsDisabled));
        const todo = hosts.map((host) => {
            const start = Date.now();
            const url = host.url;
            // Response, timeout and error all race; only the first one may
            // resolve the promise.
            const once = doOnce();
            return new Promise((resolve) => {
                const headers = {};
                if (typeof auth !== "undefined") {
                    const encodedAuth = Buffer.from(auth).toString("base64");
                    headers["Authorization"] = `Basic ${encodedAuth}`;
                }
                const req = request(Object.assign({
                    hostname: url.hostname,
                    method: "GET",
                    path,
                    port: Number(url.port),
                    protocol: url.protocol,
                    timeout,
                    headers: headers,
                }, host.options), once((res) => {
                    resolve({
                        url,
                        res: res.resume(),
                        online: res.statusCode < 300,
                        rtt: Date.now() - start,
                        version: res.headers["x-influxdb-version"],
                    });
                }));
                const fail = once(() => {
                    req.abort();
                    resolve({
                        online: false,
                        res: null,
                        rtt: Infinity,
                        url,
                        version: null,
                    });
                });
                // Support older Nodes and polyfills which don't allow .timeout()
                // in the request options, wrapped in a conditional for even
                // worse polyfills. See:
                // https://github.com/node-influx/node-influx/issues/221
                if (typeof req.setTimeout === "function") {
                    req.setTimeout(timeout, () => {
                        fail();
                    }); // Tslint:disable-line
                }
                req.on("timeout", fail);
                req.on("error", fail);
                req.end();
            });
        });
        return Promise.all(todo);
    }
    /**
     * Makes a request and calls back with the IncomingMessage stream,
     * if possible. An error is returned on a non-2xx status code.
     *
     * Responses with status >= 500, timeouts and the network errors listed
     * in `resubmitErrorCodes` go through _handleRequestError(), which may
     * retry the request on another host.
     * @param {Object} options Reads `path`, `method`, and optionally `query`
     *   and `body`; `retries` is maintained internally.
     * @param {function(Error=, Object=): void} callback Receives either an
     *   error or the response stream.
     */
    stream(options, callback) {
        if (!this.hostIsAvailable()) {
            return callback(new ServiceNotAvailableError("No host available"), null);
        }
        // Response, error and timeout handlers share one guard so that only
        // the first of them gets to act on this attempt.
        const once = doOnce();
        const host = this._getHost();
        let path = host.url.pathname === "/" ? "" : host.url.pathname;
        path += options.path;
        if (options.query) {
            path += "?" + querystring.stringify(options.query);
        }
        const req = request(Object.assign({
            headers: {
                "content-length": options.body ? Buffer.from(options.body).length : 0,
            },
            hostname: host.url.hostname,
            method: options.method,
            path,
            port: Number(host.url.port),
            protocol: host.url.protocol,
            timeout: this._timeout,
        }, host.options), once((res) => {
            res.setEncoding("utf8");
            if (res.statusCode >= 500) {
                // Drain the body before reporting, then treat the host as failed.
                res.on("data", () => {
                    /* ignore */
                });
                res.on("end", () => {
                    return this._handleRequestError(new ServiceNotAvailableError(res.statusMessage), host, options, callback);
                });
                return;
            }
            if (res.statusCode >= 300) {
                return RequestError.Create(req, res, (err) => callback(err, res));
            }
            host.success();
            return callback(undefined, res);
        }));
        // Handle network or HTTP parsing errors:
        req.on("error", once((err) => {
            this._handleRequestError(err, host, options, callback);
        }));
        // Handle timeouts:
        req.on("timeout", once(() => {
            req.abort();
            this._handleRequestError(new ServiceNotAvailableError("Request timed out"), host, options, callback);
        }));
        // Support older Nodes and polyfills which don't allow .timeout() in the
        // request options, wrapped in a conditional for even worse polyfills. See:
        // https://github.com/node-influx/node-influx/issues/221
        if (typeof req.setTimeout === "function") {
            req.setTimeout(host.options.timeout || this._timeout); // Tslint:disable-line
        }
        // Write out the body:
        if (options.body) {
            req.write(options.body);
        }
        req.end();
    }
    /**
     * Returns the next available host for querying, cycling through the
     * available hosts in turn. Callers must check hostIsAvailable() first.
     * @return {Host}
     */
    _getHost() {
        const available = setToArray(this._hostsAvailable);
        const host = available[this._index];
        this._index = (this._index + 1) % available.length;
        return host;
    }
    /**
     * Re-enables the provided host, returning it to the pool to query.
     * @param {Host} host
     */
    _enableHost(host) {
        this._hostsDisabled.delete(host);
        this._hostsAvailable.add(host);
    }
    /**
     * Disables the provided host, removing it from the query pool. It will be
     * re-enabled after the backoff interval returned by `host.fail()`. A
     * non-positive interval leaves the host in the pool.
     * @param {Host} host
     */
    _disableHost(host) {
        const delay = host.fail();
        if (delay > 0) {
            this._hostsAvailable.delete(host);
            this._hostsDisabled.add(host);
            // Keep the index inside the shrunken list; max(1, …) avoids a
            // modulo by zero when no host is left.
            this._index %= Math.max(1, this._hostsAvailable.size);
            setTimeout(() => this._enableHost(host), delay);
        }
    }
    /**
     * Decides what to do with a failed request. Errors that are neither a
     * ServiceNotAvailableError nor one of `resubmitErrorCodes` are passed
     * straight to the callback. Otherwise the host is disabled and the
     * request is retried through stream() while `options.retries` is below
     * `maxRetries` and a host is still available.
     * @param {Error} err The failure.
     * @param {Host} host The host the request was sent to.
     * @param {Object} options Request options; `retries` is updated in place.
     * @param {function(Error=, Object=): void} callback
     */
    _handleRequestError(err, host, options, callback) {
        if (!(err instanceof ServiceNotAvailableError) &&
            !resubmitErrorCodes.includes(err.code)) {
            return callback(err, null);
        }
        this._disableHost(host);
        const retries = options.retries || 0;
        if (retries < this._options.maxRetries && this.hostIsAvailable()) {
            options.retries = retries + 1;
            return this.stream(options, callback);
        }
        callback(err, null);
    }
}
exports.Pool = Pool;

You might also like