Query.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. var ClientConstants = require('../constants/client');
  2. var fs = require('fs');
  3. var Packets = require('../packets');
  4. var ResultSet = require('../ResultSet');
  5. var Sequence = require('./Sequence');
  6. var ServerStatus = require('../constants/server_status');
  7. var Readable = require('readable-stream');
  8. var Util = require('util');
  module.exports = Query;

  Util.inherits(Query, Sequence);

  /**
   * Sequence representing a single SQL query.
   *
   * @param {Object}   options             Query settings.
   * @param {string}   options.sql         SQL text sent by start().
   * @param {*}        [options.values]    Stored as-is on the instance.
   * @param {*}        [options.typeCast]  Defaults to true when undefined.
   * @param {*}        [options.nestTables] Defaults to false when falsy.
   * @param {Function} [callback]          Forwarded to the Sequence constructor.
   */
  function Query(options, callback) {
    Sequence.call(this, options, callback);

    this.sql    = options.sql;
    this.values = options.values;

    // Only an explicit value overrides the default of `true`.
    if (options.typeCast === undefined) {
      this.typeCast = true;
    } else {
      this.typeCast = options.typeCast;
    }

    this.nestTables = options.nestTables || false;

    // Bookkeeping used by the packet handlers below.
    this._resultSet = null; // result set currently being received, if any
    this._results   = [];   // collected results (callback mode)
    this._fields    = [];   // collected field lists (callback mode)
    this._index     = 0;    // number of results completed so far
    this._loadError = null; // error raised while handling a local infile request
  }
  /**
   * Kicks off the sequence by emitting a 'packet' event that carries a
   * ComQueryPacket built from this query's SQL text.
   */
  Query.prototype.start = function() {
    var comQuery = new Packets.ComQueryPacket(this.sql);

    this.emit('packet', comQuery);
  };
  /**
   * Chooses the packet class that should parse the next incoming packet.
   *
   * @param {number} byte   First byte of the incoming packet.
   * @param {Object} parser Parser; only packetLength() is consulted here.
   * @returns {Function} One of the Packets.* constructors.
   */
  Query.prototype.determinePacket = function determinePacket(byte, parser) {
    var current = this._resultSet;

    // No result set in progress: the first byte alone selects the packet type.
    if (!current) {
      if (byte === 0x00) {
        return Packets.OkPacket;
      }
      if (byte === 0xfb) {
        return Packets.LocalInfileRequestPacket;
      }
      if (byte === 0xff) {
        return Packets.ErrorPacket;
      }
      return Packets.ResultSetHeaderPacket;
    }

    // Before the first EOF we are still reading field definitions; once the
    // announced number of fields has arrived, the next packet is that EOF.
    if (current.eofPackets.length === 0) {
      var fieldsOutstanding =
        current.fieldPackets.length < current.resultSetHeaderPacket.fieldCount;

      if (fieldsOutstanding) {
        return Packets.FieldPacket;
      }
      return Packets.EofPacket;
    }

    if (byte === 0xff) {
      return Packets.ErrorPacket;
    }

    // NOTE(review): the length check presumably tells a real EOF packet apart
    // from a row whose first byte happens to be 0xfe -- confirm against the
    // protocol documentation.
    var looksLikeEof = byte === 0xfe && parser.packetLength() < 9;

    return looksLikeEof
      ? Packets.EofPacket
      : Packets.RowDataPacket;
  };
  /**
   * Handles an OK packet: reports it as a result (event when there is no
   * callback, otherwise stored for the callback) and then closes the current
   * result.
   *
   * @param {Object} packet Parsed OkPacket.
   */
  Query.prototype.OkPacket = function(packet) {
    // The finally clause keeps the bookkeeping consistent even when a
    // 'result' listener throws.
    try {
      if (this._callback) {
        this._results.push(packet);
        this._fields.push(undefined);
      } else {
        this.emit('result', packet, this._index);
      }
    } finally {
      this._resultSet = null;
      this._index += 1;
      this._handleFinalResultPacket(packet);
    }
  };
  /**
   * Handles an error packet: converts it into an Error, tags it with the
   * result index and SQL text, and ends the sequence. Results and fields
   * gathered so far are passed along only when non-empty.
   *
   * @param {Object} packet Parsed ErrorPacket.
   */
  Query.prototype.ErrorPacket = function(packet) {
    var err = this._packetToError(packet);
    var partialResults;
    var partialFields;

    if (this._results.length > 0) {
      partialResults = this._results;
    }
    if (this._fields.length > 0) {
      partialFields = this._fields;
    }

    err.index = this._index;
    err.sql   = this.sql;

    this.end(err, partialResults, partialFields);
  };
  /**
   * Handles a request to upload a local file. When the connection's client
   * flags include CLIENT_LOCAL_FILES the file is streamed; otherwise a
   * non-fatal error is recorded and an empty packet is sent instead.
   *
   * @param {Object} packet Parsed LocalInfileRequestPacket (uses `filename`).
   */
  Query.prototype.LocalInfileRequestPacket = function(packet) {
    var localFilesAllowed =
      this._connection.config.clientFlags & ClientConstants.CLIENT_LOCAL_FILES;

    if (localFilesAllowed) {
      this._sendLocalDataFile(packet.filename);
      return;
    }

    var refusal = new Error('Load local files command is disabled');
    refusal.code  = 'LOCAL_FILES_DISABLED';
    refusal.fatal = false;

    this._loadError = refusal;
    this.emit('packet', new Packets.EmptyPacket());
  };
  /**
   * Starts a new result set from its header packet.
   *
   * @param {Object} packet Parsed ResultSetHeaderPacket.
   */
  Query.prototype.ResultSetHeaderPacket = function(packet) {
    var freshResultSet = new ResultSet(packet);

    this._resultSet = freshResultSet;
  };
  /**
   * Appends a field definition to the result set currently being received.
   *
   * @param {Object} packet Parsed FieldPacket.
   */
  Query.prototype.FieldPacket = function(packet) {
    var fieldList = this._resultSet.fieldPackets;

    fieldList.push(packet);
  };
  /**
   * Handles an EOF packet. The first EOF of a result set triggers the
   * 'fields' event (only when there is no callback); the second one closes
   * the result set.
   *
   * @param {Object} packet Parsed EofPacket.
   */
  Query.prototype.EofPacket = function(packet) {
    this._resultSet.eofPackets.push(packet);

    var isFirstEof = this._resultSet.eofPackets.length === 1;
    if (isFirstEof && !this._callback) {
      this.emit('fields', this._resultSet.fieldPackets, this._index);
    }

    if (this._resultSet.eofPackets.length === 2) {
      // Callback mode hands the buffered rows and fields over at the end.
      if (this._callback) {
        this._results.push(this._resultSet.rows);
        this._fields.push(this._resultSet.fieldPackets);
      }

      this._index++;
      this._resultSet = null;
      this._handleFinalResultPacket(packet);
    }
  };
  /**
   * Ends the sequence once the server reports no further results. While the
   * SERVER_MORE_RESULTS_EXISTS status bit is set this does nothing.
   *
   * A single result is passed to end() unwrapped; several are passed as
   * arrays (the same applies to fields).
   *
   * @param {Object} packet Packet that closed the latest result; its
   *   `serverStatus` is inspected.
   */
  Query.prototype._handleFinalResultPacket = function(packet) {
    var moreToCome = packet.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS;

    if (!moreToCome) {
      var results;
      var fields;

      if (this._results.length > 1) {
        results = this._results;
      } else {
        results = this._results[0];
      }

      if (this._fields.length > 1) {
        fields = this._fields;
      } else {
        fields = this._fields[0];
      }

      this.end(this._loadError, results, fields);
    }
  };
  /**
   * Parses a row packet against the current field list, then either buffers
   * it (callback mode) or emits it as a 'result' event.
   *
   * @param {Object} packet     RowDataPacket to fill in.
   * @param {Object} parser     Parser passed through to packet.parse().
   * @param {Object} connection Connection passed through to packet.parse().
   */
  Query.prototype.RowDataPacket = function(packet, parser, connection) {
    var fieldList = this._resultSet.fieldPackets;

    packet.parse(parser, fieldList, this.typeCast, this.nestTables, connection);

    if (!this._callback) {
      this.emit('result', packet, this._index);
      return;
    }

    this._resultSet.rows.push(packet);
  };
  /**
   * Streams a local file as a series of LocalDataFilePacket packets,
   * followed by exactly one EmptyPacket that terminates the transfer.
   *
   * A read error is stored in `this._loadError` and the transfer is still
   * terminated with the EmptyPacket. 'pause' and 'resume' events on the
   * query are forwarded to the file stream.
   *
   * @param {string} path Path of the file to read.
   */
  Query.prototype._sendLocalDataFile = function(path) {
    var self = this;
    var terminated = false;

    // The option is spelled `flags`; a `flag` key is ignored by
    // fs.createReadStream.
    var localStream = fs.createReadStream(path, {
      flags     : 'r',
      encoding  : null,
      autoClose : true
    });

    // Sends the terminating empty packet once, whichever of 'end' or 'error'
    // fires first.
    function finish() {
      if (terminated) {
        return;
      }
      terminated = true;
      self.emit('packet', new Packets.EmptyPacket());
    }

    this.on('pause', function () {
      localStream.pause();
    });

    this.on('resume', function () {
      localStream.resume();
    });

    localStream.on('data', function (data) {
      self.emit('packet', new Packets.LocalDataFilePacket(data));
    });

    localStream.on('error', function (err) {
      self._loadError = err;
      finish();
    });

    localStream.on('end', finish);
  };
  /**
   * Exposes this query as an object-mode Readable stream.
   *
   * Each 'result' event is pushed into the stream; when the stream signals
   * back-pressure the connection is paused, and it is resumed again when the
   * stream asks for more data. 'result', 'fields' and 'error' events are
   * re-emitted on the stream, and the query's 'end' event ends the stream.
   * A 'close' event follows the stream's 'end' on the next tick.
   *
   * @param {Object} [options] Options for the Readable constructor.
   *   `objectMode` is always forced to true. The object passed in is not
   *   modified.
   * @returns {Readable} The stream of results.
   */
  Query.prototype.stream = function(options) {
    var self = this;

    // Copy instead of assigning onto `options`, so forcing objectMode does not
    // leak into an object the caller may reuse. Inherited enumerable
    // properties are copied on purpose: the Readable constructor would have
    // seen them on the original object as well.
    var streamOptions = {};
    for (var key in options) {
      streamOptions[key] = options[key];
    }
    streamOptions.objectMode = true;

    var stream = new Readable(streamOptions);

    stream._read = function() {
      self._connection && self._connection.resume();
    };

    stream.once('end', function() {
      process.nextTick(function () {
        stream.emit('close');
      });
    });

    this.on('result', function(row, i) {
      // Same guard as _read: there may be no connection to pause.
      if (!stream.push(row) && self._connection) self._connection.pause();
      stream.emit('result', row, i); // replicate old emitter
    });

    this.on('error', function(err) {
      stream.emit('error', err); // Pass on any errors
    });

    this.on('end', function() {
      stream.push(null); // pushing null, indicating EOF
    });

    this.on('fields', function(fields, i) {
      stream.emit('fields', fields, i); // replicate old emitter
    });

    return stream;
  };