Protocol.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. var Parser = require('./Parser');
  2. var Sequences = require('./sequences');
  3. var Packets = require('./packets');
  4. var Stream = require('stream').Stream;
  5. var Util = require('util');
  6. var PacketWriter = require('./PacketWriter');
  7. module.exports = Protocol;
  8. Util.inherits(Protocol, Stream);
  9. function Protocol(options) {
  10. Stream.call(this);
  11. options = options || {};
  12. this.readable = true;
  13. this.writable = true;
  14. this._config = options.config || {};
  15. this._connection = options.connection;
  16. this._callback = null;
  17. this._fatalError = null;
  18. this._quitSequence = null;
  19. this._handshake = false;
  20. this._handshaked = false;
  21. this._ended = false;
  22. this._destroyed = false;
  23. this._queue = [];
  24. this._handshakeInitializationPacket = null;
  25. this._parser = new Parser({
  26. onError : this.handleParserError.bind(this),
  27. onPacket : this._parsePacket.bind(this),
  28. config : this._config
  29. });
  30. }
  31. Protocol.prototype.write = function(buffer) {
  32. this._parser.write(buffer);
  33. return true;
  34. };
  35. Protocol.prototype.handshake = function handshake(options, callback) {
  36. if (typeof options === 'function') {
  37. callback = options;
  38. options = {};
  39. }
  40. options = options || {};
  41. options.config = this._config;
  42. var sequence = this._enqueue(new Sequences.Handshake(options, callback));
  43. this._handshake = true;
  44. return sequence;
  45. };
  46. Protocol.prototype.query = function query(options, callback) {
  47. return this._enqueue(new Sequences.Query(options, callback));
  48. };
  49. Protocol.prototype.changeUser = function changeUser(options, callback) {
  50. return this._enqueue(new Sequences.ChangeUser(options, callback));
  51. };
  52. Protocol.prototype.ping = function ping(options, callback) {
  53. if (typeof options === 'function') {
  54. callback = options;
  55. options = {};
  56. }
  57. return this._enqueue(new Sequences.Ping(options, callback));
  58. };
  59. Protocol.prototype.stats = function stats(options, callback) {
  60. if (typeof options === 'function') {
  61. callback = options;
  62. options = {};
  63. }
  64. return this._enqueue(new Sequences.Statistics(options, callback));
  65. };
  66. Protocol.prototype.quit = function quit(options, callback) {
  67. if (typeof options === 'function') {
  68. callback = options;
  69. options = {};
  70. }
  71. var self = this;
  72. var sequence = this._enqueue(new Sequences.Quit(options, callback));
  73. sequence.on('end', function () {
  74. self.end();
  75. });
  76. return this._quitSequence = sequence;
  77. };
  78. Protocol.prototype.end = function() {
  79. if (this._ended) {
  80. return;
  81. }
  82. this._ended = true;
  83. if (this._quitSequence && (this._quitSequence._ended || this._queue[0] === this._quitSequence)) {
  84. this._quitSequence.end();
  85. this.emit('end');
  86. return;
  87. }
  88. var err = new Error('Connection lost: The server closed the connection.');
  89. err.fatal = true;
  90. err.code = 'PROTOCOL_CONNECTION_LOST';
  91. this._delegateError(err);
  92. };
  93. Protocol.prototype.pause = function() {
  94. this._parser.pause();
  95. // Since there is a file stream in query, we must transmit pause/resume event to current sequence.
  96. var seq = this._queue[0];
  97. if (seq && seq.emit) {
  98. seq.emit('pause');
  99. }
  100. };
  101. Protocol.prototype.resume = function() {
  102. this._parser.resume();
  103. // Since there is a file stream in query, we must transmit pause/resume event to current sequence.
  104. var seq = this._queue[0];
  105. if (seq && seq.emit) {
  106. seq.emit('resume');
  107. }
  108. };
  109. Protocol.prototype._enqueue = function(sequence) {
  110. if (!this._validateEnqueue(sequence)) {
  111. return sequence;
  112. }
  113. if (this._config.trace) {
  114. // Long stack trace support
  115. sequence._callSite = sequence._callSite || new Error();
  116. }
  117. this._queue.push(sequence);
  118. this.emit('enqueue', sequence);
  119. var self = this;
  120. sequence
  121. .on('error', function(err) {
  122. self._delegateError(err, sequence);
  123. })
  124. .on('packet', function(packet) {
  125. sequence._timer.active();
  126. self._emitPacket(packet);
  127. })
  128. .on('timeout', function() {
  129. var err = new Error(sequence.constructor.name + ' inactivity timeout');
  130. err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
  131. err.fatal = true;
  132. err.timeout = sequence._timeout;
  133. self._delegateError(err, sequence);
  134. });
  135. if (sequence.constructor === Sequences.Handshake) {
  136. sequence.on('start-tls', function () {
  137. sequence._timer.active();
  138. self._connection._startTLS(function(err) {
  139. if (err) {
  140. // SSL negotiation error are fatal
  141. err.code = 'HANDSHAKE_SSL_ERROR';
  142. err.fatal = true;
  143. sequence.end(err);
  144. return;
  145. }
  146. sequence._timer.active();
  147. sequence._tlsUpgradeCompleteHandler();
  148. });
  149. });
  150. sequence.on('end', function () {
  151. self._handshaked = true;
  152. if (!self._fatalError) {
  153. self.emit('handshake', self._handshakeInitializationPacket);
  154. }
  155. });
  156. }
  157. sequence.on('end', function () {
  158. self._dequeue(sequence);
  159. });
  160. if (this._queue.length === 1) {
  161. this._parser.resetPacketNumber();
  162. this._startSequence(sequence);
  163. }
  164. return sequence;
  165. };
  166. Protocol.prototype._validateEnqueue = function _validateEnqueue(sequence) {
  167. var err;
  168. var prefix = 'Cannot enqueue ' + sequence.constructor.name;
  169. if (this._fatalError) {
  170. err = new Error(prefix + ' after fatal error.');
  171. err.code = 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR';
  172. } else if (this._quitSequence) {
  173. err = new Error(prefix + ' after invoking quit.');
  174. err.code = 'PROTOCOL_ENQUEUE_AFTER_QUIT';
  175. } else if (this._destroyed) {
  176. err = new Error(prefix + ' after being destroyed.');
  177. err.code = 'PROTOCOL_ENQUEUE_AFTER_DESTROY';
  178. } else if ((this._handshake || this._handshaked) && sequence.constructor === Sequences.Handshake) {
  179. err = new Error(prefix + ' after already enqueuing a Handshake.');
  180. err.code = 'PROTOCOL_ENQUEUE_HANDSHAKE_TWICE';
  181. } else {
  182. return true;
  183. }
  184. var self = this;
  185. err.fatal = false;
  186. // add error handler
  187. sequence.on('error', function (err) {
  188. self._delegateError(err, sequence);
  189. });
  190. process.nextTick(function () {
  191. sequence.end(err);
  192. });
  193. return false;
  194. };
  195. Protocol.prototype._parsePacket = function() {
  196. var sequence = this._queue[0];
  197. if (!sequence) {
  198. var err = new Error('Received packet with no active sequence.');
  199. err.code = 'PROTOCOL_STRAY_PACKET';
  200. err.fatal = true;
  201. this._delegateError(err);
  202. return;
  203. }
  204. var Packet = this._determinePacket(sequence);
  205. var packet = new Packet({protocol41: this._config.protocol41});
  206. var packetName = Packet.name;
  207. // Special case: Faster dispatch, and parsing done inside sequence
  208. if (Packet === Packets.RowDataPacket) {
  209. sequence.RowDataPacket(packet, this._parser, this._connection);
  210. if (this._config.debug) {
  211. this._debugPacket(true, packet);
  212. }
  213. return;
  214. }
  215. if (this._config.debug) {
  216. this._parsePacketDebug(packet);
  217. } else {
  218. packet.parse(this._parser);
  219. }
  220. if (Packet === Packets.HandshakeInitializationPacket) {
  221. this._handshakeInitializationPacket = packet;
  222. this.emit('initialize', packet);
  223. }
  224. sequence._timer.active();
  225. if (!sequence[packetName]) {
  226. var err = new Error('Received packet in the wrong sequence.');
  227. err.code = 'PROTOCOL_INCORRECT_PACKET_SEQUENCE';
  228. err.fatal = true;
  229. this._delegateError(err);
  230. return;
  231. }
  232. sequence[packetName](packet);
  233. };
  234. Protocol.prototype._parsePacketDebug = function _parsePacketDebug(packet) {
  235. try {
  236. packet.parse(this._parser);
  237. } finally {
  238. this._debugPacket(true, packet);
  239. }
  240. };
  241. Protocol.prototype._emitPacket = function(packet) {
  242. var packetWriter = new PacketWriter();
  243. packet.write(packetWriter);
  244. this.emit('data', packetWriter.toBuffer(this._parser));
  245. if (this._config.debug) {
  246. this._debugPacket(false, packet);
  247. }
  248. };
  249. Protocol.prototype._determinePacket = function(sequence) {
  250. var firstByte = this._parser.peak();
  251. if (sequence.determinePacket) {
  252. var Packet = sequence.determinePacket(firstByte, this._parser);
  253. if (Packet) {
  254. return Packet;
  255. }
  256. }
  257. switch (firstByte) {
  258. case 0x00: return Packets.OkPacket;
  259. case 0xfe: return Packets.EofPacket;
  260. case 0xff: return Packets.ErrorPacket;
  261. }
  262. throw new Error('Could not determine packet, firstByte = ' + firstByte);
  263. };
  264. Protocol.prototype._dequeue = function(sequence) {
  265. sequence._timer.stop();
  266. // No point in advancing the queue, we are dead
  267. if (this._fatalError) {
  268. return;
  269. }
  270. this._queue.shift();
  271. var sequence = this._queue[0];
  272. if (!sequence) {
  273. this.emit('drain');
  274. return;
  275. }
  276. this._parser.resetPacketNumber();
  277. this._startSequence(sequence);
  278. };
  279. Protocol.prototype._startSequence = function(sequence) {
  280. if (sequence._timeout > 0 && isFinite(sequence._timeout)) {
  281. sequence._timer.start(sequence._timeout);
  282. }
  283. if (sequence.constructor === Sequences.ChangeUser) {
  284. sequence.start(this._handshakeInitializationPacket);
  285. } else {
  286. sequence.start();
  287. }
  288. };
  289. Protocol.prototype.handleNetworkError = function(err) {
  290. err.fatal = true;
  291. var sequence = this._queue[0];
  292. if (sequence) {
  293. sequence.end(err);
  294. } else {
  295. this._delegateError(err);
  296. }
  297. };
  298. Protocol.prototype.handleParserError = function handleParserError(err) {
  299. var sequence = this._queue[0];
  300. if (sequence) {
  301. sequence.end(err);
  302. } else {
  303. this._delegateError(err);
  304. }
  305. };
  306. Protocol.prototype._delegateError = function(err, sequence) {
  307. // Stop delegating errors after the first fatal error
  308. if (this._fatalError) {
  309. return;
  310. }
  311. if (err.fatal) {
  312. this._fatalError = err;
  313. }
  314. if (this._shouldErrorBubbleUp(err, sequence)) {
  315. // Can't use regular 'error' event here as that always destroys the pipe
  316. // between socket and protocol which is not what we want (unless the
  317. // exception was fatal).
  318. this.emit('unhandledError', err);
  319. } else if (err.fatal) {
  320. // Send fatal error to all sequences in the queue
  321. var queue = this._queue;
  322. process.nextTick(function () {
  323. queue.forEach(function (sequence) {
  324. sequence.end(err);
  325. });
  326. queue.length = 0;
  327. });
  328. }
  329. // Make sure the stream we are piping to is getting closed
  330. if (err.fatal) {
  331. this.emit('end', err);
  332. }
  333. };
  334. Protocol.prototype._shouldErrorBubbleUp = function(err, sequence) {
  335. if (sequence) {
  336. if (sequence.hasErrorHandler()) {
  337. return false;
  338. } else if (!err.fatal) {
  339. return true;
  340. }
  341. }
  342. return (err.fatal && !this._hasPendingErrorHandlers());
  343. };
  344. Protocol.prototype._hasPendingErrorHandlers = function() {
  345. return this._queue.some(function(sequence) {
  346. return sequence.hasErrorHandler();
  347. });
  348. };
  349. Protocol.prototype.destroy = function() {
  350. this._destroyed = true;
  351. this._parser.pause();
  352. if (this._connection.state !== 'disconnected') {
  353. if (!this._ended) {
  354. this.end();
  355. }
  356. }
  357. };
  358. Protocol.prototype._debugPacket = function(incoming, packet) {
  359. var connection = this._connection;
  360. var direction = incoming
  361. ? '<--'
  362. : '-->';
  363. var packetName = packet.constructor.name;
  364. var threadId = connection && connection.threadId !== null
  365. ? ' (' + connection.threadId + ')'
  366. : '';
  367. // check for debug packet restriction
  368. if (Array.isArray(this._config.debug) && this._config.debug.indexOf(packetName) === -1) {
  369. return;
  370. }
  371. var packetPayload = Util.inspect(packet).replace(/^[^{]+/, '');
  372. console.log('%s%s %s %s\n', direction, threadId, packetName, packetPayload);
  373. };