Skip to content

Commit

Permalink
Added remote host connect support
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeZ99 committed Jan 11, 2012
1 parent f0efcc0 commit 20b1a87
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions limestone.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ var tcp = require('net');

exports.SphinxClient = function() {
var self = { };

var buffer_extras = require('./buffer_extras');

var Sphinx = {
Expand Down Expand Up @@ -86,7 +86,7 @@ exports.SphinxClient = function() {
"RANGE" : 1,
"FLOATRANGE" : 2
};

Sphinx.attribute = {
"INTEGER": 1,
"TIMESTAMP": 2,
Expand All @@ -95,7 +95,7 @@ exports.SphinxClient = function() {
"FLOAT": 5,
"BIGINT": 6,
"STRING": 7,
"MULTI": 0x40000000
"MULTI": 0x40000000
};

var server_conn = null;
Expand All @@ -104,17 +104,28 @@ exports.SphinxClient = function() {
var _queue = [];
var _persistent = false;



// Connect to Sphinx server
self.connect = function() {

// arguments: (port, [persistent], callback).
// arguments: ([host:port], [persistent], callback).
var args = Array.prototype.slice.call(arguments);
var callback = args.pop();
var port = args.length ? args.shift(): Sphinx.port;
var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port;
if(hostport.indexOf(':')==-1){
hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport;
}
hostport = hostport.split(':');

var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ;
var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port;



var persistent = _persistent = args.length ? args.shift() : false;
server_conn = tcp.createConnection(port);
console.log('connecting to : '+host+':'+port);
server_conn = tcp.createConnection(port, host);
server_conn.on('error', function(x){
console.log('Error: '+x);
server_conn.end();
Expand All @@ -123,7 +134,7 @@ exports.SphinxClient = function() {
// disable Nagle algorithm
server_conn.setNoDelay(true);

server_conn.addListener('connect',
server_conn.addListener('connect',
function () {
// Sending protocol version
// Here we must send 4 bytes, '0x00000001'
Expand Down Expand Up @@ -154,7 +165,7 @@ exports.SphinxClient = function() {
if (data_unpacked[""] >= 1) {
//all ok, send my version
server_conn.write(version_number.toBuffer());

if(persistent){
var pers_req = Buffer.makeWriter();
pers_req.push.int16(Sphinx.command.PERSIST);
Expand All @@ -166,15 +177,15 @@ exports.SphinxClient = function() {
server_conn.on('data', readResponseData);
_connected = true;
server_conn.emit('sphinx.connected');

// Use callback
callback(null);

} else {
callback(new Error('Wrong protocol version: ' + protocol_version));
server_conn.end();
}

});
} else {
callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect'));
Expand Down Expand Up @@ -218,7 +229,7 @@ exports.SphinxClient = function() {
error : "", // per-reply fields (for single-query case)
warning : "",
connerror : false,

reqs : [], // requests storage (for multi-query case)
mbenc : "",
arrayresult : true,
Expand All @@ -228,7 +239,7 @@ exports.SphinxClient = function() {
if (query_raw.query) {
for (x in query_parameters) {
if (query_raw.hasOwnProperty(x)) {
query[x] = query_raw[x];
query[x] = query_raw[x];
} else {
query[x] = query_parameters[x];
}
Expand All @@ -237,38 +248,38 @@ exports.SphinxClient = function() {
query = query_raw.toString();
}

var request = Buffer.makeWriter();
var request = Buffer.makeWriter();
request.push.int16(Sphinx.command.SEARCH);
request.push.int16(Sphinx.clientCommand.SEARCH);

request.push.int32(0); // This will be request length
request.push.int32(0);
request.push.int32(1);

request.push.int32(query.offset);

request.push.int32(query.limit);

request.push.int32(query.mode);
request.push.int32(query.ranker);

request.push.int32(query.sort);
request.push.lstring(query.sortby);

request.push.lstring(query.sortby);
request.push.lstring(query.query); // Query text
request.push.int32(query.weights.length);
request.push.int32(query.weights.length);
for (var weight in query.weights) {
request.push.int32(parseInt(weight));
}

request.push.lstring(query.indexes); // Indexes
request.push.lstring(query.indexes); // Indexes

request.push.int32(1); // id64 range marker

request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers
request.push.int64(0, query.max_id);
request.push.int64(0, query.max_id);

request.push.int32(query.filters.length);
request.push.int32(query.filters.length);
for (var filter in query.filters) {
request.push.int32(filter.attr.length);
request.push_lstring(filter.attr);
Expand All @@ -290,7 +301,7 @@ exports.SphinxClient = function() {
break;
}
}

request.push.int32(query_parameters.groupfunc);
request.push.lstring(query_parameters.groupby); // Groupby length

Expand Down Expand Up @@ -320,19 +331,19 @@ exports.SphinxClient = function() {
request.push.int32(query_parameters.indexweights[i]);
}

request.push.int32(query_parameters.maxquerytime);
request.push.int32(query_parameters.maxquerytime);
// per-field weights (preferred method)
request.push.int32(Object.keys(query.fieldweights).length);
for (var field_name in query.fieldweights) {
request.push.lstring(field_name);
request.push.int32(query.fieldweights[field_name]);
}

request.push.lstring(query_parameters.comment);
request.push.lstring(query_parameters.comment);

request.push.int32(query_parameters.overrides.length);
for (var i in query_parameters.overrides) {
request.push.lstring(query_parameters.overrides[i].attr);
request.push.lstring(query_parameters.overrides[i].attr);
request.push.int32(query_parameters.overrides[i].type);
request.push.int32(query_parameters.overrides[i].values.length);
for (var id in query_parameters.overrides[i].values) {
Expand All @@ -358,8 +369,8 @@ exports.SphinxClient = function() {
req_length.push.int32(request_buf.length - 8);
req_length.toBuffer().copy(request_buf, 4, 0);

console.log('Sending search request of ' + request_buf.length + ' bytes ');
_enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH);
console.log('Sending search request of ' + request_buf.length + ' bytes ');
_enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH);

};

Expand Down Expand Up @@ -399,7 +410,7 @@ exports.SphinxClient = function() {
'allow_empty' : 256,
'emit_zones' : 256
};

for (x in flag_properties) {
if (passage_opts_raw.hasOwnProperty(x)) {
flags |= flag_properties[x];
Expand All @@ -412,7 +423,7 @@ exports.SphinxClient = function() {
request.push.int16(Sphinx.command.EXCERPT);
request.push.int16(Sphinx.clientCommand.EXCERPT);
request.push.int32(0); // This will be request length

// request 'body' (flags, options, docs)

request.push.int32(0);
Expand All @@ -422,7 +433,7 @@ exports.SphinxClient = function() {
request.push.lstring(index);

request.push.lstring(words);

// options
request.push.lstring(passage_opts.before_match);
request.push.lstring(passage_opts.after_match);
Expand Down Expand Up @@ -470,7 +481,7 @@ exports.SphinxClient = function() {
server_conn.write(req_buf);
});
}
}
}
}

function _dequeue() {
Expand Down Expand Up @@ -544,7 +555,7 @@ exports.SphinxClient = function() {
if (this.status == Sphinx.statusCode.RETRY){
errmsg += "Server issued RETRY: " + this.data;
}

return errmsg;
},
runCallbackIfDone : function(search_command) {
Expand Down Expand Up @@ -678,7 +689,7 @@ exports.SphinxClient = function() {
output.words[word]["docs"] = response.int32();
output.words[word]["hits"] = response.int32();
}

return output;
};

Expand Down

0 comments on commit 20b1a87

Please sign in to comment.