// PoolCluster.js
  // Module dependencies: sibling pool modules plus Node's util and events.
  var Pool = require('./Pool');
  var PoolConfig = require('./PoolConfig');
  var PoolNamespace = require('./PoolNamespace');
  var PoolSelector = require('./PoolSelector');
  var Util = require('util');
  var EventEmitter = require('events').EventEmitter;

  // PoolCluster is a hoisted function declaration, so it can be exported here.
  module.exports = PoolCluster;
  /**
   * PoolCluster
   *
   * An EventEmitter that keeps a registry of pool nodes keyed by node id,
   * together with a cache of pattern lookups and a cache of namespaces.
   *
   * @constructor
   * @param {object} [config] The pool cluster configuration
   * @param {boolean} [config.canRetry=true] Stored as `_canRetry`; only an
   *   `undefined` value falls back to `true`.
   * @param {string} [config.defaultSelector='RR'] Selector name used when a
   *   caller does not supply one.
   * @param {number} [config.removeNodeErrorCount=5] Error count at which a node
   *   is taken offline or removed. Any falsy value (including 0) becomes 5.
   * @param {number} [config.restoreNodeTimeout=0] Milliseconds a failing node
   *   stays offline; when not greater than 0 the node is removed instead.
   * @public
   */
  function PoolCluster(config) {
    EventEmitter.call(this);

    config = config || {};

    this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
    this._defaultSelector = config.defaultSelector || 'RR';
    this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
    this._restoreNodeTimeout = config.restoreNodeTimeout || 0;

    this._closed = false;
    // Prototype-less dictionaries so arbitrary ids/patterns are safe as keys.
    this._findCaches = Object.create(null);
    this._lastId = 0;
    this._namespaces = Object.create(null);
    this._nodes = Object.create(null);
  }

  Util.inherits(PoolCluster, EventEmitter);
  /**
   * Register a new pool node in the cluster.
   *
   * Two call shapes are accepted:
   *   add(id, config) - `id` is converted with String() and used as the node id
   *   add(config)     - an id of the form "CLUSTER::<n>" is generated
   *
   * @param {string|object} id Node id, or the pool configuration when no id
   *   is given (any value whose typeof is 'object' is treated as configuration)
   * @param {object} [config] Pool configuration when `id` is not an object
   * @throws {Error} When the cluster is closed or the node id already exists
   */
  PoolCluster.prototype.add = function add(id, config) {
    if (this._closed) {
      throw new Error('PoolCluster is closed.');
    }

    var hasGeneratedId = typeof id === 'object';
    var nodeId = hasGeneratedId
      ? 'CLUSTER::' + (++this._lastId)
      : String(id);

    if (this._nodes[nodeId] !== undefined) {
      throw new Error('Node ID "' + nodeId + '" is already defined in PoolCluster.');
    }

    var poolConfig = new PoolConfig(hasGeneratedId ? id : config);

    this._nodes[nodeId] = {
      id            : nodeId,
      errorCount    : 0,
      pool          : new Pool({config: poolConfig}),
      _offlineUntil : 0
    };

    // The node set changed, so cached pattern lookups are stale.
    this._clearFindCaches();
  };
  /**
   * Close the cluster and end the pool of every registered node.
   *
   * The callback is invoked exactly once: with the first error reported by a
   * pool, or without an error after every pool has ended. Calling end() on an
   * already closed cluster just schedules the callback on the next tick.
   *
   * @param {function} [callback] Completion callback `(err)`. When omitted
   *   (undefined), `_cb` is used as the default.
   * @throws {TypeError} When a non-function callback is supplied
   */
  PoolCluster.prototype.end = function end(callback) {
    var cb = callback !== undefined
      ? callback
      : _cb;

    if (typeof cb !== 'function') {
      throw new TypeError('callback argument must be a function');
    }

    if (this._closed) {
      process.nextTick(cb);
      return;
    }

    this._closed = true;

    var calledBack = false;
    var nodeIds = Object.keys(this._nodes);
    // Count every pool up front so that a pool which calls back synchronously
    // cannot drive the counter to zero while other pools are still pending.
    var waitingClose = nodeIds.length;

    function onEnd(err) {
      if (!calledBack && (err || --waitingClose <= 0)) {
        calledBack = true;
        cb(err);
      }
    }

    for (var i = 0; i < nodeIds.length; i++) {
      this._nodes[nodeIds[i]].pool.end(onEnd);
    }

    // No nodes at all: still complete asynchronously.
    if (nodeIds.length === 0) {
      process.nextTick(onEnd);
    }
  };
  /**
   * Get the namespace for a node pattern and selector, creating it on first use.
   *
   * Namespaces are cached in `_namespaces` under the key `pattern + selector`,
   * so repeated calls with the same arguments return the same object.
   *
   * @param {string|RegExp} [pattern='*'] Node id pattern
   * @param {string} [selector] Selector name (case-insensitive). Falls back to
   *   the cluster default when omitted or not a key of PoolSelector.
   * @returns {PoolNamespace}
   */
  PoolCluster.prototype.of = function of(pattern, selector) {
    pattern = pattern || '*';

    selector = selector || this._defaultSelector;
    selector = selector.toUpperCase();
    if (typeof PoolSelector[selector] === 'undefined') {
      // Normalise the fallback the same way as a caller-supplied selector;
      // otherwise a lower-case default would be used verbatim on this path only.
      selector = this._defaultSelector.toUpperCase();
    }

    var key = pattern + selector;

    if (typeof this._namespaces[key] === 'undefined') {
      this._namespaces[key] = new PoolNamespace(this, pattern, selector);
    }

    return this._namespaces[key];
  };
  /**
   * Remove every node whose id matches the pattern, including offline nodes.
   *
   * @param {string|RegExp} pattern Node id pattern
   */
  PoolCluster.prototype.remove = function remove(pattern) {
    var foundNodeIds = this._findNodeIds(pattern, true);

    for (var i = 0; i < foundNodeIds.length; i++) {
      var node = this._getNode(foundNodeIds[i]);

      // Skip ids that no longer resolve to a node.
      if (node) {
        this._removeNode(node);
      }
    }
  };
  /**
   * Get a connection from a node selected through a namespace.
   *
   * Accepted call shapes:
   *   getConnection(cb)
   *   getConnection(pattern, cb)
   *   getConnection(pattern, selector, cb)
   *
   * @param {string|RegExp|function} pattern Node id pattern, or the callback
   * @param {string|function} [selector] Selector name, or the callback
   * @param {function} [cb] Callback handed to the namespace's getConnection
   */
  PoolCluster.prototype.getConnection = function getConnection(pattern, selector, cb) {
    var namespace;

    if (typeof pattern === 'function') {
      cb = pattern;
      namespace = this.of();
    } else {
      if (typeof selector === 'function') {
        cb = selector;
        selector = this._defaultSelector;
      }

      namespace = this.of(pattern, selector);
    }

    namespace.getConnection(cb);
  };
  /**
   * Drop all cached pattern lookups by replacing the cache with a fresh,
   * prototype-less object.
   */
  PoolCluster.prototype._clearFindCaches = function _clearFindCaches() {
    this._findCaches = Object.create(null);
  };
  /**
   * Lower a node's error count by one and bring the node back online.
   *
   * The count is first capped at `_removeNodeErrorCount` and floored at 1, so
   * the stored result never drops below 0. If the node was marked offline
   * (`_offlineUntil` is truthy) the mark is cleared and an 'online' event is
   * emitted with the node id.
   *
   * @param {object} node Node record with `id`, `errorCount` and `_offlineUntil`
   */
  PoolCluster.prototype._decreaseErrorCount = function _decreaseErrorCount(node) {
    var errorCount = node.errorCount;

    if (errorCount > this._removeNodeErrorCount) {
      errorCount = this._removeNodeErrorCount;
    }

    if (errorCount < 1) {
      errorCount = 1;
    }

    node.errorCount = errorCount - 1;

    if (node._offlineUntil) {
      node._offlineUntil = 0;
      this.emit('online', node.id);
    }
  };
  /**
   * Find the ids of the nodes matching a pattern.
   *
   * The unfiltered match list is cached per pattern in `_findCaches`. When
   * `includeOffline` is truthy that cached array itself is returned.
   *
   * @param {string|RegExp} pattern Node id pattern
   * @param {boolean} [includeOffline] When truthy, offline nodes are included
   * @returns {string[]} Matching node ids; without `includeOffline`, only nodes
   *   that are not offline or whose offline period has elapsed
   */
  PoolCluster.prototype._findNodeIds = function _findNodeIds(pattern, includeOffline) {
    var currentTime = 0;
    var foundNodeIds = this._findCaches[pattern];

    if (foundNodeIds === undefined) {
      var expression = patternRegExp(pattern);
      var nodeIds = Object.keys(this._nodes);

      foundNodeIds = nodeIds.filter(function (id) {
        return id.match(expression);
      });

      this._findCaches[pattern] = foundNodeIds;
    }

    if (includeOffline) {
      return foundNodeIds;
    }

    return foundNodeIds.filter(function (nodeId) {
      var node = this._getNode(nodeId);

      if (!node._offlineUntil) {
        return true;
      }

      // Read the clock lazily, and only once per call.
      if (!currentTime) {
        currentTime = getMonotonicMilliseconds();
      }

      return node._offlineUntil <= currentTime;
    }, this);
  };
  /**
   * Look up a node record by id.
   *
   * @param {string} id Node id
   * @returns {object|null} The node record, or null when there is none
   */
  PoolCluster.prototype._getNode = function _getNode(id) {
    return this._nodes[id] || null;
  };
  /**
   * Record a failure on a node and react once the error threshold is reached.
   *
   * Below `_removeNodeErrorCount` nothing else happens. At or above it, the
   * node is either marked offline for `_restoreNodeTimeout` milliseconds
   * (emitting 'offline') or, when no restore timeout is configured, removed
   * from the cluster (emitting 'remove'). Both events carry the node id.
   *
   * @param {object} node Node record with `id`, `errorCount` and `_offlineUntil`
   */
  PoolCluster.prototype._increaseErrorCount = function _increaseErrorCount(node) {
    var errorCount = ++node.errorCount;

    if (this._removeNodeErrorCount > errorCount) {
      return;
    }

    if (this._restoreNodeTimeout > 0) {
      node._offlineUntil = getMonotonicMilliseconds() + this._restoreNodeTimeout;
      this.emit('offline', node.id);
      return;
    }

    this._removeNode(node);
    this.emit('remove', node.id);
  };
  /**
   * Get a connection from a specific node's pool, updating the node's error
   * count according to the outcome.
   *
   * On success the connection is tagged with `_clusterId` set to the node id.
   *
   * @param {object} node Node record with `id` and `pool`
   * @param {function} cb Callback `(err, connection)`
   */
  PoolCluster.prototype._getConnection = function _getConnection(node, cb) {
    var self = this;

    node.pool.getConnection(function (err, connection) {
      if (err) {
        self._increaseErrorCount(node);
        cb(err);
        return;
      }

      self._decreaseErrorCount(node);

      connection._clusterId = node.id;

      cb(null, connection);
    });
  };
  /**
   * Unregister a node, invalidate cached pattern lookups and end its pool.
   * The result of ending the pool is ignored (`_noop` is passed as callback).
   *
   * @param {object} node Node record with `id` and `pool`
   */
  PoolCluster.prototype._removeNode = function _removeNode(node) {
    delete this._nodes[node.id];

    this._clearFindCaches();

    node.pool.end(_noop);
  };
  /**
   * Get a millisecond timestamp for measuring elapsed time.
   *
   * Uses process.hrtime() when available, otherwise process.uptime().
   *
   * @returns {number} Whole milliseconds (rounded down)
   */
  function getMonotonicMilliseconds() {
    var ms;

    if (typeof process.hrtime === 'function') {
      // hrtime() yields a [seconds, nanoseconds] pair.
      ms = process.hrtime();
      ms = ms[0] * 1e3 + ms[1] * 1e-6;
    } else {
      ms = process.uptime() * 1000;
    }

    return Math.floor(ms);
  }
  /**
   * Check whether a value is a RegExp, based on its Object.prototype.toString
   * tag rather than instanceof.
   *
   * @param {*} val Value to test
   * @returns {boolean}
   */
  function isRegExp(val) {
    return typeof val === 'object'
      && Object.prototype.toString.call(val) === '[object RegExp]';
  }
  /**
   * Convert a node id pattern into a RegExp.
   *
   * A RegExp is returned unchanged. For a string, regular expression
   * metacharacters are escaped, each `*` becomes `.*`, and the result is
   * anchored at both ends so it must match the whole id.
   *
   * @param {string|RegExp} pattern Node id pattern
   * @returns {RegExp}
   */
  function patternRegExp(pattern) {
    if (isRegExp(pattern)) {
      return pattern;
    }

    var source = pattern
      .replace(/([.+?^=!:${}()|\[\]\/\\])/g, '\\$1')
      .replace(/\*/g, '.*');

    return new RegExp('^' + source + '$');
  }
  /**
   * Default completion callback: rethrows an error, ignores success.
   *
   * @param {Error} [err] Error to throw, if any
   * @throws {Error} The given error when it is truthy
   */
  function _cb(err) {
    if (err) {
      throw err;
    }
  }
  /**
   * Callback that does nothing; used where a result is deliberately ignored.
   */
  function _noop() {}