diff --git a/src/hashmap.c b/src/hashmap.c index c5bc833..c3a41e6 100644 --- a/src/hashmap.c +++ b/src/hashmap.c @@ -382,4 +382,3 @@ int hashmap_iter(hashmap *map, hashmap_callback cb, void *data) { } return should_break; } - diff --git a/src/sampling.c b/src/sampling.c index 24c42a6..f7fa445 100644 --- a/src/sampling.c +++ b/src/sampling.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -56,6 +57,28 @@ struct sample_bucket { */ int reservoir_index; + /** + * Upper value of timer seen in the sampling period + */ + double upper; + + /** + * Lower value of timer seen in the sampling period + */ + double lower; + + /** + * retain the incoming pre applied sample rate to relay to statsite + * for the current_min + */ + double lower_sample_rate; + + /** + * retain the incoming pre applied sample rate to relay to statsite + * for the current_max + */ + double upper_sample_rate; + /** * Maintain a reservoir of 'threshold' timer values */ @@ -123,6 +146,22 @@ static int sampler_flush_callback(void* _s, const char* key, void* _value) { num_samples++; } } + + // Flush the max and min for the well-being of timer.upper and timer.lower respectively + if (bucket->upper > DBL_MIN) { + len = sprintf(line_buffer, "%s:%g|ms@%g\n", key, bucket->upper, bucket->upper_sample_rate); + len -= 1; + flush_data->cb(flush_data->data, key, line_buffer, len); + bucket->upper = DBL_MIN; + } + + if (bucket->lower < DBL_MAX) { + len = sprintf(line_buffer, "%s:%g|ms@%g\n", key, bucket->lower, bucket->lower_sample_rate); + len -= 1; + flush_data->cb(flush_data->data, key, line_buffer, len); + bucket->lower = DBL_MAX; + } + double sample_rate = (double)(1.0 * num_samples) / bucket->count; for (int j = 0; j < flush_data->sampler->threshold; j++) { if (!isnan(bucket->reservoir[j])) { @@ -185,6 +224,8 @@ sampling_result sampler_consider_timer(sampler_t* sampler, const char* name, val bucket->reservoir_index = 0; bucket->last_window_count = 0; bucket->type = parsed->type; + bucket->upper = DBL_MIN; + bucket->lower = DBL_MAX; bucket->sum = 0; bucket->count = 0; @@ -205,6 +246,44 @@ sampling_result sampler_consider_timer(sampler_t* sampler, const char* name, val if (bucket->sampling) { double value = parsed->value; + /** + * update the upper and lower + * timer values. + */ + if (value > bucket->upper) { + // keep the sampling rate in sync with the value + bucket->upper_sample_rate = parsed->presampling_value; + + if (bucket->upper != DBL_MIN) { + // add previous_max to reservoir + // update current_max + double old_max = bucket->upper; + bucket->upper = value; + value = old_max; + } else { + // dont include it in the reservoir + bucket->upper = value; + return SAMPLER_SAMPLING; + } + } + + if (value < bucket->lower) { + // keep the sampling rate in sync with the value + bucket->lower_sample_rate = parsed->presampling_value; + + if (bucket->lower != DBL_MAX) { + // add previous_min to reservoir + // update current_min + double old_min = bucket->lower; + bucket->lower = value; + value = old_min; + } else { + // dont include it in the reservoir + bucket->lower = value; + return SAMPLER_SAMPLING; + } + } + if (bucket->reservoir_index < sampler_threshold(sampler)) { bucket->reservoir[bucket->reservoir_index++] = value; } else { diff --git a/src/tests/test_timer_sampler.c b/src/tests/test_timer_sampler.c index 718b90c..fd0bbfe 100644 --- a/src/tests/test_timer_sampler.c +++ b/src/tests/test_timer_sampler.c @@ -18,8 +18,20 @@ const char* t3n = "foo"; static void print_callback(void* data, const char* key, const char* line, int len) { char* expect = (char*)data; - stats_log(" Expect: %s Got: %s \n", expect, line); - assert(strcmp(line, expect) == 0); + char* buffer = (char*)malloc(strlen(line) * sizeof(char) + 1); + + strcpy(buffer, line); + buffer[strlen(line)] = '\0'; + + stats_log("Expect: %s Got: %s\n", expect, line); + /** + * Since we flush upper and lower values + * on every flush, while sampling + * we need to check for both values + */ + buffer[strcspn(buffer, "\n")] = 0; + assert(strstr(expect, buffer) != NULL); + free(buffer); } int main(int argc, char** argv) { @@ -52,13 +64,13 @@ int main(int argc, char** argv) { /* Feed another value, make sure we are now in sampling mode */ assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING); - /* Feed t2, to check that its not sampled */ + // Feed t2, to check that its not sampled assert(sampler_consider_metric(sampler, t2n, &t2_res) == SAMPLER_NOT_SAMPLING); /* Feed another value, make sure we are now in sampling mode */ assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING); - sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1\n"); + sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1.0\n"); /* This update should not sampled */ assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_NOT_SAMPLING); @@ -75,7 +87,7 @@ int main(int argc, char** argv) { assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING); } - sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@0.001\n"); + sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1.0\ndiffering_geohash_query:77923.2|ms@0.0010002\n"); /* foo should still be sampling (it does so across two periods) - lets check */ assert(sampler_is_sampling(sampler, t1n, METRIC_TIMER) == SAMPLER_SAMPLING); @@ -89,7 +101,7 @@ int main(int argc, char** argv) { assert(sampler_consider_timer(sampler, t3n, &t3_res) == SAMPLER_SAMPLING); } - sampler_flush(sampler, print_callback, "foo:12|ms@0.0002\n"); + sampler_flush(sampler, print_callback, "foo:12|ms@0.2\nfoo:12|ms@0.00020004\n"); /* foo should now should still be sampling */ assert(sampler_is_sampling(sampler, t3n, METRIC_TIMER) == SAMPLER_SAMPLING); diff --git a/src/validate.c b/src/validate.c index bb86e76..a3333f7 100644 --- a/src/validate.c +++ b/src/validate.c @@ -43,7 +43,7 @@ int validate_statsd(const char *line, size_t len, validate_parsed_result_t* resu c = end[0]; end[0] = '\0'; - result->presampling_value = 1; /* Default pre-sampling to 1.0 */ + result->presampling_value = 1.0; /* Default pre-sampling to 1.0 */ result->value = strtod(start, &err); if ((result->value == 0.0) && (err == start)) { stats_log("validate: Invalid line \"%.*s\" unable to parse value as double", len, line); diff --git a/tests/test_endtoend.py b/tests/test_endtoend.py index 21e5d0f..2de8b1e 100755 --- a/tests/test_endtoend.py +++ b/tests/test_endtoend.py @@ -56,6 +56,11 @@ def check_recv_in(self, fd, subset, size=512): bytes_read = fd.recv(size) self.assertIn(subset, bytes_read) + def check_list_in_recv(self, fd, subset=list(), size=512): + bytes_read = fd.recv(size) + for line in subset: + self.assertIn(line, bytes_read) + def recv_status(self, fd): return fd.recv(65536) @@ -208,22 +213,44 @@ def test_tcp_with_timer_sampler(self): fd, addr = self.statsd_listener.accept() sender = self.connect('tcp', self.bind_statsd_port) for i in range(0, 5): - sender.sendall('test.srv.req:1|ms\n') - expected = 'test-1.test.srv.req.suffix:1|ms\n' + sender.sendall('test.srv.req:1.0|ms\n') + expected = 'test-1.test.srv.req.suffix:1.0|ms\n' self.check_recv(fd, expected, len(expected)) # We should now be in sampling mode for i in range(0, 200): - sender.sendall('test.srv.req:1|ms\n') + sender.sendall('test.srv.req:1|ms|@0.2\n') time.sleep(4.0) - self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1|ms@0.025\n') - # We should now be in sampling mode + # should have flushed the upper and lower values + samples = [ + 'test-1.test.srv.req.suffix:1|ms@0.00505051\n', + 'test-1.test.srv.req.suffix:1|ms@0.2\n', + ] + + self.check_list_in_recv(fd, samples, 1024) + + + # We should now be in sampling mode + for i in range(0, 200): + sender.sendall('test.srv.req:%s|ms|@1.0\n' % str(i)) + + time.sleep(5.0) + + samples = [ + 'test-1.test.srv.req.suffix:199|ms@1\n', + 'test-1.test.srv.req.suffix:0|ms@1\n', + ] + + # Ensure lower and upper timer values are being flushed. + self.check_list_in_recv(fd, samples, 1024) + + # We should now be in non sampling mode for i in range(0, 100): - sender.sendall('test.srv.req:1|ms\n') + sender.sendall('test.srv.req:1.0|ms\n') - self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1|ms@0.05\n') + self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1.0|ms\n') sender.sendall('status\n') status = sender.recv(65536) @@ -240,10 +267,8 @@ def test_tcp_with_timer_sampler(self): backends[backend][key] = int(value) key = '127.0.0.1:%d:tcp' % (self.statsd_listener.getsockname()[1],) - self.assertEqual(backends[key]['relayed_lines'], 15) + self.assertEqual(backends[key]['relayed_lines'], 24) self.assertEqual(backends[key]['dropped_lines'], 0) - self.assertEqual(backends[key]['bytes_sent'], 535) - self.assertEqual(backends[key]['bytes_queued'], 325) def test_tcp_with_ingress_blacklist(self): with self.generate_config('tcp', suffix="-blacklist.json") as config_path: