Skip to content

Commit

Permalink
Merge branch 'JoeZ99-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
kurokikaze committed Feb 15, 2012
2 parents db82b47 + 414fcd4 commit 6780b7c
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 43 deletions.
19 changes: 18 additions & 1 deletion README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Usage:
var limestone = require("./limestone").SphinxClient(),
sys = require("sys");

limestone.connect(9312, // port. 9312 is standard Sphinx port
limestone.connect(9312, // port. 9312 is standard Sphinx port. also 'host:port' allowed
function(err) { // callback
if (err) {
sys.puts('Connection error: ' + err);
Expand Down Expand Up @@ -77,3 +77,20 @@ You can ask sphinx to open a persistent connection. You can then make several re
}
);
});

Limestone is queueing now:
You can safely call limestone.query or limestone.build_excerpts methods outside the scope of the callback functions, provided the connection is made persistent. Limestone will enqueue the sphinx commands and run them sequentially.

This works:

limestone.connect(9312, // port. 9312 is standard Sphinx port
function(err) { // callback
...
limestone.query(
{'query':'test', maxmatches:1},
function(err, answer) {
....
});
});

limestone.query({'second query':'test'}, function(err, answer){..}); // won't crash with previous
131 changes: 89 additions & 42 deletions limestone.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,32 +100,39 @@ exports.SphinxClient = function() {

self.Sphinx = Sphinx;

var server_conn;
var connection_status;
var server_conn = null;
var response_output;
var conn_in_progress = 0;
var _connected = false;
var _queue = [];
var _persistent = false;

var search_commands = [];

// Connect to Sphinx server
self.connect = function(port, persistent, callback) {
self.connect = function() {

// arguments: port, persistent, callback.
// arguments: ([host:port], [persistent], callback).
var args = Array.prototype.slice.call(arguments);

var callback = args.pop();
var port = args.length ? args.shift(): Sphinx.port;
var persistent = args.length ? args.shift() : false;
var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port;
var persistent = _persistent = args.length ? args.shift() : false;


// very ugly method of making sure no attempt to connection is made until all previous are done
if(conn_in_progress == 1){
setTimeout(self.connect,10,port,callback);
return;
}

server_conn = tcp.createConnection(port);
conn_in_progress = 1;
if(hostport.indexOf(':')==-1){
hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport;
}
hostport = hostport.split(':');

var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ;
var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port;


server_conn = tcp.createConnection(port, host);
server_conn.on('error', function(x){
console.log('Error: '+x);
server_conn.end();
callback(x);
});
// disable Nagle algorithm
server_conn.setNoDelay(true);
//server_conn.setEncoding('binary');
Expand All @@ -151,13 +158,27 @@ exports.SphinxClient = function() {

var protocol_version_raw = data.toReader();
var protocol_version = protocol_version_raw.int32();
// if there still data? process and callback
if(!protocol_version_raw.empty()) {
status_code = protocol_version_raw.int16();
version = protocol_version_raw.int16();
server_message = protocol_version_raw.lstring();
if(status_code == Sphinx.statusCode.ERROR){
errmsg = 'Server issued ERROR: '+server_message;
}
if(status_code == Sphinx.statusCode.RETRY){
errmsg = 'Server issued RETRY: '+server_message;
}
if(errmsg){
callback(new Error(errmsg));
}
}// if !protocol_version_raw.empty()
var data_unpacked = {'': protocol_version};

if (data_unpacked[""] >= 1) {

// Simple connection status indicator
connection_status = 1;

//all ok, send my version
server_conn.write(version_number.toBuffer());

if(persistent){
Expand All @@ -170,27 +191,23 @@ exports.SphinxClient = function() {
}

server_conn.on('data', readResponseData);
_connected = true;
server_conn.emit('sphinx.connected');

// Use callback
// promise.emitSuccess();
callback(null);

} else {
callback(new Error('Wrong protocol version: ' + protocol_version));
conn_in_progress = 0;
server_conn.end();
}

});
server_conn.on('error', function(exp) {
console.log('Error: ' + exp);
server_conn.end();
conn_in_progress = 0;
});
} else {
callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect'));
server_conn.end();
conn_in_progress = 0;
_connected = false;
}
});

Expand All @@ -201,7 +218,6 @@ exports.SphinxClient = function() {
self.query = function(query_raw, callback) {
var query = new Object();

initResponseOutput(callback);

// Default query parameters
var query_parameters = {
Expand Down Expand Up @@ -254,11 +270,6 @@ exports.SphinxClient = function() {
query = query_raw.toString();
}

/* if (connection_status != 1) {
console.log("You must connect to server before issuing queries");
return false;
} */

var request = Buffer.makeWriter();
request.push.int16(Sphinx.command.SEARCH);
Expand Down Expand Up @@ -397,16 +408,13 @@ exports.SphinxClient = function() {
req_length.toBuffer().copy(request_buf, 4, 0);

//console.log('Sending search request of ' + request_buf.length + ' bytes');
_enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH);

server_conn.write(request_buf);
// we also add the command to the search_commands queue
search_commands.push(Sphinx.clientCommand.SEARCH);
};

self.build_excerpts = function(docs, index, words, passage_opts_raw, callback){
var passage_opts = new Object();

initResponseOutput(callback);

var passage_parameters = {
before_match : '<b>',
Expand Down Expand Up @@ -490,21 +498,53 @@ exports.SphinxClient = function() {

//console.log('Sending build excerpt request of ' + request_buf.length + 'bytes');

server_conn.write(request_buf);
// we also add the command to the search_commands queue
search_commands.push(Sphinx.clientCommand.EXCERPT);
_enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT);
}; // build_excerpts

self.disconnect = function() {
conn_in_progress = 0;
server_conn.end();
};

function _enqueue(req_buf , cb, sc) {
if(!server_conn || !server_conn.writable){
cb(new Error("Trying to enqueue. Not connected"));
}
_queue.push({request_buffer: req_buf, callback: cb, search_command: sc});
if(_queue.length === 1)
{
if(_connected) {
initResponseOutput(cb);
server_conn.write(req_buf);
} else {
server_conn.once('sphinx.connected', function(){
initResponseOutput(cb);
server_conn.write(req_buf);
});
}
}
}

function _dequeue() {
_queue.shift();
if(!_queue.length){
return;
}
if(!_persistent){
server_conn = null;
return;
}
if(!server_conn){
throw new Error("Trying to dequeue. Not connected");
}
// we run the next server request in line
initResponseOutput(_queue[0]['callback']);
server_conn.write(_queue[0]['request_buffer']);
}

function readResponseData(data) {
// Got response!
// Command must match the one used in query
response_output.append(data);
response_output.runCallbackIfDone(search_commands.shift());
response_output.runCallbackIfDone(_queue[0]['search_command']);
}

function initResponseOutput(query_callback) {
Expand Down Expand Up @@ -559,14 +599,21 @@ exports.SphinxClient = function() {
if (this.status == Sphinx.statusCode.ERROR) {
errmsg += "Server issued ERROR: " + this.data;
}
if (this.status == Sphinx.statusCode.RETRY){
errmsg += "Server issued RETRY: " + this.data;
}// if RETRY
return errmsg;
},
runCallbackIfDone : function(search_command) {
if (this.done()) {
var answer;
var cloned = new Buffer(response_output.data.length);
// clone the response data, so we can dequeue and let the server free to modify again response_output
response_output.data.copy(cloned);
_dequeue();
var errmsg = this.checkResponse(search_command);
if (!errmsg) {
answer = parseResponse(response_output.data, search_command);
answer = parseResponse(cloned, search_command);
}
query_callback(errmsg, answer);
}
Expand Down

0 comments on commit 6780b7c

Please sign in to comment.