From ab879fa678ff4f046506c3396ca0c2645a2e2f48 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Mon, 21 Nov 2011 21:38:00 -0500 Subject: [PATCH 01/14] Queueing added to limestone. 'query' and 'buildexcerpts' calls are queued and run sequentially to ensure no other command is executed while the acutual command is being run. --- limestone.js | 932 +++++++++++++++++++++++++-------------------------- 1 file changed, 456 insertions(+), 476 deletions(-) diff --git a/limestone.js b/limestone.js index a0458c6..f42733b 100644 --- a/limestone.js +++ b/limestone.js @@ -2,206 +2,174 @@ var tcp = require('net'); exports.SphinxClient = function() { var self = { }; - + var buffer_extras = require('./buffer_extras'); var Sphinx = { - port : 9312 + port : 9312 }; // All search modes Sphinx.searchMode = { - "ALL":0, - "ANY":1, - "PHRASE":2, - "BOOLEAN":3, - "EXTENDED":4, - "FULLSCAN":5, - "EXTENDED2":6 // extended engine V2 (TEMPORARY, WILL BE REMOVED) + "ALL":0, + "ANY":1, + "PHRASE":2, + "BOOLEAN":3, + "EXTENDED":4, + "FULLSCAN":5, + "EXTENDED2":6 // extended engine V2 (TEMPORARY, WILL BE REMOVED) }; // All ranking modes Sphinx.rankingMode = { - "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one - "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality) - "NONE" : 2, ///< no ranking, all matches get a weight of 1 - "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts - "PROXIMITY" : 4, - "MATCHANY" : 5, - "FIELDMASK" : 6, - "SPH04" : 7, - "TOTAL" : 8 + "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one + "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality) + "NONE" : 2, ///< no ranking, all matches get a weight of 1 + "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts + "PROXIMITY" : 4, + "MATCHANY" : 5, + "FIELDMASK" : 6, + "SPH04" : 7, + 
"TOTAL" : 8 }; Sphinx.sortMode = { - "RELEVANCE" : 0, - "ATTR_DESC" : 1, - "ATTR_ASC" : 2, - "TIME_SEGMENTS" : 3, - "EXTENDED" : 4, - "EXPR" : 5 + "RELEVANCE" : 0, + "ATTR_DESC" : 1, + "ATTR_ASC" : 2, + "TIME_SEGMENTS" : 3, + "EXTENDED" : 4, + "EXPR" : 5 }; Sphinx.groupFunc = { - "DAY" : 0, - "WEEK" : 1, - "MONTH" : 2, - "YEAR" : 3, - "ATTR" : 4, - "ATTRPAIR" : 5 + "DAY" : 0, + "WEEK" : 1, + "MONTH" : 2, + "YEAR" : 3, + "ATTR" : 4, + "ATTRPAIR" : 5 }; // Commands Sphinx.command = { - "SEARCH" : 0, - "EXCERPT" : 1, - "UPDATE" : 2, - "KEYWORDS" : 3, - "PERSIST" : 4, - "STATUS" : 5, - "QUERY" : 6, - "FLUSHATTRS" : 7 + "SEARCH" : 0, + "EXCERPT" : 1, + "UPDATE" : 2, + "KEYWORDS" : 3, + "PERSIST" : 4, + "STATUS" : 5, + "QUERY" : 6, + "FLUSHATTRS" : 7 }; // Current version client commands Sphinx.clientCommand = { - "SEARCH" : 0x118, - "EXCERPT" : 0x103, - "UPDATE" : 0x102, - "KEYWORDS" : 0x100, - "STATUS" : 0x100, - "QUERY" : 0x100, - "FLUSHATTRS": 0x100 + "SEARCH" : 0x118, + "EXCERPT" : 0x103, + "UPDATE" : 0x102, + "KEYWORDS" : 0x100, + "STATUS" : 0x100, + "QUERY" : 0x100, + "FLUSHATTRS": 0x100 }; Sphinx.statusCode = { - "OK": 0, - "ERROR": 1, - "RETRY": 2, - "WARNING": 3 + "OK": 0, + "ERROR": 1, + "RETRY": 2, + "WARNING": 3 }; Sphinx.filterTypes = { - "VALUES" : 0, - "RANGE" : 1, - "FLOATRANGE" : 2 + "VALUES" : 0, + "RANGE" : 1, + "FLOATRANGE" : 2 }; - + Sphinx.attribute = { - "INTEGER": 1, - "TIMESTAMP": 2, - "ORDINAL": 3, - "BOOL": 4, - "FLOAT": 5, - "BIGINT": 6, - "STRING": 7, - "MULTI": 0x40000000 + "INTEGER": 1, + "TIMESTAMP": 2, + "ORDINAL": 3, + "BOOL": 4, + "FLOAT": 5, + "BIGINT": 6, + "STRING": 7, + "MULTI": 0x40000000 }; - var server_conn; - var connection_status; + var server_conn = null; var response_output; - var conn_in_progress = 0; - - var search_commands = []; + var _connected = false; + var _queue = []; + var _persistent = false; + + // Connect to Sphinx server - self.connect = function(port, persistent, callback) { + self.connect = function() { - // 
arguments: port, persistent, callback. + // arguments: (port, [persistent], callback). var args = Array.prototype.slice.call(arguments); - var callback = args.pop(); var port = args.length ? args.shift(): Sphinx.port; - var persistent = args.length ? args.shift() : false; - - - // very ugly method of making sure no attempt to connection is made until all previous are done - if(conn_in_progress == 1){ - setTimeout(self.connect,10,port,callback); - return; - } - - server_conn = tcp.createConnection(port); - conn_in_progress = 1; - // disable Nagle algorithm - server_conn.setNoDelay(true); - //server_conn.setEncoding('binary'); - - response_output = null; - - //var promise = new process.Promise(); - - server_conn.addListener('connect', function () { - - // console.log('Connected, sending protocol version... State is ' + server_conn.readyState); - // Sending protocol version - // console.log('Sending version number...'); - // Here we must send 4 bytes, '0x00000001' - if (server_conn.readyState == 'open') { - var version_number = Buffer.makeWriter(); - version_number.push.int32(1); - // Waiting for answer - server_conn.once('data', function(data) { - /*if (response_output) { - console.log('connect: Data received from server'); - }*/ - - var protocol_version_raw = data.toReader(); - var protocol_version = protocol_version_raw.int32(); - var data_unpacked = {'': protocol_version}; - - if (data_unpacked[""] >= 1) { - - // Simple connection status indicator - connection_status = 1; - - server_conn.write(version_number.toBuffer()); - - if(persistent){ - var pers_req = Buffer.makeWriter(); - pers_req.push.int16(Sphinx.command.PERSIST); - pers_req.push.int16(0); - pers_req.push.int32(4); - pers_req.push.int32(1); - server_conn.write(pers_req.toBuffer()); - } - - server_conn.on('data', readResponseData); - - // Use callback - // promise.emitSuccess(); - callback(null); - - } else { - callback(new Error('Wrong protocol version: ' + protocol_version)); - conn_in_progress = 0; - 
server_conn.end(); - } - - }); - server_conn.on('error', function(exp) { - console.log('Error: ' + exp); - server_conn.end(); - conn_in_progress = 0; - }); - } else { - callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); - server_conn.end(); - conn_in_progress = 0; - } - }); - + var persistent = _persistent = args.length ? args.shift() : false; + + server_conn = tcp.createConnection(port); + // disable Nagle algorithm + server_conn.setNoDelay(true); + server_conn.addListener('connect', + function () { + // Sending protocol version + // Here we must send 4 bytes, '0x00000001' + if (server_conn.readyState == 'open') { + var version_number = Buffer.makeWriter(); + version_number.push.int32(1); + // Waiting for answer + server_conn.once('data', function(data) { + var protocol_version_raw = data.toReader(); + var protocol_version = protocol_version_raw.int32(); + var data_unpacked = {'': protocol_version}; + if (data_unpacked[""] >= 1) { + server_conn.write(version_number.toBuffer()); + + if(persistent){ + var pers_req = Buffer.makeWriter(); + pers_req.push.int16(Sphinx.command.PERSIST); + pers_req.push.int16(0); + pers_req.push.int32(4); + pers_req.push.int32(1); + server_conn.write(pers_req.toBuffer()); + } + server_conn.emit('sphinx.connected'); + _connected = true; + server_conn.on('data', readResponseData); + + // Use callback + callback(null); + + } else { + callback(new Error('Wrong protocol version: ' + protocol_version)); + server_conn.end(); + } + + }); + server_conn.on('error', function(exp) { + console.log('Error: ' + exp); + server_conn.end(); + }); + } else { + callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); + server_conn.end(); + _connected = false; + } + }); }; - // console.log('Connecting to searchd...'); - self.query = function(query_raw, callback) { - var query = new Object(); + var query = new Object(); - initResponseOutput(callback); - - var query_parameters = { + var query_parameters = 
{ offset : 0, limit : 20, mode : Sphinx.searchMode.ALL, @@ -226,9 +194,9 @@ exports.SphinxClient = function() { fieldweights : {}, overrides : [], selectlist : "*", - indexes : '*', - comment : '', - query : "", + indexes : '*', + comment : '', + query : "", error : "", // per-reply fields (for single-query case) warning : "", connerror : false, @@ -237,164 +205,149 @@ exports.SphinxClient = function() { mbenc : "", arrayresult : true, timeout : 0 - }; - - if (query_raw.query) { - for (x in query_parameters) { - if (query_raw.hasOwnProperty(x)) { - query[x] = query_raw[x]; - } else { - query[x] = query_parameters[x]; - } - } - } else { - query = query_raw.toString(); - } - - /* if (connection_status != 1) { - console.log("You must connect to server before issuing queries"); - return false; - - } */ + }; + + if (query_raw.query) { + for (x in query_parameters) { + if (query_raw.hasOwnProperty(x)) { + query[x] = query_raw[x]; + } else { + query[x] = query_parameters[x]; + } + } + } else { + query = query_raw.toString(); + } var request = Buffer.makeWriter(); - request.push.int16(Sphinx.command.SEARCH); + request.push.int16(Sphinx.command.SEARCH); request.push.int16(Sphinx.clientCommand.SEARCH); - - request.push.int32(0); // This will be request length - request.push.int32(0); - request.push.int32(1); - + + request.push.int32(0); // This will be request length + request.push.int32(0); + request.push.int32(1); + request.push.int32(query.offset); - + request.push.int32(query.limit); request.push.int32(query.mode); request.push.int32(query.ranker); request.push.int32(query.sort); - - request.push.lstring(query.sortby); - request.push.lstring(query.query); // Query text - request.push.int32(query.weights.length); - for (var weight in query.weights) { - request.push.int32(parseInt(weight)); - } - - request.push.lstring(query.indexes); // Indexes used JEZ - - request.push.int32(1); // id64 range marker - - //request.push.int32(0); - request.push.int64(0, query.min_id); // 
This is actually supposed to be two 64-bit numbers - //request.push.int32(0); // However, there is a caveat about using 64-bit ids - request.push.int64(0, query.max_id); - - request.push.int32(query.filters.length); - for (var filter in query.filters) { - request.push.int32(filter.attr.length); - request.push_lstring(filter.attr); - request.push.int32(filter.type); - switch (filter.type) { - case Sphinx.filterTypes.VALUES: - request.push.int32(filter.values.length); - for (var value in filter.values) { - //request.push.int32(0); // should be a 64-bit number - request.push.int64(0, value); - } - break; - case Sphinx.filterTypes.RANGE: - //request.push.int32(0); // should be a 64-bit number - request.push.int64(0, filter.min); - //request.push.int32(0); // should be a 64-bit number - request.push.int64(0, filter.max); - break; - case Sphinx.filterTypes.FLOATRANGE: - request.push.float(filter.min); - request.push.float(filter.max); - break; - } - } - - request.push.int32(query_parameters.groupfunc); - request.push.lstring(query_parameters.groupby); // Groupby length - - request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000 - - request.push.lstring(query_parameters.groupsort); // Groupsort - - request.push.int32(query_parameters.cutoff); // Cutoff - request.push.int32(query_parameters.retrycount); // Retrycount - request.push.int32(query_parameters.retrydelay); // Retrydelay - - request.push.lstring(query_parameters.groupdistinct); // Group distinct - - if (query_parameters.anchor.length == 0) { - request.push.int32(0); // no anchor given - } else { - request.push.int32(1); // anchor point in radians - request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct - request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct - request.push.float(query_parameters.anchor["lat"]); - request.push.float(query_parameters.anchor["long"]); - } - - request.push.int32(query_parameters.indexweights.length); - for (var i in 
query_parameters.indexweights) { - request.push.int32(i); - request.push.int32(query_parameters.indexweights[i]); - } - - request.push.int32(query_parameters.maxquerytime); - // per-field weights (preferred method) - request.push.int32(Object.keys(query.fieldweights).length); - for (var field_name in query.fieldweights) { + + request.push.lstring(query.sortby); + request.push.lstring(query.query); // Query text + request.push.int32(query.weights.length); + for (var weight in query.weights) { + request.push.int32(parseInt(weight)); + } + + request.push.lstring(query.indexes); // Indexes + + request.push.int32(1); // id64 range marker + + request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers + request.push.int64(0, query.max_id); + + request.push.int32(query.filters.length); + for (var filter in query.filters) { + request.push.int32(filter.attr.length); + request.push_lstring(filter.attr); + request.push.int32(filter.type); + switch (filter.type) { + case Sphinx.filterTypes.VALUES: + request.push.int32(filter.values.length); + for (var value in filter.values) { + request.push.int64(0, value); + } + break; + case Sphinx.filterTypes.RANGE: + request.push.int64(0, filter.min); + request.push.int64(0, filter.max); + break; + case Sphinx.filterTypes.FLOATRANGE: + request.push.float(filter.min); + request.push.float(filter.max); + break; + } + } + + request.push.int32(query_parameters.groupfunc); + request.push.lstring(query_parameters.groupby); // Groupby length + + request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000 + + request.push.lstring(query_parameters.groupsort); // Groupsort + + request.push.int32(query_parameters.cutoff); // Cutoff + request.push.int32(query_parameters.retrycount); // Retrycount + request.push.int32(query_parameters.retrydelay); // Retrydelay + + request.push.lstring(query_parameters.groupdistinct); // Group distinct + + if (query_parameters.anchor.length == 0) { + 
request.push.int32(0); // no anchor given + } else { + request.push.int32(1); // anchor point in radians + request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct + request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct + request.push.float(query_parameters.anchor["lat"]); + request.push.float(query_parameters.anchor["long"]); + } + + request.push.int32(query_parameters.indexweights.length); + for (var i in query_parameters.indexweights) { + request.push.int32(i); + request.push.int32(query_parameters.indexweights[i]); + } + + request.push.int32(query_parameters.maxquerytime); + // per-field weights (preferred method) + request.push.int32(Object.keys(query.fieldweights).length); + for (var field_name in query.fieldweights) { request.push.lstring(field_name); - request.push.int32(query.fieldweights[field_name]); - } - - request.push.lstring(query_parameters.comment); - - request.push.int32(query_parameters.overrides.length); - for (var i in query_parameters.overrides) { - request.push.lstring(query_parameters.overrides[i].attr); - request.push.int32(query_parameters.overrides[i].type); - request.push.int32(query_parameters.overrides[i].values.length); - for (var id in query_parameters.overrides[i].values) { - request.push.int64(id); - switch (query_parameters.overrides[i].type) { - case Sphinx.attribute.FLOAT: - request.push.float(query_parameters.overrides[i].values[id]); - break; - case Sphinx.attribute.BIGINT: - request.push.int64(query_parameters.overrides[i].values[id]); - break; - default: - request.push.int32(query_parameters.overrides[i].values[id]); - break; - } - } - } - - request.push.lstring(query_parameters.selectlist); // Select-list - - var request_buf = request.toBuffer(); - var req_length = Buffer.makeWriter(); - req_length.push.int32(request_buf.length - 8); - req_length.toBuffer().copy(request_buf, 4, 0); - - console.log('Sending search request of ' + request_buf.length + ' bytes'); - - 
server_conn.write(request_buf); - // we also add the command to the search_commands queue - search_commands.push(Sphinx.clientCommand.SEARCH); + request.push.int32(query.fieldweights[field_name]); + } + + request.push.lstring(query_parameters.comment); + + request.push.int32(query_parameters.overrides.length); + for (var i in query_parameters.overrides) { + request.push.lstring(query_parameters.overrides[i].attr); + request.push.int32(query_parameters.overrides[i].type); + request.push.int32(query_parameters.overrides[i].values.length); + for (var id in query_parameters.overrides[i].values) { + request.push.int64(id); + switch (query_parameters.overrides[i].type) { + case Sphinx.attribute.FLOAT: + request.push.float(query_parameters.overrides[i].values[id]); + break; + case Sphinx.attribute.BIGINT: + request.push.int64(query_parameters.overrides[i].values[id]); + break; + default: + request.push.int32(query_parameters.overrides[i].values[id]); + break; + } + } + } + + request.push.lstring(query_parameters.selectlist); // Select-list + + var request_buf = request.toBuffer(); + var req_length = Buffer.makeWriter(); + req_length.push.int32(request_buf.length - 8); + req_length.toBuffer().copy(request_buf, 4, 0); + + console.log('Sending search request of ' + request_buf.length + ' bytes'); + + _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; self.build_excerpts = function(docs, index, words, passage_opts_raw, callback){ var passage_opts = new Object(); - initResponseOutput(callback); - var passage_parameters = { before_match : '', after_match : '', @@ -405,7 +358,7 @@ exports.SphinxClient = function() { limit_words : 0, around : 5, start_passage_id : 1, - passage_boundary : 'none', + passage_boundary : 'none' } for (x in passage_parameters) { @@ -476,89 +429,116 @@ exports.SphinxClient = function() { req_length.toBuffer().copy(request_buf,4,0); console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); - - 
server_conn.write(request_buf); - // we also add the command to the search_commands queue - search_commands.push(Sphinx.clientCommand.EXCERPT); + _enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT); }; // build_excerpts self.disconnect = function() { - conn_in_progress = 0; - server_conn.end(); + server_conn.end(); }; + function _enqueue(req_buf , cb, sc) { + if(!server_conn || !server_conn.writable){ + throw new Error('Trying to enqueue. Not connected'); + } + _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); + if(_queue.length === 1) + { + if(_connected) { + initResponseOutput(cb); + server_conn.write(req_buf); + } else { + server_conn.once('sphinx.connected', function(){ + initResponseOutput(cb); + server_conn.write(req_buf); + }); + } + } + } + + function _dequeue() { + _queue.shift(); + if(!_queue.length){ + return; + } + if(!_persistent){ + server_conn = null; + return; + } + if(!server_conn){ + throw new Error("Trying to dequeue. Not connected"); + } + // we run the next server request in line + initResponseOutput(_queue[0]['callback']); + server_conn.write(_queue[0]['request_buffer']); + } + function readResponseData(data) { - // Got response! - // Command must match the one used in query - response_output.append(data); - response_output.runCallbackIfDone(search_commands.shift()); + // Got response! 
+ // Command must match the one used in query + response_output.append(data); + response_output.runCallbackIfDone(_queue[0]['search_command']); } function initResponseOutput(query_callback) { - response_output = { - status : null, - version : null, - length : 0, - data : new Buffer(0), - parseHeader : function() { - if (this.status === null && this.data.length >= 8) { - // console.log('Answer length: ' + (this.data.length)); - var decoder = this.data.toReader(); - // var decoder = new bits.Decoder(this.data); - - this.status = decoder.int16(); - this.version = decoder.int16(); - this.length = decoder.int32(); - // console.log('Receiving answer with status ' + this.status + ', version ' + this.version + ' and length ' + this.length); + response_output = { + status : null, + version : null, + length : 0, + data : new Buffer(0), + parseHeader : function() { + if (this.status === null && this.data.length >= 8) { + var decoder = this.data.toReader(); + // var decoder = new bits.Decoder(this.data); + + this.status = decoder.int16(); + this.version = decoder.int16(); + this.length = decoder.int32(); this.data = this.data.slice(8, this.data.length); - // this.data = decoder.string(this.data.length - 8); - } - }, - append : function(data) { - //this.data.write(data.toString('utf-8'), 'utf-8'); - // console.log('Appending ' + data.length + ' bytes'); - var new_buffer = new Buffer(this.data.length + data.length); - this.data.copy(new_buffer, 0, 0); - data.copy(new_buffer, this.data.length, 0); - this.data = new_buffer; - // console.log('Data length after appending: ' + this.data.length); - this.parseHeader(); - }, - done : function() { - // console.log('Length: ' + this.data.length + ' / ' + this.length); - return this.data.length >= this.length; - }, - checkResponse : function(search_command) { - var errmsg = ''; - if (this.length !== this.data.length) { - errmsg += "Failed to read searchd response (status=" + this.status + ", ver=" + this.version + ", len=" + this.length + 
", read=" + this.data.length + ")"; - } - - if (this.version < search_command) { - errmsg += "Searchd command older than client's version, some options might not work"; - } - - if (this.status == Sphinx.statusCode.WARNING) { - errmsg += "Server issued WARNING: " + this.data; - } - - if (this.status == Sphinx.statusCode.ERROR) { - errmsg += "Server issued ERROR: " + this.data; - } - return errmsg; - }, - runCallbackIfDone : function(search_command) { - if (this.done()) { - var answer; - var errmsg = this.checkResponse(search_command); - if (!errmsg) { - answer = parseResponse(response_output.data, search_command); - } - query_callback(errmsg, answer); - } - } - }; + // this.data = decoder.string(this.data.length - 8); + } + }, + append : function(data) { + var new_buffer = new Buffer(this.data.length + data.length); + this.data.copy(new_buffer, 0, 0); + data.copy(new_buffer, this.data.length, 0); + this.data = new_buffer; + this.parseHeader(); + }, + done : function() { + return this.data.length >= this.length; + }, + checkResponse : function(search_command) { + var errmsg = ''; + if (this.length !== this.data.length) { + errmsg += "Failed to read searchd response (status=" + this.status + ", ver=" + this.version + ", len=" + this.length + ", read=" + this.data.length + ")"; + } + + if (this.version < search_command) { + errmsg += "Searchd command older than client's version, some options might not work"; + } + + if (this.status == Sphinx.statusCode.WARNING) { + errmsg += "Server issued WARNING: " + this.data; + } + + if (this.status == Sphinx.statusCode.ERROR) { + errmsg += "Server issued ERROR: " + this.data; + } + return errmsg; + }, + runCallbackIfDone : function(search_command) { + if (this.done()) { + var answer; + var errmsg = this.checkResponse(search_command); + if (!errmsg) { + answer = parseResponse(response_output.data, search_command); + } + _dequeue(); + query_callback(errmsg, answer); + } + } + }; } var parseResponse = function (data, search_command) 
{ @@ -570,112 +550,112 @@ exports.SphinxClient = function() { } var parseSearchResponse = function (data) { - var output = {}; - // var response = new bits.Decoder(data); - var response = data.toReader(); - var i; - output.status = response.int32(); + var output = {}; + // var response = new bits.Decoder(data); + var response = data.toReader(); + var i; + output.status = response.int32(); if (output.status != 0) { - return(response.lstring()); + return(response.lstring()); + } + output.num_fields = response.int32(); + + output.fields = []; + output.attributes = []; + output.matches = []; + + // Get fields + for (i = 0; i < output.num_fields; i++) { + var field = {}; + + field.name = response.lstring(); + + output.fields.push(field); } - output.num_fields = response.int32(); - - output.fields = []; - output.attributes = []; - output.matches = []; - - // Get fields - for (i = 0; i < output.num_fields; i++) { - var field = {}; - - field.name = response.lstring(); - - output.fields.push(field); - } - - output.num_attrs = response.int32(); - - // Get attributes - for (i = 0; i < output.num_attrs; i++) { - var attribute = {}; - - attribute.name = response.lstring(); - attribute.type = response.int32(); - output.attributes.push(attribute); - } - - output.match_count = response.int32(); - output.id64 = response.int32(); - - // Get matches - for (i = 0; i < output.match_count; i++) { - var match = {}; - - // Here server tells us which format for document IDs - // it uses: int64 or int32 - if (output.id64 == 1) { - // get the 64-bit result, but only use the lower half for now - var id64 = response.int64(); - match.doc = id64[1]; - match.weight = response.int32(); - } else { - // Good news: document id fits our integers size :) - match.doc = response.int32(); - match.weight = response.int32(); - } - - match.attrs = {}; - - // - var attr_value; - // var attribute; - for (attribute in output.attributes) { - // BIGINT size attributes (64 bits) - if 
(output.attributes[attribute].type == Sphinx.attribute.BIGINT) { - attr_value = response.int32(); - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // FLOAT size attributes (32 bits) - if (output.attributes[attribute].type == Sphinx.attribute.FLOAT) { - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // STRING attributes - if (output.attributes[attribute].type == Sphinx.attribute.STRING) { - attr_value = response.lstring(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // We don't need this branch right now, - // as it is covered by previous `if` - // @todo: implement MULTI attribute type - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - } - - output.matches.push(match); - - } - - output.total = response.int32(); - output.total_found = response.int32(); - output.msecs = response.int32(); - output.words_count = response.int32(); - output.words = new Object(); - for (i = 0; i < output.words_count; i++) { - var word = response.lstring(); - output.words[word] = new Object(); - output.words[word]["docs"] = response.int32(); - output.words[word]["hits"] = response.int32(); - } - - return output; + + output.num_attrs = response.int32(); + + // Get attributes + for (i = 0; i < output.num_attrs; i++) { + var attribute = {}; + + attribute.name = response.lstring(); + attribute.type = response.int32(); + output.attributes.push(attribute); + } + + output.match_count = response.int32(); + output.id64 = response.int32(); + + // Get matches + for (i = 0; i < output.match_count; i++) { + var match = {}; + + // Here server tells us which format for document IDs + // it uses: int64 or int32 + if (output.id64 == 1) { + // get the 64-bit result, but only use the lower half for now + var id64 = response.int64(); + match.doc = id64[1]; + match.weight = response.int32(); + } 
else { + // Good news: document id fits our integers size :) + match.doc = response.int32(); + match.weight = response.int32(); + } + + match.attrs = {}; + + // + var attr_value; + // var attribute; + for (attribute in output.attributes) { + // BIGINT size attributes (64 bits) + if (output.attributes[attribute].type == Sphinx.attribute.BIGINT) { + attr_value = response.int32(); + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // FLOAT size attributes (32 bits) + if (output.attributes[attribute].type == Sphinx.attribute.FLOAT) { + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // STRING attributes + if (output.attributes[attribute].type == Sphinx.attribute.STRING) { + attr_value = response.lstring(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // We don't need this branch right now, + // as it is covered by previous `if` + // @todo: implement MULTI attribute type + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + } + + output.matches.push(match); + + } + + output.total = response.int32(); + output.total_found = response.int32(); + output.msecs = response.int32(); + output.words_count = response.int32(); + output.words = new Object(); + for (i = 0; i < output.words_count; i++) { + var word = response.lstring(); + output.words[word] = new Object(); + output.words[word]["docs"] = response.int32(); + output.words[word]["hits"] = response.int32(); + } + + return output; }; var parseExcerptResponse = function (data) { From 3ed0937e0dea40d4bdfb6df5cd317b85eecb3cc6 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Mon, 21 Nov 2011 21:49:15 -0500 Subject: [PATCH 02/14] readme file updated --- README.markdown | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.markdown b/README.markdown index b56abc3..46f4ac6 100644 --- a/README.markdown +++ 
b/README.markdown @@ -77,3 +77,19 @@ You can ask sphinx to open a persistent connection. You can then make several re } ); }); + +Limestone is queueing now: +You can safely call limestone.query or limestone.build_excerpts methods outside the scope of the callback functions, provided the connection is made persistent. Limestone will enqueue the sphinx commands and run them sequentially. + +This works: + limestone.connect(9312, // port. 9312 is standard Sphinx port + function(err) { // callback + ... + limestone.query( + {'query':'test', maxmatches:1}, + function(err, answer) { + .... + }); + }); + + limestone.query({'second query':'test'}, function(err, answer){..}); // won't crash with previous From e9aee8b2d79a7b6f213274a19c19e58c8710b999 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Mon, 21 Nov 2011 21:52:30 -0500 Subject: [PATCH 03/14] small readme fix --- README.markdown | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.markdown b/README.markdown index 46f4ac6..8dbb3d5 100644 --- a/README.markdown +++ b/README.markdown @@ -82,7 +82,7 @@ Limestone is queueing now: You can safely call limestone.query or limestone.build_excerpts methods outside the scope of the callback functions, provided the connection is made persistent. Limestone will enqueue the sphinx commands and run them sequentially. This works: - limestone.connect(9312, // port. 9312 is standard Sphinx port + limestone.connect(9312, // port. 9312 is standard Sphinx port function(err) { // callback ... 
limestone.query( From 05f3229b7fdeaa847f3beeea6bf5bede9a96394a Mon Sep 17 00:00:00 2001 From: Joe Z Date: Mon, 21 Nov 2011 21:56:56 -0500 Subject: [PATCH 04/14] small readme fix II --- README.markdown | 1 + 1 file changed, 1 insertion(+) diff --git a/README.markdown b/README.markdown index 8dbb3d5..a738468 100644 --- a/README.markdown +++ b/README.markdown @@ -82,6 +82,7 @@ Limestone is queueing now: You can safely call limestone.query or limestone.build_excerpts methods outside the scope of the callback functions, provided the connection is made persistent. Limestone will enqueue the sphinx commands and run them sequentially. This works: + limestone.connect(9312, // port. 9312 is standard Sphinx port function(err) { // callback ... From 34e25c61c5a882f2f16b328044cf21414ebee3c7 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Wed, 23 Nov 2011 11:50:34 -0500 Subject: [PATCH 05/14] _dequeue moved up. - dequeue op can be done at the very instant the sphinx server is free from its duty. no need to wait for "parseResponse" function --- limestone.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/limestone.js b/limestone.js index f42733b..f530438 100644 --- a/limestone.js +++ b/limestone.js @@ -529,12 +529,12 @@ exports.SphinxClient = function() { }, runCallbackIfDone : function(search_command) { if (this.done()) { + _dequeue(); var answer; var errmsg = this.checkResponse(search_command); if (!errmsg) { answer = parseResponse(response_output.data, search_command); } - _dequeue(); query_callback(errmsg, answer); } } From fc8789fe08daec98b03bcf2948612b5a2a8ec9d4 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Thu, 24 Nov 2011 13:52:18 -0500 Subject: [PATCH 06/14] Better handling of 'error' connection event. also, better handling for concurrency. 
- before dequeuing, clone the response_output.data buffer so it can be used by parseResponse at the same time that the sphinx server is free again to build a new response_output in the meanwhile --- limestone.js | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/limestone.js b/limestone.js index f530438..4f4ad06 100644 --- a/limestone.js +++ b/limestone.js @@ -114,10 +114,15 @@ exports.SphinxClient = function() { var callback = args.pop(); var port = args.length ? args.shift(): Sphinx.port; var persistent = _persistent = args.length ? args.shift() : false; - - server_conn = tcp.createConnection(port); + server_conn = tcp.createConnection(port); + server_conn.once('error', function(x){ + console.log('Error: '+x); + server_conn.end(); + callback(x); + }); // disable Nagle algorithm server_conn.setNoDelay(true); + server_conn.addListener('connect', function () { // Sending protocol version @@ -154,10 +159,6 @@ exports.SphinxClient = function() { } }); - server_conn.on('error', function(exp) { - console.log('Error: ' + exp); - server_conn.end(); - }); } else { callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); server_conn.end(); @@ -340,7 +341,7 @@ exports.SphinxClient = function() { req_length.push.int32(request_buf.length - 8); req_length.toBuffer().copy(request_buf, 4, 0); - console.log('Sending search request of ' + request_buf.length + ' bytes'); + console.log('Sending search request of ' + request_buf.length + ' bytes'); _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; @@ -359,7 +360,7 @@ exports.SphinxClient = function() { around : 5, start_passage_id : 1, passage_boundary : 'none' - } + }; for (x in passage_parameters) { if (passage_opts_raw.hasOwnProperty(x)) { @@ -380,7 +381,7 @@ exports.SphinxClient = function() { 'load_files' : 128, 'allow_empty' : 256, 'emit_zones' : 256 - } + }; for (x in flag_properties) { if (passage_opts_raw.hasOwnProperty(x)) { @@ -529,12 
+530,16 @@ exports.SphinxClient = function() { }, runCallbackIfDone : function(search_command) { if (this.done()) { - _dequeue(); var answer; + var cloned = new Buffer(response_output.data.length); + // clone the response data, so we can dequeue and let the server free to modify again response_output + response_output.data.copy(cloned); + _dequeue(); var errmsg = this.checkResponse(search_command); if (!errmsg) { - answer = parseResponse(response_output.data, search_command); + answer = parseResponse(cloned, search_command); } + query_callback(errmsg, answer); } } @@ -547,11 +552,11 @@ exports.SphinxClient = function() { } else if (search_command == Sphinx.clientCommand.EXCERPT) { return parseExcerptResponse(data); } - } + return null; + }; var parseSearchResponse = function (data) { var output = {}; - // var response = new bits.Decoder(data); var response = data.toReader(); var i; output.status = response.int32(); From 08783760ef9be20170ea00bf174c068c52ed250d Mon Sep 17 00:00:00 2001 From: Joe Z Date: Tue, 29 Nov 2011 00:29:07 -0500 Subject: [PATCH 07/14] make limestone recognize when sphinx sends a "retry" message --- limestone.js | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/limestone.js b/limestone.js index 4f4ad06..26189d0 100644 --- a/limestone.js +++ b/limestone.js @@ -1,4 +1,5 @@ var tcp = require('net'); +var uuid = require('node-uuid'); exports.SphinxClient = function() { var self = { }; @@ -134,8 +135,25 @@ exports.SphinxClient = function() { server_conn.once('data', function(data) { var protocol_version_raw = data.toReader(); var protocol_version = protocol_version_raw.int32(); + // if there still data?
process and callback + if(!protocol_version_raw.empty()) { + status_code = protocol_version_raw.int16(); + version = protocol_version_raw.int16(); + server_message = protocol_version_raw.lstring(); + if(status_code == Sphinx.statusCode.ERROR){ + errmsg = 'Server issued ERROR: '+server_message; + } + if(status_code == Sphinx.statusCode.RETRY){ + errmsg = 'Server issued RETRY: '+server_message; + } + if(errmsg){ + callback(errmsg); + } + } + var data_unpacked = {'': protocol_version}; if (data_unpacked[""] >= 1) { + //all ok, send my version server_conn.write(version_number.toBuffer()); if(persistent){ @@ -146,9 +164,9 @@ exports.SphinxClient = function() { pers_req.push.int32(1); server_conn.write(pers_req.toBuffer()); } - server_conn.emit('sphinx.connected'); - _connected = true; server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); // Use callback callback(null); @@ -341,7 +359,7 @@ exports.SphinxClient = function() { req_length.push.int32(request_buf.length - 8); req_length.toBuffer().copy(request_buf, 4, 0); - console.log('Sending search request of ' + request_buf.length + ' bytes'); + console.log('Sending search request of ' + request_buf.length + ' bytes '); _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; @@ -439,6 +457,7 @@ exports.SphinxClient = function() { function _enqueue(req_buf , cb, sc) { if(!server_conn || !server_conn.writable){ + throw new Error('Trying to enqueue. Not connected'); } _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); @@ -475,7 +494,6 @@ exports.SphinxClient = function() { function readResponseData(data) { // Got response! 
- // Command must match the one used in query response_output.append(data); response_output.runCallbackIfDone(_queue[0]['search_command']); } @@ -489,14 +507,12 @@ exports.SphinxClient = function() { parseHeader : function() { if (this.status === null && this.data.length >= 8) { var decoder = this.data.toReader(); - // var decoder = new bits.Decoder(this.data); this.status = decoder.int16(); this.version = decoder.int16(); this.length = decoder.int32(); this.data = this.data.slice(8, this.data.length); - // this.data = decoder.string(this.data.length - 8); } }, append : function(data) { @@ -526,6 +542,11 @@ exports.SphinxClient = function() { if (this.status == Sphinx.statusCode.ERROR) { errmsg += "Server issued ERROR: " + this.data; } + + if (this.status == Sphinx.statusCode.RETRY){ + errmsg += "Server issued RETRY: " + this.data; + } + return errmsg; }, runCallbackIfDone : function(search_command) { From 428fd0516099c9043f84a4fdacc7869a69f162cf Mon Sep 17 00:00:00 2001 From: Joe Z Date: Tue, 29 Nov 2011 12:50:23 -0500 Subject: [PATCH 08/14] small fixes --- limestone.js | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/limestone.js b/limestone.js index 26189d0..ceec0e0 100644 --- a/limestone.js +++ b/limestone.js @@ -147,7 +147,7 @@ exports.SphinxClient = function() { errmsg = 'Server issued RETRY: '+server_message; } if(errmsg){ - callback(errmsg); + callback(new Error(errmsg)); } } @@ -360,8 +360,8 @@ exports.SphinxClient = function() { req_length.toBuffer().copy(request_buf, 4, 0); console.log('Sending search request of ' + request_buf.length + ' bytes '); - - _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); + _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); + }; self.build_excerpts = function(docs, index, words, passage_opts_raw, callback){ @@ -457,8 +457,7 @@ exports.SphinxClient = function() { function _enqueue(req_buf , cb, sc) { if(!server_conn || !server_conn.writable){ - - throw new Error('Trying to 
enqueue. Not connected'); + cb(new Error("Trying to enqueue. Not connected")); } _queue.push({request_buffer: req_buf, callback: cb, search_command: sc}); if(_queue.length === 1) From c4a8cdc7c01b7b8f08c5a78e4caf9332d3bc3365 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Fri, 9 Dec 2011 07:34:00 -0500 Subject: [PATCH 09/14] qf --- limestone.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/limestone.js b/limestone.js index ceec0e0..ebd5842 100644 --- a/limestone.js +++ b/limestone.js @@ -116,8 +116,8 @@ exports.SphinxClient = function() { var port = args.length ? args.shift(): Sphinx.port; var persistent = _persistent = args.length ? args.shift() : false; server_conn = tcp.createConnection(port); - server_conn.once('error', function(x){ - console.log('Error: '+x); + server_conn.on('error', function(x){ + console.log('Error: '+x); server_conn.end(); callback(x); }); From f0efcc0a452f01406cda1cab060c7beb9ddf3cfe Mon Sep 17 00:00:00 2001 From: Joe Z Date: Thu, 15 Dec 2011 10:56:44 -0500 Subject: [PATCH 10/14] uuid removed --- limestone.js | 1 - 1 file changed, 1 deletion(-) diff --git a/limestone.js b/limestone.js index ebd5842..7c2cda5 100644 --- a/limestone.js +++ b/limestone.js @@ -1,5 +1,4 @@ var tcp = require('net'); -var uuid = require('node-uuid'); exports.SphinxClient = function() { var self = { }; From 20b1a87f8a5eba7ccdedb96b36b815f397e13ec8 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Wed, 11 Jan 2012 17:15:57 -0500 Subject: [PATCH 11/14] Added remote host connect support --- limestone.js | 87 +++++++++++++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/limestone.js b/limestone.js index 7c2cda5..3ad3458 100644 --- a/limestone.js +++ b/limestone.js @@ -2,7 +2,7 @@ var tcp = require('net'); exports.SphinxClient = function() { var self = { }; - + var buffer_extras = require('./buffer_extras'); var Sphinx = { @@ -86,7 +86,7 @@ exports.SphinxClient = function() { "RANGE" : 1, "FLOATRANGE" : 
2 }; - + Sphinx.attribute = { "INTEGER": 1, "TIMESTAMP": 2, @@ -95,7 +95,7 @@ exports.SphinxClient = function() { "FLOAT": 5, "BIGINT": 6, "STRING": 7, - "MULTI": 0x40000000 + "MULTI": 0x40000000 }; var server_conn = null; @@ -104,17 +104,28 @@ exports.SphinxClient = function() { var _queue = []; var _persistent = false; - - + + // Connect to Sphinx server self.connect = function() { - // arguments: (port, [persistent], callback). + // arguments: ([host:port], [persistent], callback). var args = Array.prototype.slice.call(arguments); var callback = args.pop(); - var port = args.length ? args.shift(): Sphinx.port; + var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port; + if(hostport.indexOf(':')==-1){ + hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport; + } + hostport = hostport.split(':'); + + var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ; + var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port; + + + var persistent = _persistent = args.length ? 
args.shift() : false; - server_conn = tcp.createConnection(port); + console.log('connecting to : '+host+':'+port); + server_conn = tcp.createConnection(port, host); server_conn.on('error', function(x){ console.log('Error: '+x); server_conn.end(); @@ -123,7 +134,7 @@ exports.SphinxClient = function() { // disable Nagle algorithm server_conn.setNoDelay(true); - server_conn.addListener('connect', + server_conn.addListener('connect', function () { // Sending protocol version // Here we must send 4 bytes, '0x00000001' @@ -154,7 +165,7 @@ exports.SphinxClient = function() { if (data_unpacked[""] >= 1) { //all ok, send my version server_conn.write(version_number.toBuffer()); - + if(persistent){ var pers_req = Buffer.makeWriter(); pers_req.push.int16(Sphinx.command.PERSIST); @@ -166,15 +177,15 @@ exports.SphinxClient = function() { server_conn.on('data', readResponseData); _connected = true; server_conn.emit('sphinx.connected'); - + // Use callback callback(null); - + } else { callback(new Error('Wrong protocol version: ' + protocol_version)); server_conn.end(); } - + }); } else { callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); @@ -218,7 +229,7 @@ exports.SphinxClient = function() { error : "", // per-reply fields (for single-query case) warning : "", connerror : false, - + reqs : [], // requests storage (for multi-query case) mbenc : "", arrayresult : true, @@ -228,7 +239,7 @@ exports.SphinxClient = function() { if (query_raw.query) { for (x in query_parameters) { if (query_raw.hasOwnProperty(x)) { - query[x] = query_raw[x]; + query[x] = query_raw[x]; } else { query[x] = query_parameters[x]; } @@ -237,38 +248,38 @@ exports.SphinxClient = function() { query = query_raw.toString(); } - var request = Buffer.makeWriter(); + var request = Buffer.makeWriter(); request.push.int16(Sphinx.command.SEARCH); request.push.int16(Sphinx.clientCommand.SEARCH); - + request.push.int32(0); // This will be request length request.push.int32(0); 
request.push.int32(1); - + request.push.int32(query.offset); - + request.push.int32(query.limit); request.push.int32(query.mode); request.push.int32(query.ranker); - + request.push.int32(query.sort); - - request.push.lstring(query.sortby); + + request.push.lstring(query.sortby); request.push.lstring(query.query); // Query text - request.push.int32(query.weights.length); + request.push.int32(query.weights.length); for (var weight in query.weights) { request.push.int32(parseInt(weight)); } - request.push.lstring(query.indexes); // Indexes + request.push.lstring(query.indexes); // Indexes request.push.int32(1); // id64 range marker request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers - request.push.int64(0, query.max_id); + request.push.int64(0, query.max_id); - request.push.int32(query.filters.length); + request.push.int32(query.filters.length); for (var filter in query.filters) { request.push.int32(filter.attr.length); request.push_lstring(filter.attr); @@ -290,7 +301,7 @@ exports.SphinxClient = function() { break; } } - + request.push.int32(query_parameters.groupfunc); request.push.lstring(query_parameters.groupby); // Groupby length @@ -320,7 +331,7 @@ exports.SphinxClient = function() { request.push.int32(query_parameters.indexweights[i]); } - request.push.int32(query_parameters.maxquerytime); + request.push.int32(query_parameters.maxquerytime); // per-field weights (preferred method) request.push.int32(Object.keys(query.fieldweights).length); for (var field_name in query.fieldweights) { @@ -328,11 +339,11 @@ exports.SphinxClient = function() { request.push.int32(query.fieldweights[field_name]); } - request.push.lstring(query_parameters.comment); + request.push.lstring(query_parameters.comment); request.push.int32(query_parameters.overrides.length); for (var i in query_parameters.overrides) { - request.push.lstring(query_parameters.overrides[i].attr); + request.push.lstring(query_parameters.overrides[i].attr); 
request.push.int32(query_parameters.overrides[i].type); request.push.int32(query_parameters.overrides[i].values.length); for (var id in query_parameters.overrides[i].values) { @@ -358,8 +369,8 @@ exports.SphinxClient = function() { req_length.push.int32(request_buf.length - 8); req_length.toBuffer().copy(request_buf, 4, 0); - console.log('Sending search request of ' + request_buf.length + ' bytes '); - _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); + console.log('Sending search request of ' + request_buf.length + ' bytes '); + _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; @@ -399,7 +410,7 @@ exports.SphinxClient = function() { 'allow_empty' : 256, 'emit_zones' : 256 }; - + for (x in flag_properties) { if (passage_opts_raw.hasOwnProperty(x)) { flags |= flag_properties[x]; @@ -412,7 +423,7 @@ exports.SphinxClient = function() { request.push.int16(Sphinx.command.EXCERPT); request.push.int16(Sphinx.clientCommand.EXCERPT); request.push.int32(0); // This will be request length - + // request 'body' (flags, options, docs) request.push.int32(0); @@ -422,7 +433,7 @@ exports.SphinxClient = function() { request.push.lstring(index); request.push.lstring(words); - + // options request.push.lstring(passage_opts.before_match); request.push.lstring(passage_opts.after_match); @@ -470,7 +481,7 @@ exports.SphinxClient = function() { server_conn.write(req_buf); }); } - } + } } function _dequeue() { @@ -544,7 +555,7 @@ exports.SphinxClient = function() { if (this.status == Sphinx.statusCode.RETRY){ errmsg += "Server issued RETRY: " + this.data; } - + return errmsg; }, runCallbackIfDone : function(search_command) { @@ -678,7 +689,7 @@ exports.SphinxClient = function() { output.words[word]["docs"] = response.int32(); output.words[word]["hits"] = response.int32(); } - + return output; }; From 2aa158f309cad578289f62c7a9b806480dba5d88 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Wed, 11 Jan 2012 20:10:19 -0500 Subject: [PATCH 12/14] removing spurious log line
--- README.markdown | 2 +- limestone.js | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/README.markdown b/README.markdown index a738468..7e2bf5a 100644 --- a/README.markdown +++ b/README.markdown @@ -5,7 +5,7 @@ Usage: var limestone = require("./limestone").SphinxClient(), sys = require("sys"); - limestone.connect(9312, // port. 9312 is standard Sphinx port + limestone.connect(9312, // port. 9312 is standard Sphinx port. also 'host:port' allowed function(err) { // callback if (err) { sys.puts('Connection error: ' + err); diff --git a/limestone.js b/limestone.js index 3ad3458..6bf9863 100644 --- a/limestone.js +++ b/limestone.js @@ -121,10 +121,8 @@ exports.SphinxClient = function() { var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ; var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port; - - var persistent = _persistent = args.length ? args.shift() : false; - console.log('connecting to : '+host+':'+port); + server_conn = tcp.createConnection(port, host); server_conn.on('error', function(x){ console.log('Error: '+x); From b1facbc06a13aaa345c4af886cc577dbc1893223 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Fri, 13 Jan 2012 16:36:05 -0500 Subject: [PATCH 13/14] code formatted so it fits better with @kurokikaze/limestone --- limestone.js | 877 +++++++++++++++++++++++++++------------------------ 1 file changed, 462 insertions(+), 415 deletions(-) diff --git a/limestone.js b/limestone.js index 6bf9863..3ffec1c 100644 --- a/limestone.js +++ b/limestone.js @@ -6,98 +6,100 @@ exports.SphinxClient = function() { var buffer_extras = require('./buffer_extras'); var Sphinx = { - port : 9312 + port : 9312 }; // All search modes Sphinx.searchMode = { - "ALL":0, - "ANY":1, - "PHRASE":2, - "BOOLEAN":3, - "EXTENDED":4, - "FULLSCAN":5, - "EXTENDED2":6 // extended engine V2 (TEMPORARY, WILL BE REMOVED) + "ALL":0, + "ANY":1, + "PHRASE":2, + "BOOLEAN":3, + "EXTENDED":4, + "FULLSCAN":5, + "EXTENDED2":6 // extended engine V2 
(TEMPORARY, WILL BE REMOVED) }; // All ranking modes Sphinx.rankingMode = { - "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one - "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality) - "NONE" : 2, ///< no ranking, all matches get a weight of 1 - "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts - "PROXIMITY" : 4, - "MATCHANY" : 5, - "FIELDMASK" : 6, - "SPH04" : 7, - "TOTAL" : 8 + "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one + "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality) + "NONE" : 2, ///< no ranking, all matches get a weight of 1 + "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts + "PROXIMITY" : 4, + "MATCHANY" : 5, + "FIELDMASK" : 6, + "SPH04" : 7, + "TOTAL" : 8 }; Sphinx.sortMode = { - "RELEVANCE" : 0, - "ATTR_DESC" : 1, - "ATTR_ASC" : 2, - "TIME_SEGMENTS" : 3, - "EXTENDED" : 4, - "EXPR" : 5 + "RELEVANCE" : 0, + "ATTR_DESC" : 1, + "ATTR_ASC" : 2, + "TIME_SEGMENTS" : 3, + "EXTENDED" : 4, + "EXPR" : 5 }; Sphinx.groupFunc = { - "DAY" : 0, - "WEEK" : 1, - "MONTH" : 2, - "YEAR" : 3, - "ATTR" : 4, - "ATTRPAIR" : 5 + "DAY" : 0, + "WEEK" : 1, + "MONTH" : 2, + "YEAR" : 3, + "ATTR" : 4, + "ATTRPAIR" : 5 }; // Commands Sphinx.command = { - "SEARCH" : 0, - "EXCERPT" : 1, - "UPDATE" : 2, - "KEYWORDS" : 3, - "PERSIST" : 4, - "STATUS" : 5, - "QUERY" : 6, - "FLUSHATTRS" : 7 + "SEARCH" : 0, + "EXCERPT" : 1, + "UPDATE" : 2, + "KEYWORDS" : 3, + "PERSIST" : 4, + "STATUS" : 5, + "QUERY" : 6, + "FLUSHATTRS" : 7 }; // Current version client commands Sphinx.clientCommand = { - "SEARCH" : 0x118, - "EXCERPT" : 0x103, - "UPDATE" : 0x102, - "KEYWORDS" : 0x100, - "STATUS" : 0x100, - "QUERY" : 0x100, - "FLUSHATTRS": 0x100 + "SEARCH" : 0x118, + "EXCERPT" : 0x103, + "UPDATE" : 0x102, + "KEYWORDS" : 0x100, + "STATUS" : 0x100, + 
"QUERY" : 0x100, + "FLUSHATTRS": 0x100 }; Sphinx.statusCode = { - "OK": 0, - "ERROR": 1, - "RETRY": 2, - "WARNING": 3 + "OK": 0, + "ERROR": 1, + "RETRY": 2, + "WARNING": 3 }; Sphinx.filterTypes = { - "VALUES" : 0, - "RANGE" : 1, - "FLOATRANGE" : 2 + "VALUES" : 0, + "RANGE" : 1, + "FLOATRANGE" : 2 }; Sphinx.attribute = { - "INTEGER": 1, - "TIMESTAMP": 2, - "ORDINAL": 3, - "BOOL": 4, - "FLOAT": 5, - "BIGINT": 6, - "STRING": 7, - "MULTI": 0x40000000 + "INTEGER": 1, + "TIMESTAMP": 2, + "ORDINAL": 3, + "BOOL": 4, + "FLOAT": 5, + "BIGINT": 6, + "STRING": 7, + "MULTI": 0x40000000 }; + self.Sphinx = Sphinx; + var server_conn = null; var response_output; var _connected = false; @@ -105,14 +107,17 @@ exports.SphinxClient = function() { var _persistent = false; - // Connect to Sphinx server self.connect = function() { // arguments: ([host:port], [persistent], callback). var args = Array.prototype.slice.call(arguments); + var callback = args.pop(); var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port; + var persistent = _persistent = args.length ? args.shift() : false; + + if(hostport.indexOf(':')==-1){ hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport; } @@ -121,82 +126,102 @@ exports.SphinxClient = function() { var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ; var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port; - var persistent = _persistent = args.length ? 
args.shift() : false; - server_conn = tcp.createConnection(port, host); + server_conn = tcp.createConnection(port, host); server_conn.on('error', function(x){ console.log('Error: '+x); server_conn.end(); callback(x); }); - // disable Nagle algorithm - server_conn.setNoDelay(true); - - server_conn.addListener('connect', - function () { - // Sending protocol version - // Here we must send 4 bytes, '0x00000001' - if (server_conn.readyState == 'open') { - var version_number = Buffer.makeWriter(); - version_number.push.int32(1); - // Waiting for answer - server_conn.once('data', function(data) { - var protocol_version_raw = data.toReader(); - var protocol_version = protocol_version_raw.int32(); - // if there still data? process and callback - if(!protocol_version_raw.empty()) { - status_code = protocol_version_raw.int16(); - version = protocol_version_raw.int16(); - server_message = protocol_version_raw.lstring(); - if(status_code == Sphinx.statusCode.ERROR){ - errmsg = 'Server issued ERROR: '+server_message; - } - if(status_code == Sphinx.statusCode.RETRY){ - errmsg = 'Server issued RETRY: '+server_message; - } - if(errmsg){ - callback(new Error(errmsg)); - } - } - - var data_unpacked = {'': protocol_version}; - if (data_unpacked[""] >= 1) { - //all ok, send my version - server_conn.write(version_number.toBuffer()); - - if(persistent){ - var pers_req = Buffer.makeWriter(); - pers_req.push.int16(Sphinx.command.PERSIST); - pers_req.push.int16(0); - pers_req.push.int32(4); - pers_req.push.int32(1); - server_conn.write(pers_req.toBuffer()); - } - server_conn.on('data', readResponseData); - _connected = true; - server_conn.emit('sphinx.connected'); - - // Use callback - callback(null); - - } else { - callback(new Error('Wrong protocol version: ' + protocol_version)); - server_conn.end(); - } - - }); - } else { - callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); - server_conn.end(); - _connected = false; - } - }); + // disable Nagle algorithm 
+ server_conn.setNoDelay(true); + //server_conn.setEncoding('binary'); + + response_output = null; + + //var promise = new process.Promise(); + + server_conn.addListener('connect', function () { + + // console.log('Connected, sending protocol version... State is ' + server_conn.readyState); + // Sending protocol version + // console.log('Sending version number...'); + // Here we must send 4 bytes, '0x00000001' + if (server_conn.readyState == 'open') { + var version_number = Buffer.makeWriter(); + version_number.push.int32(1); + // Waiting for answer + server_conn.once('data', function(data) { + /*if (response_output) { + console.log('connect: Data received from server'); + }*/ + + var protocol_version_raw = data.toReader(); + var protocol_version = protocol_version_raw.int32(); + + // if there still data? process and callback + if(!protocol_version_raw.empty()) { + status_code = protocol_version_raw.int16(); + version = protocol_version_raw.int16(); + server_message = protocol_version_raw.lstring(); + if(status_code == Sphinx.statusCode.ERROR){ + errmsg = 'Server issued ERROR: '+server_message; + } + if(status_code == Sphinx.statusCode.RETRY){ + errmsg = 'Server issued RETRY: '+server_message; + } + if(errmsg){ + callback(new Error(errmsg)); + } + }// if !protocol_version_raw.empty() + var data_unpacked = {'': protocol_version}; + + if (data_unpacked[""] >= 1) { + + + //all ok, send my version + server_conn.write(version_number.toBuffer()); + + if(persistent){ + var pers_req = Buffer.makeWriter(); + pers_req.push.int16(Sphinx.command.PERSIST); + pers_req.push.int16(0); + pers_req.push.int32(4); + pers_req.push.int32(1); + server_conn.write(pers_req.toBuffer()); + } + + server_conn.on('data', readResponseData); + _connected = true; + server_conn.emit('sphinx.connected'); + + // Use callback + // promise.emitSuccess(); + callback(null); + + } else { + callback(new Error('Wrong protocol version: ' + protocol_version)); + server_conn.end(); + } + + }); + } else { + 
callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect')); + server_conn.end(); + _connected = false; + } + }); + }; + // console.log('Connecting to searchd...'); + self.query = function(query_raw, callback) { - var query = new Object(); + var query = new Object(); - var query_parameters = { + + // Default query parameters + var query_parameters = { offset : 0, limit : 20, mode : Sphinx.searchMode.ALL, @@ -221,160 +246,177 @@ exports.SphinxClient = function() { fieldweights : {}, overrides : [], selectlist : "*", - indexes : '*', - comment : '', - query : "", + indexes : '*', + comment : '', + query : "", error : "", // per-reply fields (for single-query case) warning : "", connerror : false, - + reqs : [], // requests storage (for multi-query case) mbenc : "", arrayresult : true, timeout : 0 - }; - - if (query_raw.query) { - for (x in query_parameters) { - if (query_raw.hasOwnProperty(x)) { - query[x] = query_raw[x]; - } else { - query[x] = query_parameters[x]; - } - } - } else { - query = query_raw.toString(); - } - - var request = Buffer.makeWriter(); - request.push.int16(Sphinx.command.SEARCH); - request.push.int16(Sphinx.clientCommand.SEARCH); + }; + + if (query_raw.query) { + for (x in query_parameters) { + if (query_raw.hasOwnProperty(x)) { + query[x] = query_raw[x]; + } else { + query[x] = query_parameters[x]; + } + } + } else { + query = query_raw.toString(); + } - request.push.int32(0); // This will be request length - request.push.int32(0); - request.push.int32(1); + var request = Buffer.makeWriter(); + request.push.int16(Sphinx.command.SEARCH); + request.push.int16(Sphinx.clientCommand.SEARCH); + + request.push.int32(0); // This will be request length + request.push.int32(0); + request.push.int32(1); + request.push.int32(query.offset); - + request.push.int32(query.limit); request.push.int32(query.mode); request.push.int32(query.ranker); - + request.push.int32(query.sort); + + request.push.lstring(query.sortby); + 
request.push.lstring(query.query); // Query text + request.push.int32(query.weights.length); + for (var weight in query.weights) { + request.push.int32(parseInt(weight)); + } - request.push.lstring(query.sortby); - request.push.lstring(query.query); // Query text - request.push.int32(query.weights.length); - for (var weight in query.weights) { - request.push.int32(parseInt(weight)); - } - - request.push.lstring(query.indexes); // Indexes - - request.push.int32(1); // id64 range marker - - request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers - request.push.int64(0, query.max_id); - - request.push.int32(query.filters.length); - for (var filter in query.filters) { - request.push.int32(filter.attr.length); - request.push_lstring(filter.attr); - request.push.int32(filter.type); - switch (filter.type) { - case Sphinx.filterTypes.VALUES: - request.push.int32(filter.values.length); - for (var value in filter.values) { - request.push.int64(0, value); - } - break; - case Sphinx.filterTypes.RANGE: - request.push.int64(0, filter.min); - request.push.int64(0, filter.max); - break; - case Sphinx.filterTypes.FLOATRANGE: - request.push.float(filter.min); - request.push.float(filter.max); - break; - } - } - - request.push.int32(query_parameters.groupfunc); - request.push.lstring(query_parameters.groupby); // Groupby length + request.push.lstring(query.indexes); // Indexes used JEZ + + request.push.int32(1); // id64 range marker + + //request.push.int32(0); + request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers + //request.push.int32(0); // However, there is a caveat about using 64-bit ids + request.push.int64(0, query.max_id); + + //console.log('Found ' + query.filters.length + ' filters'); + request.push.int32(query.filters.length); + for (var filter_id in query.filters) { + var filter = query.filters[filter_id]; + //console.log('Found filter of type ' + filter.type) + if (!filter.attr) { + filter.attr = 
""; + } + if (!filter.exclude) { + filter.exclude = 0; + } + //request.push.int32(filter.attr.length);//WTF? length is included in lstring + request.push.lstring(filter.attr); + request.push.int32(filter.type); + switch (filter.type) { + case Sphinx.filterTypes.VALUES: + request.push.int32(filter.values.length); // Count of values + for (var value_id in filter.values) { + //request.push.int32(0); // should be a 64-bit number + request.push.int64(0, filter.values[value_id]); + } + break; + case Sphinx.filterTypes.RANGE: + //request.push.int32(0); // should be a 64-bit number + request.push.int64(0, filter.min); + //request.push.int32(0); // should be a 64-bit number + request.push.int64(0, filter.max); + break; + case Sphinx.filterTypes.FLOATRANGE: + request.push.float(filter.min); + request.push.float(filter.max); + break; + } + request.push.int32(filter.exclude); + } + + request.push.int32(query_parameters.groupfunc); + request.push.lstring(query_parameters.groupby); // Groupby length - request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000 + request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000 - request.push.lstring(query_parameters.groupsort); // Groupsort + request.push.lstring(query_parameters.groupsort); // Groupsort - request.push.int32(query_parameters.cutoff); // Cutoff - request.push.int32(query_parameters.retrycount); // Retrycount - request.push.int32(query_parameters.retrydelay); // Retrydelay + request.push.int32(query_parameters.cutoff); // Cutoff + request.push.int32(query_parameters.retrycount); // Retrycount + request.push.int32(query_parameters.retrydelay); // Retrydelay - request.push.lstring(query_parameters.groupdistinct); // Group distinct + request.push.lstring(query_parameters.groupdistinct); // Group distinct - if (query_parameters.anchor.length == 0) { - request.push.int32(0); // no anchor given - } else { - request.push.int32(1); // anchor point in radians - 
request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct - request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct - request.push.float(query_parameters.anchor["lat"]); - request.push.float(query_parameters.anchor["long"]); - } + if (query_parameters.anchor.length == 0) { + request.push.int32(0); // no anchor given + } else { + request.push.int32(1); // anchor point in radians + request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct + request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct + request.push.float(query_parameters.anchor["lat"]); + request.push.float(query_parameters.anchor["long"]); + } - request.push.int32(query_parameters.indexweights.length); - for (var i in query_parameters.indexweights) { - request.push.int32(i); - request.push.int32(query_parameters.indexweights[i]); - } + request.push.int32(query_parameters.indexweights.length); + for (var i in query_parameters.indexweights) { + request.push.int32(i); + request.push.int32(query_parameters.indexweights[i]); + } - request.push.int32(query_parameters.maxquerytime); - // per-field weights (preferred method) - request.push.int32(Object.keys(query.fieldweights).length); - for (var field_name in query.fieldweights) { + request.push.int32(query_parameters.maxquerytime); + // per-field weights (preferred method) + request.push.int32(Object.keys(query.fieldweights).length); + for (var field_name in query.fieldweights) { request.push.lstring(field_name); - request.push.int32(query.fieldweights[field_name]); - } + request.push.int32(query.fieldweights[field_name]); + } - request.push.lstring(query_parameters.comment); - - request.push.int32(query_parameters.overrides.length); - for (var i in query_parameters.overrides) { - request.push.lstring(query_parameters.overrides[i].attr); - request.push.int32(query_parameters.overrides[i].type); - request.push.int32(query_parameters.overrides[i].values.length); - for (var id in 
query_parameters.overrides[i].values) { - request.push.int64(id); - switch (query_parameters.overrides[i].type) { - case Sphinx.attribute.FLOAT: - request.push.float(query_parameters.overrides[i].values[id]); - break; - case Sphinx.attribute.BIGINT: - request.push.int64(query_parameters.overrides[i].values[id]); - break; - default: - request.push.int32(query_parameters.overrides[i].values[id]); - break; - } - } - } + request.push.lstring(query_parameters.comment); + + request.push.int32(query_parameters.overrides.length); + for (var i in query_parameters.overrides) { + request.push.lstring(query_parameters.overrides[i].attr); + request.push.int32(query_parameters.overrides[i].type); + request.push.int32(query_parameters.overrides[i].values.length); + for (var id in query_parameters.overrides[i].values) { + request.push.int64(id); + switch (query_parameters.overrides[i].type) { + case Sphinx.attribute.FLOAT: + request.push.float(query_parameters.overrides[i].values[id]); + break; + case Sphinx.attribute.BIGINT: + request.push.int64(query_parameters.overrides[i].values[id]); + break; + default: + request.push.int32(query_parameters.overrides[i].values[id]); + break; + } + } + } - request.push.lstring(query_parameters.selectlist); // Select-list + request.push.lstring(query_parameters.selectlist); // Select-list - var request_buf = request.toBuffer(); - var req_length = Buffer.makeWriter(); - req_length.push.int32(request_buf.length - 8); - req_length.toBuffer().copy(request_buf, 4, 0); + var request_buf = request.toBuffer(); + var req_length = Buffer.makeWriter(); + req_length.push.int32(request_buf.length - 8); + req_length.toBuffer().copy(request_buf, 4, 0); - console.log('Sending search request of ' + request_buf.length + ' bytes '); - _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); + //console.log('Sending search request of ' + request_buf.length + ' bytes'); + _enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH); }; self.build_excerpts = 
function(docs, index, words, passage_opts_raw, callback){ var passage_opts = new Object(); + var passage_parameters = { before_match : '', after_match : '', @@ -385,8 +427,8 @@ exports.SphinxClient = function() { limit_words : 0, around : 5, start_passage_id : 1, - passage_boundary : 'none' - }; + passage_boundary : 'none', + } for (x in passage_parameters) { if (passage_opts_raw.hasOwnProperty(x)) { @@ -407,8 +449,8 @@ exports.SphinxClient = function() { 'load_files' : 128, 'allow_empty' : 256, 'emit_zones' : 256 - }; - + } + for (x in flag_properties) { if (passage_opts_raw.hasOwnProperty(x)) { flags |= flag_properties[x]; @@ -421,7 +463,7 @@ exports.SphinxClient = function() { request.push.int16(Sphinx.command.EXCERPT); request.push.int16(Sphinx.clientCommand.EXCERPT); request.push.int32(0); // This will be request length - + // request 'body' (flags, options, docs) request.push.int32(0); @@ -431,7 +473,7 @@ exports.SphinxClient = function() { request.push.lstring(index); request.push.lstring(words); - + // options request.push.lstring(passage_opts.before_match); request.push.lstring(passage_opts.after_match); @@ -455,12 +497,12 @@ exports.SphinxClient = function() { req_length.push.int32(request_buf.length - 8); req_length.toBuffer().copy(request_buf,4,0); - console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); + //console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); _enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT); }; // build_excerpts self.disconnect = function() { - server_conn.end(); + server_conn.end(); }; function _enqueue(req_buf , cb, sc) { @@ -500,78 +542,83 @@ exports.SphinxClient = function() { } function readResponseData(data) { - // Got response! - response_output.append(data); + // Got response! 
+ response_output.append(data); response_output.runCallbackIfDone(_queue[0]['search_command']); } function initResponseOutput(query_callback) { - response_output = { - status : null, - version : null, - length : 0, - data : new Buffer(0), - parseHeader : function() { - if (this.status === null && this.data.length >= 8) { - var decoder = this.data.toReader(); - - this.status = decoder.int16(); - this.version = decoder.int16(); - this.length = decoder.int32(); + response_output = { + status : null, + version : null, + length : 0, + data : new Buffer(0), + parseHeader : function() { + if (this.status === null && this.data.length >= 8) { + // console.log('Answer length: ' + (this.data.length)); + var decoder = this.data.toReader(); + // var decoder = new bits.Decoder(this.data); + + this.status = decoder.int16(); + this.version = decoder.int16(); + this.length = decoder.int32(); + // console.log('Receiving answer with status ' + this.status + ', version ' + this.version + ' and length ' + this.length); this.data = this.data.slice(8, this.data.length); - } - }, - append : function(data) { - var new_buffer = new Buffer(this.data.length + data.length); - this.data.copy(new_buffer, 0, 0); - data.copy(new_buffer, this.data.length, 0); - this.data = new_buffer; - this.parseHeader(); - }, - done : function() { - return this.data.length >= this.length; - }, - checkResponse : function(search_command) { - var errmsg = ''; - if (this.length !== this.data.length) { - errmsg += "Failed to read searchd response (status=" + this.status + ", ver=" + this.version + ", len=" + this.length + ", read=" + this.data.length + ")"; - } - - if (this.version < search_command) { - errmsg += "Searchd command older than client's version, some options might not work"; - } - - if (this.status == Sphinx.statusCode.WARNING) { - errmsg += "Server issued WARNING: " + this.data; - } - - if (this.status == Sphinx.statusCode.ERROR) { - errmsg += "Server issued ERROR: " + this.data; - } - - if (this.status 
== Sphinx.statusCode.RETRY){ - errmsg += "Server issued RETRY: " + this.data; - } - - return errmsg; - }, - runCallbackIfDone : function(search_command) { - if (this.done()) { - var answer; - var cloned = new Buffer(response_output.data.length); - // clone the response data, so we can dequeue and let the server free to modify again response_output - response_output.data.copy(cloned); - _dequeue(); - var errmsg = this.checkResponse(search_command); - if (!errmsg) { - answer = parseResponse(cloned, search_command); - } - - query_callback(errmsg, answer); - } - } - }; + // this.data = decoder.string(this.data.length - 8); + } + }, + append : function(data) { + //this.data.write(data.toString('utf-8'), 'utf-8'); + // console.log('Appending ' + data.length + ' bytes'); + var new_buffer = new Buffer(this.data.length + data.length); + this.data.copy(new_buffer, 0, 0); + data.copy(new_buffer, this.data.length, 0); + this.data = new_buffer; + // console.log('Data length after appending: ' + this.data.length); + this.parseHeader(); + }, + done : function() { + // console.log('Length: ' + this.data.length + ' / ' + this.length); + return this.data.length >= this.length; + }, + checkResponse : function(search_command) { + var errmsg = ''; + if (this.length !== this.data.length) { + errmsg += "Failed to read searchd response (status=" + this.status + ", ver=" + this.version + ", len=" + this.length + ", read=" + this.data.length + ")"; + } + + if (this.version < search_command) { + errmsg += "Searchd command older than client's version, some options might not work"; + } + + if (this.status == Sphinx.statusCode.WARNING) { + errmsg += "Server issued WARNING: " + this.data; + } + + if (this.status == Sphinx.statusCode.ERROR) { + errmsg += "Server issued ERROR: " + this.data; + } + if (this.status == Sphinx.statusCode.RETRY){ + errmsg += "Server issued RETRY: " + this.data; + }// if RETRY + return errmsg; + }, + runCallbackIfDone : function(search_command) { + if (this.done()) { + 
var answer; + var cloned = new Buffer(response_output.data.length); + // clone the response data, so we can dequeue and let the server free to modify again response_output + response_output.data.copy(cloned); + _dequeue(); + var errmsg = this.checkResponse(search_command); + if (!errmsg) { + answer = parseResponse(cloned, search_command); + } + query_callback(errmsg, answer); + } + } + }; } var parseResponse = function (data, search_command) { @@ -580,115 +627,115 @@ exports.SphinxClient = function() { } else if (search_command == Sphinx.clientCommand.EXCERPT) { return parseExcerptResponse(data); } - return null; - }; + } var parseSearchResponse = function (data) { - var output = {}; - var response = data.toReader(); - var i; - output.status = response.int32(); + var output = {}; + // var response = new bits.Decoder(data); + var response = data.toReader(); + var i; + output.status = response.int32(); if (output.status != 0) { - return(response.lstring()); + return(response.lstring()); } - output.num_fields = response.int32(); + output.num_fields = response.int32(); - output.fields = []; - output.attributes = []; - output.matches = []; + output.fields = []; + output.attributes = []; + output.matches = []; - // Get fields - for (i = 0; i < output.num_fields; i++) { - var field = {}; + // Get fields + for (i = 0; i < output.num_fields; i++) { + var field = {}; - field.name = response.lstring(); + field.name = response.lstring(); - output.fields.push(field); - } - - output.num_attrs = response.int32(); - - // Get attributes - for (i = 0; i < output.num_attrs; i++) { - var attribute = {}; - - attribute.name = response.lstring(); - attribute.type = response.int32(); - output.attributes.push(attribute); - } - - output.match_count = response.int32(); - output.id64 = response.int32(); + output.fields.push(field); + } - // Get matches - for (i = 0; i < output.match_count; i++) { - var match = {}; + output.num_attrs = response.int32(); - // Here server tells us which format 
for document IDs - // it uses: int64 or int32 - if (output.id64 == 1) { - // get the 64-bit result, but only use the lower half for now - var id64 = response.int64(); - match.doc = id64[1]; - match.weight = response.int32(); - } else { - // Good news: document id fits our integers size :) - match.doc = response.int32(); - match.weight = response.int32(); - } + // Get attributes + for (i = 0; i < output.num_attrs; i++) { + var attribute = {}; - match.attrs = {}; - - // - var attr_value; - // var attribute; - for (attribute in output.attributes) { - // BIGINT size attributes (64 bits) - if (output.attributes[attribute].type == Sphinx.attribute.BIGINT) { - attr_value = response.int32(); - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // FLOAT size attributes (32 bits) - if (output.attributes[attribute].type == Sphinx.attribute.FLOAT) { - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // STRING attributes - if (output.attributes[attribute].type == Sphinx.attribute.STRING) { - attr_value = response.lstring(); - match.attrs[output.attributes[attribute].name] = attr_value; - continue; - } - - // We don't need this branch right now, - // as it is covered by previous `if` - // @todo: implement MULTI attribute type - attr_value = response.int32(); - match.attrs[output.attributes[attribute].name] = attr_value; - } - - output.matches.push(match); + attribute.name = response.lstring(); + attribute.type = response.int32(); + output.attributes.push(attribute); + } - } + output.match_count = response.int32(); + output.id64 = response.int32(); + + // Get matches + for (i = 0; i < output.match_count; i++) { + var match = {}; + + // Here server tells us which format for document IDs + // it uses: int64 or int32 + if (output.id64 == 1) { + // get the 64-bit result, but only use the lower half for now + var id64 = response.int64(); + match.doc = 
id64[1]; + match.weight = response.int32(); + } else { + // Good news: document id fits our integers size :) + match.doc = response.int32(); + match.weight = response.int32(); + } + + match.attrs = {}; + + // + var attr_value; + // var attribute; + for (attribute in output.attributes) { + // BIGINT size attributes (64 bits) + if (output.attributes[attribute].type == Sphinx.attribute.BIGINT) { + attr_value = response.int32(); + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // FLOAT size attributes (32 bits) + if (output.attributes[attribute].type == Sphinx.attribute.FLOAT) { + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // STRING attributes + if (output.attributes[attribute].type == Sphinx.attribute.STRING) { + attr_value = response.lstring(); + match.attrs[output.attributes[attribute].name] = attr_value; + continue; + } + + // We don't need this branch right now, + // as it is covered by previous `if` + // @todo: implement MULTI attribute type + attr_value = response.int32(); + match.attrs[output.attributes[attribute].name] = attr_value; + } + + output.matches.push(match); - output.total = response.int32(); - output.total_found = response.int32(); - output.msecs = response.int32(); - output.words_count = response.int32(); - output.words = new Object(); - for (i = 0; i < output.words_count; i++) { - var word = response.lstring(); - output.words[word] = new Object(); - output.words[word]["docs"] = response.int32(); - output.words[word]["hits"] = response.int32(); - } + } - return output; + output.total = response.int32(); + output.total_found = response.int32(); + output.msecs = response.int32(); + output.words_count = response.int32(); + output.words = new Object(); + for (i = 0; i < output.words_count; i++) { + var word = response.lstring(); + output.words[word] = new Object(); + output.words[word]["docs"] = response.int32(); + 
output.words[word]["hits"] = response.int32(); + } + + return output; }; var parseExcerptResponse = function (data) { From a9abcb2cb04a9d466c59aca4042d0dbc07fec493 Mon Sep 17 00:00:00 2001 From: Joe Z Date: Fri, 13 Jan 2012 16:49:31 -0500 Subject: [PATCH 14/14] qf --- limestone.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/limestone.js b/limestone.js index 3ffec1c..ee32877 100644 --- a/limestone.js +++ b/limestone.js @@ -158,7 +158,6 @@ exports.SphinxClient = function() { var protocol_version_raw = data.toReader(); var protocol_version = protocol_version_raw.int32(); - // if there still data? process and callback if(!protocol_version_raw.empty()) { status_code = protocol_version_raw.int16(); @@ -498,6 +497,7 @@ exports.SphinxClient = function() { req_length.toBuffer().copy(request_buf,4,0); //console.log('Sending build excerpt request of ' + request_buf.length + 'bytes'); + _enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT); }; // build_excerpts