123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- var ClientConstants = require('../constants/client');
- var fs = require('fs');
- var Packets = require('../packets');
- var ResultSet = require('../ResultSet');
- var Sequence = require('./Sequence');
- var ServerStatus = require('../constants/server_status');
- var Readable = require('readable-stream');
- var Util = require('util');
- module.exports = Query;
- Util.inherits(Query, Sequence);
- function Query(options, callback) {
- Sequence.call(this, options, callback);
- this.sql = options.sql;
- this.values = options.values;
- this.typeCast = (options.typeCast === undefined)
- ? true
- : options.typeCast;
- this.nestTables = options.nestTables || false;
- this._resultSet = null;
- this._results = [];
- this._fields = [];
- this._index = 0;
- this._loadError = null;
- }
- Query.prototype.start = function() {
- this.emit('packet', new Packets.ComQueryPacket(this.sql));
- };
- Query.prototype.determinePacket = function determinePacket(byte, parser) {
- var resultSet = this._resultSet;
- if (!resultSet) {
- switch (byte) {
- case 0x00: return Packets.OkPacket;
- case 0xfb: return Packets.LocalInfileRequestPacket;
- case 0xff: return Packets.ErrorPacket;
- default: return Packets.ResultSetHeaderPacket;
- }
- }
- if (resultSet.eofPackets.length === 0) {
- return (resultSet.fieldPackets.length < resultSet.resultSetHeaderPacket.fieldCount)
- ? Packets.FieldPacket
- : Packets.EofPacket;
- }
- if (byte === 0xff) {
- return Packets.ErrorPacket;
- }
- if (byte === 0xfe && parser.packetLength() < 9) {
- return Packets.EofPacket;
- }
- return Packets.RowDataPacket;
- };
- Query.prototype['OkPacket'] = function(packet) {
- // try...finally for exception safety
- try {
- if (!this._callback) {
- this.emit('result', packet, this._index);
- } else {
- this._results.push(packet);
- this._fields.push(undefined);
- }
- } finally {
- this._index++;
- this._resultSet = null;
- this._handleFinalResultPacket(packet);
- }
- };
- Query.prototype['ErrorPacket'] = function(packet) {
- var err = this._packetToError(packet);
- var results = (this._results.length > 0)
- ? this._results
- : undefined;
- var fields = (this._fields.length > 0)
- ? this._fields
- : undefined;
- err.index = this._index;
- err.sql = this.sql;
- this.end(err, results, fields);
- };
- Query.prototype['LocalInfileRequestPacket'] = function(packet) {
- if (this._connection.config.clientFlags & ClientConstants.CLIENT_LOCAL_FILES) {
- this._sendLocalDataFile(packet.filename);
- } else {
- this._loadError = new Error('Load local files command is disabled');
- this._loadError.code = 'LOCAL_FILES_DISABLED';
- this._loadError.fatal = false;
- this.emit('packet', new Packets.EmptyPacket());
- }
- };
- Query.prototype['ResultSetHeaderPacket'] = function(packet) {
- this._resultSet = new ResultSet(packet);
- };
- Query.prototype['FieldPacket'] = function(packet) {
- this._resultSet.fieldPackets.push(packet);
- };
- Query.prototype['EofPacket'] = function(packet) {
- this._resultSet.eofPackets.push(packet);
- if (this._resultSet.eofPackets.length === 1 && !this._callback) {
- this.emit('fields', this._resultSet.fieldPackets, this._index);
- }
- if (this._resultSet.eofPackets.length !== 2) {
- return;
- }
- if (this._callback) {
- this._results.push(this._resultSet.rows);
- this._fields.push(this._resultSet.fieldPackets);
- }
- this._index++;
- this._resultSet = null;
- this._handleFinalResultPacket(packet);
- };
- Query.prototype._handleFinalResultPacket = function(packet) {
- if (packet.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
- return;
- }
- var results = (this._results.length > 1)
- ? this._results
- : this._results[0];
- var fields = (this._fields.length > 1)
- ? this._fields
- : this._fields[0];
- this.end(this._loadError, results, fields);
- };
- Query.prototype['RowDataPacket'] = function(packet, parser, connection) {
- packet.parse(parser, this._resultSet.fieldPackets, this.typeCast, this.nestTables, connection);
- if (this._callback) {
- this._resultSet.rows.push(packet);
- } else {
- this.emit('result', packet, this._index);
- }
- };
- Query.prototype._sendLocalDataFile = function(path) {
- var self = this;
- var localStream = fs.createReadStream(path, {
- flag : 'r',
- encoding : null,
- autoClose : true
- });
- this.on('pause', function () {
- localStream.pause();
- });
- this.on('resume', function () {
- localStream.resume();
- });
- localStream.on('data', function (data) {
- self.emit('packet', new Packets.LocalDataFilePacket(data));
- });
- localStream.on('error', function (err) {
- self._loadError = err;
- localStream.emit('end');
- });
- localStream.on('end', function () {
- self.emit('packet', new Packets.EmptyPacket());
- });
- };
- Query.prototype.stream = function(options) {
- var self = this;
- options = options || {};
- options.objectMode = true;
- var stream = new Readable(options);
- stream._read = function() {
- self._connection && self._connection.resume();
- };
- stream.once('end', function() {
- process.nextTick(function () {
- stream.emit('close');
- });
- });
- this.on('result', function(row, i) {
- if (!stream.push(row)) self._connection.pause();
- stream.emit('result', row, i); // replicate old emitter
- });
- this.on('error', function(err) {
- stream.emit('error', err); // Pass on any errors
- });
- this.on('end', function() {
- stream.push(null); // pushing null, indicating EOF
- });
- this.on('fields', function(fields, i) {
- stream.emit('fields', fields, i); // replicate old emitter
- });
- return stream;
- };
|