From e039051906d8bd206caf651cd2d7f64589db46c0 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Fri, 2 Dec 2011 07:41:58 -0500 Subject: [PATCH] when sending protocol version and "persistent" flag, wait for the drain events to avoid collision. Only works in node 0.6 --- limestone.js | 56 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/limestone.js b/limestone.js index ceec0e0..ebf3c7e 100644 --- a/limestone.js +++ b/limestone.js @@ -121,13 +121,21 @@ exports.SphinxClient = function() { server_conn.end(); callback(x); }); + + server_conn.on("close", function(x){ + if(x){ + console.log('closed'); + callback(new Error("The socket has closed due to an unknown error")); + } + }); + + // disable Nagle algorithm server_conn.setNoDelay(true); server_conn.addListener('connect', function () { // Sending protocol version - // Here we must send 4 bytes, '0x00000001' if (server_conn.readyState == 'open') { var version_number = Buffer.makeWriter(); version_number.push.int32(1); @@ -136,6 +144,7 @@ exports.SphinxClient = function() { var protocol_version_raw = data.toReader(); var protocol_version = protocol_version_raw.int32(); // if there still data? process and callback + // if sphinx has reached its max num of children, it sends retry. if(!protocol_version_raw.empty()) { status_code = protocol_version_raw.int16(); version = protocol_version_raw.int16(); @@ -148,29 +157,39 @@ exports.SphinxClient = function() { } if(errmsg){ callback(new Error(errmsg)); + return; } } - + var data_unpacked = {'': protocol_version}; if (data_unpacked[""] >= 1) { - //all ok, send my version - server_conn.write(version_number.toBuffer()); if(persistent){ - var pers_req = Buffer.makeWriter(); - pers_req.push.int16(Sphinx.command.PERSIST); - pers_req.push.int16(0); - pers_req.push.int32(4); - pers_req.push.int32(1); - server_conn.write(pers_req.toBuffer()); + server_conn.once('drain', function(){ + var pers_req = Buffer.makeWriter(); + pers_req.push.int16(Sphinx.command.PERSIST); + pers_req.push.int16(0); + pers_req.push.int32(4); + pers_req.push.int32(1); + server_conn.write(pers_req.toBuffer()); + server_conn.once('drain', function(){ + server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); + callback(null); + }); + }); + } else { + server_conn.once('drain', function(){ + server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); + callback(null); + }); } - server_conn.on('data', readResponseData); - _connected = true; - server_conn.emit('sphinx.connected'); - - // Use callback - callback(null); - + + //all ok, send my version + server_conn.write(version_number.toBuffer()); } else { callback(new Error('Wrong protocol version: ' + protocol_version)); server_conn.end(); @@ -458,13 +477,14 @@ exports.SphinxClient = function() { function _enqueue(req_buf , cb, sc) { if(!server_conn || !server_conn.writable){ cb(new Error("Trying to enqueue. Not connected")); + return; } _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); if(_queue.length === 1) { if(_connected) { initResponseOutput(cb); - server_conn.write(req_buf); + server_conn.write(req_buf); } else { server_conn.once('sphinx.connected', function(){ initResponseOutput(cb);