diff --git a/limestone.js b/limestone.js index cb21e2d..b4804dc 100644 --- a/limestone.js +++ b/limestone.js @@ -95,7 +95,7 @@ exports.SphinxClient = function() { "FLOAT": 5, "BIGINT": 6, "STRING": 7, - "MULTI": 0x40000000 + "MULTI": 0x40000000 }; self.Sphinx = Sphinx; @@ -128,11 +128,19 @@ exports.SphinxClient = function() { server_conn = tcp.createConnection(port, host); - server_conn.on('error', function(x){ - console.log('Error: '+x); - server_conn.end(); - callback(x); - }); + server_conn.on('error', function(x){ + console.log('Error: '+x); + server_conn.end(); + callback(x); + }); + + server_conn.on("close", function(x){ + if (x) { + console.log('closed'); + callback(new Error("The socket has closed due to an unknown error")); + } + }); + // disable Nagle algorithm server_conn.setNoDelay(true); //server_conn.setEncoding('binary'); @@ -140,10 +148,6 @@ exports.SphinxClient = function() { response_output = null; //var promise = new process.Promise(); - - server_conn.addListener('error', function(e) { - callback(e); - }); server_conn.addListener('connect', function () { @@ -152,8 +156,8 @@ exports.SphinxClient = function() { // console.log('Sending version number...'); // Here we must send 4 bytes, '0x00000001' if (server_conn.readyState == 'open') { - var version_number = Buffer.makeWriter(); - version_number.push.int32(1); + var version_number = Buffer.makeWriter(); + version_number.push.int32(1); // Waiting for answer server_conn.once('data', function(data) { /*if (response_output) { @@ -181,27 +185,32 @@ exports.SphinxClient = function() { if (data_unpacked[""] >= 1) { + if (persistent){ + server_conn.once('drain', function(){ + var pers_req = Buffer.makeWriter(); + pers_req.push.int16(Sphinx.command.PERSIST); + pers_req.push.int16(0); + pers_req.push.int32(4); + pers_req.push.int32(1); + server_conn.write(pers_req.toBuffer()); + server_conn.once('drain', function(){ + server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); + callback(null); + }); + }); + } else { + server_conn.once('drain', function(){ + server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); + callback(null); + }); + } //all ok, send my version - server_conn.write(version_number.toBuffer()); - - if(persistent){ - var pers_req = Buffer.makeWriter(); - pers_req.push.int16(Sphinx.command.PERSIST); - pers_req.push.int16(0); - pers_req.push.int32(4); - pers_req.push.int32(1); - server_conn.write(pers_req.toBuffer()); - } - - server_conn.on('data', readResponseData); - _connected = true; - server_conn.emit('sphinx.connected'); - - // Use callback - // promise.emitSuccess(); - callback(null); - + server_conn.write(version_number.toBuffer()); } else { callback(new Error('Wrong protocol version: ' + protocol_version)); server_conn.end(); @@ -265,7 +274,7 @@ exports.SphinxClient = function() { if (query_raw.query) { for (x in query_parameters) { if (query_raw.hasOwnProperty(x)) { - query[x] = query_raw[x]; + query[x] = query_raw[x]; } else { query[x] = query_parameters[x]; } @@ -274,7 +283,6 @@ exports.SphinxClient = function() { query = query_raw.toString(); } - var request = Buffer.makeWriter(); request.push.int16(Sphinx.command.SEARCH); request.push.int16(Sphinx.clientCommand.SEARCH); @@ -292,9 +300,9 @@ exports.SphinxClient = function() { request.push.int32(query.sort); - request.push.lstring(query.sortby); + request.push.lstring(query.sortby); request.push.lstring(query.query); // Query text - request.push.int32(query.weights.length); + request.push.int32(query.weights.length); for (var weight in query.weights) { request.push.int32(parseInt(weight)); } @@ -306,10 +314,10 @@ exports.SphinxClient = function() { //request.push.int32(0); request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers //request.push.int32(0); // However, there is a caveat about using 64-bit ids - request.push.int64(0, query.max_id); + request.push.int64(0, query.max_id); //console.log('Found ' + query.filters.length + ' filters'); - request.push.int32(query.filters.length); + request.push.int32(query.filters.length); for (var filter_id in query.filters) { var filter = query.filters[filter_id]; //console.log('Found filter of type ' + filter.type) @@ -343,7 +351,7 @@ exports.SphinxClient = function() { } request.push.int32(filter.exclude); } - + request.push.int32(query_parameters.groupfunc); request.push.lstring(query_parameters.groupby); // Groupby length @@ -355,7 +363,7 @@ exports.SphinxClient = function() { request.push.int32(query_parameters.retrycount); // Retrycount request.push.int32(query_parameters.retrydelay); // Retrydelay - request.push.lstring(query_parameters.groupdistinct); // Group distinct + request.push.lstring(query_parameters.groupdistinct); // Group distinct if (query_parameters.anchor.length == 0) { request.push.int32(0); // no anchor given @@ -411,7 +419,7 @@ exports.SphinxClient = function() { req_length.push.int32(request_buf.length - 8); req_length.toBuffer().copy(request_buf, 4, 0); - //console.log('Sending search request of ' + request_buf.length + ' bytes'); + //console.log('Sending search request of ' + request_buf.length + ' bytes '); _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; @@ -501,7 +509,6 @@ exports.SphinxClient = function() { req_length.toBuffer().copy(request_buf,4,0); //console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); - _enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT); }; // build_excerpts @@ -510,45 +517,48 @@ exports.SphinxClient = function() { }; function _enqueue(req_buf , cb, sc) { - if(!server_conn || !server_conn.writable){ - cb(new Error("Trying to enqueue. Not connected")); - } - _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); - if(_queue.length === 1) - { - if(_connected) { - initResponseOutput(cb); - server_conn.write(req_buf); - } else { - server_conn.once('sphinx.connected', function(){ - initResponseOutput(cb); - server_conn.write(req_buf); - }); + if(!server_conn || !server_conn.writable){ + cb(new Error("Trying to enqueue. Not connected")); + return; + } + _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); + if (_queue.length === 1) { + if (_connected) { + initResponseOutput(cb); + server_conn.write(req_buf); + } else { + server_conn.once('sphinx.connected', function(){ + initResponseOutput(cb); + server_conn.write(req_buf); + }); + } } - } } function _dequeue() { - _queue.shift(); - if(!_queue.length){ - return; - } - if(!_persistent){ - server_conn = null; - return; - } - if(!server_conn){ - throw new Error("Trying to dequeue. Not connected"); - } - // we run the next server request in line - initResponseOutput(_queue[0]['callback']); - server_conn.write(_queue[0]['request_buffer']); + _queue.shift(); + if (!_queue.length) { + return; + } + + if(!_persistent){ + server_conn = null; + return; + } + + if(!server_conn){ + throw new Error("Trying to dequeue. Not connected"); + } + + // we run the next server request in line + initResponseOutput(_queue[0]['callback']); + server_conn.write(_queue[0]['request_buffer']); } function readResponseData(data) { // Got response! response_output.append(data); - response_output.runCallbackIfDone(_queue[0]['search_command']); + response_output.runCallbackIfDone(_queue[0]['search_command']); } function initResponseOutput(query_callback) {