var ClientConstants = require('../constants/client');
var fs              = require('fs');
var Packets         = require('../packets');
var ResultSet       = require('../ResultSet');
var Sequence        = require('./Sequence');
var ServerStatus    = require('../constants/server_status');
var Readable        = require('readable-stream');
var Util            = require('util');

module.exports = Query;
Util.inherits(Query, Sequence);

/**
 * Sequence that sends one SQL statement and collects (or emits) its results.
 *
 * When a callback is supplied, rows and field lists are buffered and handed to
 * `end()` once the last result has arrived. Without a callback, the same data
 * is emitted through 'fields' and 'result' events instead.
 *
 * @param {Object}   options
 * @param {string}   options.sql          Statement text to send.
 * @param {*}        [options.values]     Stored on the instance; not used in this file.
 * @param {*}        [options.typeCast]   Forwarded to row parsing; defaults to `true`
 *                                        only when left `undefined`.
 * @param {*}        [options.nestTables] Forwarded to row parsing; falsy becomes `false`.
 * @param {Function} [callback]           Passed through to `Sequence`.
 */
function Query(options, callback) {
  Sequence.call(this, options, callback);

  this.sql        = options.sql;
  this.values     = options.values;
  this.typeCast   = (options.typeCast === undefined)
    ? true
    : options.typeCast;
  this.nestTables = options.nestTables || false;

  this._resultSet = null; // result set currently being received, if any
  this._results   = [];   // one entry per completed result (callback mode)
  this._fields    = [];   // field lists, parallel to _results
  this._index     = 0;    // number of results completed so far
  this._loadError = null; // non-fatal LOAD DATA LOCAL failure, reported at the end
}

/**
 * Begins the sequence by emitting a ComQueryPacket carrying the SQL text.
 */
Query.prototype.start = function() {
  this.emit('packet', new Packets.ComQueryPacket(this.sql));
};

/**
 * Chooses the packet class for the next incoming packet, based on where the
 * sequence is within the current result set.
 *
 * @param {number} byte   Marker byte of the incoming packet (presumably its
 *                        first byte - confirm against the caller).
 * @param {Object} parser Parser exposing `packetLength()`.
 * @returns {Function} One of the `Packets.*` constructors.
 */
Query.prototype.determinePacket = function determinePacket(byte, parser) {
  var resultSet = this._resultSet;

  // No result set in progress: this is the first packet of a response.
  if (!resultSet) {
    switch (byte) {
      case 0x00: return Packets.OkPacket;
      case 0xfb: return Packets.LocalInfileRequestPacket;
      case 0xff: return Packets.ErrorPacket;
      default:   return Packets.ResultSetHeaderPacket;
    }
  }

  // Before the first EOF we are still reading field definitions; once the
  // announced number of fields has been read, the next packet is that EOF.
  if (resultSet.eofPackets.length === 0) {
    return (resultSet.fieldPackets.length < resultSet.resultSetHeaderPacket.fieldCount)
      ? Packets.FieldPacket
      : Packets.EofPacket;
  }

  if (byte === 0xff) {
    return Packets.ErrorPacket;
  }

  // A 0xfe marker only counts as EOF when the packet is shorter than 9 bytes;
  // anything longer is handled as row data.
  if (byte === 0xfe && parser.packetLength() < 9) {
    return Packets.EofPacket;
  }

  return Packets.RowDataPacket;
};

/**
 * Handles an OK packet: records or emits it as the result of the current
 * statement, then finishes that result.
 *
 * @param {Object} packet
 */
Query.prototype['OkPacket'] = function(packet) {
  // try...finally for exception safety
  try {
    if (!this._callback) {
      this.emit('result', packet, this._index);
    } else {
      this._results.push(packet);
      this._fields.push(undefined);
    }
  } finally {
    this._index++;
    this._resultSet = null;
    this._handleFinalResultPacket(packet);
  }
};

/**
 * Handles an error packet: converts it to an Error, annotates it with the
 * failing result index and the SQL text, and ends the sequence. Any results
 * gathered before the failure are passed along with the error.
 *
 * @param {Object} packet
 */
Query.prototype['ErrorPacket'] = function(packet) {
  var err     = this._packetToError(packet);
  var results = (this._results.length > 0)
    ? this._results
    : undefined;
  var fields  = (this._fields.length > 0)
    ? this._fields
    : undefined;

  err.index = this._index;
  err.sql   = this.sql;

  this.end(err, results, fields);
};

/**
 * Handles a request for a local file. The file is streamed only when the
 * connection's client flags include CLIENT_LOCAL_FILES; otherwise a non-fatal
 * error is remembered and an empty packet is sent in place of the file data.
 *
 * @param {Object} packet Carries the requested `filename`.
 */
Query.prototype['LocalInfileRequestPacket'] = function(packet) {
  if (this._connection.config.clientFlags & ClientConstants.CLIENT_LOCAL_FILES) {
    this._sendLocalDataFile(packet.filename);
  } else {
    this._loadError       = new Error('Load local files command is disabled');
    this._loadError.code  = 'LOCAL_FILES_DISABLED';
    this._loadError.fatal = false;

    this.emit('packet', new Packets.EmptyPacket());
  }
};

/**
 * Starts a new result set from its header packet.
 *
 * @param {Object} packet
 */
Query.prototype['ResultSetHeaderPacket'] = function(packet) {
  this._resultSet = new ResultSet(packet);
};

/**
 * Appends a field definition to the result set in progress.
 *
 * @param {Object} packet
 */
Query.prototype['FieldPacket'] = function(packet) {
  this._resultSet.fieldPackets.push(packet);
};

/**
 * Handles an EOF packet. The first EOF of a result set closes the field list
 * (emitting 'fields' when no callback is set); the second closes the rows and
 * completes the result set.
 *
 * @param {Object} packet
 */
Query.prototype['EofPacket'] = function(packet) {
  this._resultSet.eofPackets.push(packet);

  if (this._resultSet.eofPackets.length === 1 && !this._callback) {
    this.emit('fields', this._resultSet.fieldPackets, this._index);
  }

  if (this._resultSet.eofPackets.length !== 2) {
    return;
  }

  if (this._callback) {
    this._results.push(this._resultSet.rows);
    this._fields.push(this._resultSet.fieldPackets);
  }

  this._index++;
  this._resultSet = null;
  this._handleFinalResultPacket(packet);
};

/**
 * Ends the sequence unless the packet's server status says more results
 * follow. A lone result is unwrapped; several results are passed as arrays.
 *
 * @param {Object} packet Packet whose `serverStatus` is inspected.
 */
Query.prototype._handleFinalResultPacket = function(packet) {
  if (packet.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
    return;
  }

  var results = (this._results.length > 1)
    ? this._results
    : this._results[0];
  var fields  = (this._fields.length > 1)
    ? this._fields
    : this._fields[0];

  this.end(this._loadError, results, fields);
};

/**
 * Parses a row against the current field list, then buffers it (callback
 * mode) or emits it as a 'result' event.
 *
 * @param {Object} packet
 * @param {Object} parser
 * @param {Object} connection
 */
Query.prototype['RowDataPacket'] = function(packet, parser, connection) {
  packet.parse(parser, this._resultSet.fieldPackets, this.typeCast, this.nestTables, connection);

  if (this._callback) {
    this._resultSet.rows.push(packet);
  } else {
    this.emit('result', packet, this._index);
  }
};

/**
 * Streams a local file as LocalDataFilePacket chunks, followed by a single
 * EmptyPacket that terminates the transfer. A read error is remembered in
 * `_loadError` and the transfer is terminated in the same way.
 *
 * @param {string} path File to read.
 */
Query.prototype._sendLocalDataFile = function(path) {
  var self        = this;
  var terminated  = false;
  var localStream = fs.createReadStream(path, {
    flags     : 'r', // the option is named `flags`; a `flag` key is not a documented option
    encoding  : null,
    autoClose : true
  });

  // Sends the terminating empty packet exactly once, whether the file was
  // read to the end or reading failed.
  function sendTerminator() {
    if (terminated) {
      return;
    }

    terminated = true;
    self.emit('packet', new Packets.EmptyPacket());
  }

  this.on('pause', function () {
    localStream.pause();
  });

  this.on('resume', function () {
    localStream.resume();
  });

  localStream.on('data', function (data) {
    self.emit('packet', new Packets.LocalDataFilePacket(data));
  });

  localStream.on('error', function (err) {
    self._loadError = err;
    sendTerminator();
  });

  localStream.on('end', sendTerminator);
};

/**
 * Exposes the query as an object-mode Readable stream of rows.
 *
 * The stream pauses the connection when its buffer is full and resumes it
 * when more data is requested. 'result' and 'fields' events are re-emitted
 * on the stream, and 'close' is emitted on the tick after 'end'.
 *
 * @param {Object} [options] Readable options; `objectMode` is always forced
 *                           to `true`. The passed object is not modified.
 * @returns {Readable}
 */
Query.prototype.stream = function(options) {
  var self          = this;
  var source        = options || {};
  var streamOptions = {};
  var key;

  // Copy instead of writing objectMode onto the caller's object. Inherited
  // enumerable properties are copied too, so Readable sees the same values it
  // would have read from the original object.
  for (key in source) {
    streamOptions[key] = source[key];
  }
  streamOptions.objectMode = true;

  var stream = new Readable(streamOptions);

  stream._read = function() {
    self._connection && self._connection.resume();
  };

  stream.once('end', function() {
    process.nextTick(function () {
      stream.emit('close');
    });
  });

  this.on('result', function(row, i) {
    if (!stream.push(row)) self._connection.pause();
    stream.emit('result', row, i);  // replicate old emitter
  });

  this.on('error', function(err) {
    stream.emit('error', err);  // Pass on any errors
  });

  this.on('end', function() {
    stream.push(null);  // pushing null, indicating EOF
  });

  this.on('fields', function(fields, i) {
    stream.emit('fields', fields, i);  // replicate old emitter
  });

  return stream;
};