diff --git a/README.markdown b/README.markdown index b56abc3..7e2bf5a 100644 --- a/README.markdown +++ b/README.markdown @@ -5,7 +5,7 @@ Usage: var limestone = require("./limestone").SphinxClient(), sys = require("sys"); - limestone.connect(9312, // port. 9312 is standard Sphinx port + limestone.connect(9312, // port. 9312 is standard Sphinx port. also 'host:port' allowed function(err) { // callback if (err) { sys.puts('Connection error: ' + err); @@ -77,3 +77,20 @@ You can ask sphinx to open a persistent connection. You can then make several re } ); }); + +Limestone is queueing now: +You can safely call limestone.query or limestone.build_excerpts methods outside the scope of the callback functions, provided the connection is made persistent. Limestone will enqueue the sphinx commands and run them sequentially. + +This works: + + limestone.connect(9312, // port. 9312 is standard Sphinx port + function(err) { // callback + ... + limestone.query( + {'query':'test', maxmatches:1}, + function(err, answer) { + .... + }); + }); + + limestone.query({'second query':'test'}, function(err, answer){..}); // won't crash with previous diff --git a/limestone.js b/limestone.js index c8b2e43..ee32877 100644 --- a/limestone.js +++ b/limestone.js @@ -100,32 +100,39 @@ exports.SphinxClient = function() { self.Sphinx = Sphinx; - var server_conn; - var connection_status; + var server_conn = null; var response_output; - var conn_in_progress = 0; + var _connected = false; + var _queue = []; + var _persistent = false; - var search_commands = []; // Connect to Sphinx server - self.connect = function(port, persistent, callback) { + self.connect = function() { - // arguments: port, persistent, callback. + // arguments: ([host:port], [persistent], callback). var args = Array.prototype.slice.call(arguments); var callback = args.pop(); - var port = args.length ? args.shift(): Sphinx.port; - var persistent = args.length ? args.shift() : false; + var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port; + var persistent = _persistent = args.length ? args.shift() : false; - // very ugly method of making sure no attempt to connection is made until all previous are done - if(conn_in_progress == 1){ - setTimeout(self.connect,10,port,callback); - return; - } - - server_conn = tcp.createConnection(port); - conn_in_progress = 1; + if(hostport.indexOf(':')==-1){ + hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport; + } + hostport = hostport.split(':'); + + var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ; + var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port; + + + server_conn = tcp.createConnection(port, host); + server_conn.on('error', function(x){ + console.log('Error: '+x); + server_conn.end(); + callback(x); + }); // disable Nagle algorithm server_conn.setNoDelay(true); //server_conn.setEncoding('binary'); @@ -151,13 +158,27 @@ exports.SphinxClient = function() { var protocol_version_raw = data.toReader(); var protocol_version = protocol_version_raw.int32(); + // if there still data? process and callback + if(!protocol_version_raw.empty()) { + status_code = protocol_version_raw.int16(); + version = protocol_version_raw.int16(); + server_message = protocol_version_raw.lstring(); + if(status_code == Sphinx.statusCode.ERROR){ + errmsg = 'Server issued ERROR: '+server_message; + } + if(status_code == Sphinx.statusCode.RETRY){ + errmsg = 'Server issued RETRY: '+server_message; + } + if(errmsg){ + callback(new Error(errmsg)); + } + }// if !protocol_version_raw.empty() var data_unpacked = {'': protocol_version}; if (data_unpacked[""] >= 1) { - // Simple connection status indicator - connection_status = 1; + //all ok, send my version server_conn.write(version_number.toBuffer()); if(persistent){ @@ -170,6 +191,8 @@ exports.SphinxClient = function() { } server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); // Use callback // promise.emitSuccess(); @@ -177,20 +200,14 @@ exports.SphinxClient = function() { } else { callback(new Error('Wrong protocol version: ' + protocol_version)); - conn_in_progress = 0; server_conn.end(); } }); - server_conn.on('error', function(exp) { - console.log('Error: ' + exp); - server_conn.end(); - conn_in_progress = 0; - }); } else { callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); server_conn.end(); - conn_in_progress = 0; + _connected = false; } }); @@ -201,7 +218,6 @@ exports.SphinxClient = function() { self.query = function(query_raw, callback) { var query = new Object(); - initResponseOutput(callback); // Default query parameters var query_parameters = { @@ -254,11 +270,6 @@ exports.SphinxClient = function() { query = query_raw.toString(); } - /* if (connection_status != 1) { - console.log("You must connect to server before issuing queries"); - return false; - - } */ var request = Buffer.makeWriter(); request.push.int16(Sphinx.command.SEARCH); @@ -397,16 +408,13 @@ exports.SphinxClient = function() { req_length.toBuffer().copy(request_buf, 4, 0); //console.log('Sending search request of ' + request_buf.length + ' bytes'); + _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); - server_conn.write(request_buf); - // we also add the command to the search_commands queue - search_commands.push(Sphinx.clientCommand.SEARCH); }; self.build_excerpts = function(docs, index, words, passage_opts_raw, callback){ var passage_opts = new Object(); - initResponseOutput(callback); var passage_parameters = { before_match : '', @@ -490,21 +498,53 @@ exports.SphinxClient = function() { //console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); - server_conn.write(request_buf); - // we also add the command to the search_commands queue - search_commands.push(Sphinx.clientCommand.EXCERPT); + _enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT); }; // build_excerpts self.disconnect = function() { - conn_in_progress = 0; server_conn.end(); }; + function _enqueue(req_buf , cb, sc) { + if(!server_conn || !server_conn.writable){ + cb(new Error("Trying to enqueue. Not connected")); + } + _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); + if(_queue.length === 1) + { + if(_connected) { + initResponseOutput(cb); + server_conn.write(req_buf); + } else { + server_conn.once('sphinx.connected', function(){ + initResponseOutput(cb); + server_conn.write(req_buf); + }); + } + } + } + + function _dequeue() { + _queue.shift(); + if(!_queue.length){ + return; + } + if(!_persistent){ + server_conn = null; + return; + } + if(!server_conn){ + throw new Error("Trying to dequeue. Not connected"); + } + // we run the next server request in line + initResponseOutput(_queue[0]['callback']); + server_conn.write(_queue[0]['request_buffer']); + } + function readResponseData(data) { // Got response! - // Command must match the one used in query response_output.append(data); - response_output.runCallbackIfDone(search_commands.shift()); + response_output.runCallbackIfDone(_queue[0]['search_command']); } function initResponseOutput(query_callback) { @@ -559,14 +599,21 @@ exports.SphinxClient = function() { if (this.status == Sphinx.statusCode.ERROR) { errmsg += "Server issued ERROR: " + this.data; } + if (this.status == Sphinx.statusCode.RETRY){ + errmsg += "Server issued RETRY: " + this.data; + }// if RETRY return errmsg; }, runCallbackIfDone : function(search_command) { if (this.done()) { var answer; + var cloned = new Buffer(response_output.data.length); + // clone the response data, so we can dequeue and let the server free to modify again response_output + response_output.data.copy(cloned); + _dequeue(); var errmsg = this.checkResponse(search_command); if (!errmsg) { - answer = parseResponse(response_output.data, search_command); + answer = parseResponse(cloned, search_command); } query_callback(errmsg, answer); }