Skip to content

Commit

Permalink
accurate .upper and .lower value for high frequency timers (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yash Kumaraswamy authored and lyft-buildnotify-2 committed Nov 4, 2016
1 parent 37eac7a commit 4e8201f
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 18 deletions.
1 change: 0 additions & 1 deletion src/hashmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,3 @@ int hashmap_iter(hashmap *map, hashmap_callback cb, void *data) {
}
return should_break;
}

79 changes: 79 additions & 0 deletions src/sampling.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <float.h>
#include <math.h>
#include <stdio.h>
#include <string.h>
Expand Down Expand Up @@ -56,6 +57,28 @@ struct sample_bucket {
*/
int reservoir_index;

/**
* Upper value of timer seen in the sampling period
*/
double upper;

/**
* Lower value of timer seen in the sampling period
*/
double lower;

/**
* retain the incoming pre applied sample rate to relay to statsite
* for the current_min
*/
double lower_sample_rate;

/**
* retain the incoming pre applied sample rate to relay to statsite
* for the current_max
*/
double upper_sample_rate;

/**
* Maintain a reservoir of 'threshold' timer values
*/
Expand Down Expand Up @@ -123,6 +146,22 @@ static int sampler_flush_callback(void* _s, const char* key, void* _value) {
num_samples++;
}
}

// Flush the max and min for the well-being of timer.upper and timer.lower respectively
if (bucket->upper > DBL_MIN) {
len = sprintf(line_buffer, "%s:%g|ms@%g\n", key, bucket->upper, bucket->upper_sample_rate);
len -= 1;
flush_data->cb(flush_data->data, key, line_buffer, len);
bucket->upper = DBL_MIN;
}

if (bucket->lower < DBL_MAX) {
len = sprintf(line_buffer, "%s:%g|ms@%g\n", key, bucket->lower, bucket->lower_sample_rate);
len -= 1;
flush_data->cb(flush_data->data, key, line_buffer, len);
bucket->lower = DBL_MAX;
}

double sample_rate = (double)(1.0 * num_samples) / bucket->count;
for (int j = 0; j < flush_data->sampler->threshold; j++) {
if (!isnan(bucket->reservoir[j])) {
Expand Down Expand Up @@ -185,6 +224,8 @@ sampling_result sampler_consider_timer(sampler_t* sampler, const char* name, val
bucket->reservoir_index = 0;
bucket->last_window_count = 0;
bucket->type = parsed->type;
bucket->upper = DBL_MIN;
bucket->lower = DBL_MAX;
bucket->sum = 0;
bucket->count = 0;

Expand All @@ -205,6 +246,44 @@ sampling_result sampler_consider_timer(sampler_t* sampler, const char* name, val
if (bucket->sampling) {
double value = parsed->value;

/**
* update the upper and lower
* timer values.
*/
if (value > bucket->upper) {
// keep the sampling rate in sync with the value
bucket->upper_sample_rate = parsed->presampling_value;

if (bucket->upper != DBL_MIN) {
// add previous_max to reservoir
// update current_max
double old_max = bucket->upper;
bucket->upper = value;
value = old_max;
} else {
// don't include it in the reservoir
bucket->upper = value;
return SAMPLER_SAMPLING;
}
}

if (value < bucket->lower) {
// keep the sampling rate in sync with the value
bucket->lower_sample_rate = parsed->presampling_value;

if (bucket->lower != DBL_MAX) {
// add previous_min to reservoir
// update current_min
double old_min = bucket->lower;
bucket->lower = value;
value = old_min;
} else {
// don't include it in the reservoir
bucket->lower = value;
return SAMPLER_SAMPLING;
}
}

if (bucket->reservoir_index < sampler_threshold(sampler)) {
bucket->reservoir[bucket->reservoir_index++] = value;
} else {
Expand Down
24 changes: 18 additions & 6 deletions src/tests/test_timer_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@ const char* t3n = "foo";

/**
 * Flush callback for the timer-sampler test: verifies that a flushed line
 * is contained in the expectation string.
 *
 * While sampling, a flush emits the upper and lower values in addition to
 * the reservoir samples, so the expectation may list several
 * newline-separated lines; the flushed line only has to match one of them.
 *
 * @param data  NUL-terminated expectation string (one or more lines)
 * @param key   metric key of the flushed line (unused)
 * @param line  NUL-terminated flushed line, possibly ending in '\n'
 * @param len   length reported by the sampler (unused; the line is
 *              measured with strlen instead)
 */
static void print_callback(void* data, const char* key, const char* line, int len) {
    const char* expect = (const char*)data;
    size_t line_len = strlen(line);
    char* buffer = malloc(line_len + 1);

    (void)key;
    (void)len;

    /* Fail loudly instead of dereferencing NULL if the allocation fails. */
    assert(buffer != NULL);

    /* Copy the line including its terminating NUL. */
    memcpy(buffer, line, line_len + 1);

    stats_log("Expect: %s Got: %s\n", expect, line);

    /* Drop the trailing newline so the line can be searched for as a
     * substring of the (possibly multi-line) expectation. */
    buffer[strcspn(buffer, "\n")] = '\0';
    assert(strstr(expect, buffer) != NULL);
    free(buffer);
}

int main(int argc, char** argv) {
Expand Down Expand Up @@ -52,13 +64,13 @@ int main(int argc, char** argv) {
/* Feed another value, make sure we are now in sampling mode */
assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING);

/* Feed t2, to check that its not sampled */
// Feed t2, to check that it's not sampled
assert(sampler_consider_metric(sampler, t2n, &t2_res) == SAMPLER_NOT_SAMPLING);

/* Feed another value, make sure we are now in sampling mode */
assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING);

sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1\n");
sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1.0\n");

/* This update should not be sampled */
assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_NOT_SAMPLING);
Expand All @@ -75,7 +87,7 @@ int main(int argc, char** argv) {
assert(sampler_consider_timer(sampler, t1n, &t1_res) == SAMPLER_SAMPLING);
}

sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@0.001\n");
sampler_flush(sampler, print_callback, "differing_geohash_query:77923.2|ms@1.0\ndiffering_geohash_query:77923.2|ms@0.001\n");

/* foo should still be sampling (it does so across two periods) - lets check */
assert(sampler_is_sampling(sampler, t1n, METRIC_TIMER) == SAMPLER_SAMPLING);
Expand All @@ -89,7 +101,7 @@ int main(int argc, char** argv) {
assert(sampler_consider_timer(sampler, t3n, &t3_res) == SAMPLER_SAMPLING);
}

sampler_flush(sampler, print_callback, "foo:12|ms@0.0002\n");
sampler_flush(sampler, print_callback, "foo:12|ms@0.2\nfoo:12|ms@0.0002\n");

/* foo should still be sampling */
assert(sampler_is_sampling(sampler, t3n, METRIC_TIMER) == SAMPLER_SAMPLING);
Expand Down
2 changes: 1 addition & 1 deletion src/validate.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int validate_statsd(const char *line, size_t len, validate_parsed_result_t* resu

c = end[0];
end[0] = '\0';
result->presampling_value = 1; /* Default pre-sampling to 1.0 */
result->presampling_value = 1.0; /* Default pre-sampling to 1.0 */
result->value = strtod(start, &err);
if ((result->value == 0.0) && (err == start)) {
stats_log("validate: Invalid line \"%.*s\" unable to parse value as double", len, line);
Expand Down
45 changes: 35 additions & 10 deletions tests/test_endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def check_recv_in(self, fd, subset, size=512):
bytes_read = fd.recv(size)
self.assertIn(subset, bytes_read)

def check_list_in_recv(self, fd, subset=(), size=512):
    """Read once from ``fd`` and assert every expected line is present.

    :param fd: socket-like object exposing ``recv(size)``
    :param subset: iterable of strings that must each occur in the data
        returned by the single ``recv`` call; empty by default
    :param size: maximum number of bytes requested from ``recv``
    """
    # The default is an immutable empty tuple rather than ``list()`` so no
    # mutable object is shared between calls.
    bytes_read = fd.recv(size)
    for line in subset:
        self.assertIn(line, bytes_read)

def recv_status(self, fd):
    """Return the result of one ``recv`` on ``fd``, requesting up to 65536 bytes."""
    max_bytes = 65536
    status = fd.recv(max_bytes)
    return status

Expand Down Expand Up @@ -208,22 +213,44 @@ def test_tcp_with_timer_sampler(self):
fd, addr = self.statsd_listener.accept()
sender = self.connect('tcp', self.bind_statsd_port)
for i in range(0, 5):
sender.sendall('test.srv.req:1|ms\n')
expected = 'test-1.test.srv.req.suffix:1|ms\n'
sender.sendall('test.srv.req:1.0|ms\n')
expected = 'test-1.test.srv.req.suffix:1.0|ms\n'
self.check_recv(fd, expected, len(expected))

# We should now be in sampling mode
for i in range(0, 200):
sender.sendall('test.srv.req:1|ms\n')
sender.sendall('test.srv.req:1|ms|@0.2\n')

time.sleep(4.0)
self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1|[email protected]\n')

# We should now be in sampling mode
# should have flushed the upper and lower values
samples = [
'test-1.test.srv.req.suffix:1|[email protected]\n',
'test-1.test.srv.req.suffix:1|[email protected]\n',
]

self.check_list_in_recv(fd, samples, 1024)


# We should now be in sampling mode
for i in range(0, 200):
sender.sendall('test.srv.req:%s|ms|@1.0\n' % str(i))

time.sleep(5.0)

samples = [
'test-1.test.srv.req.suffix:199|ms@1\n',
'test-1.test.srv.req.suffix:0|ms@1\n',
]

# Ensure lower and upper timer values are being flushed.
self.check_list_in_recv(fd, samples, 1024)

# We should now be in non sampling mode
for i in range(0, 100):
sender.sendall('test.srv.req:1|ms\n')
sender.sendall('test.srv.req:1.0|ms\n')

self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1|ms@0.05\n')
self.check_recv_in(fd, 'test-1.test.srv.req.suffix:1.0|ms\n')

sender.sendall('status\n')
status = sender.recv(65536)
Expand All @@ -240,10 +267,8 @@ def test_tcp_with_timer_sampler(self):
backends[backend][key] = int(value)

key = '127.0.0.1:%d:tcp' % (self.statsd_listener.getsockname()[1],)
self.assertEqual(backends[key]['relayed_lines'], 15)
self.assertEqual(backends[key]['relayed_lines'], 24)
self.assertEqual(backends[key]['dropped_lines'], 0)
self.assertEqual(backends[key]['bytes_sent'], 535)
self.assertEqual(backends[key]['bytes_queued'], 325)

def test_tcp_with_ingress_blacklist(self):
with self.generate_config('tcp', suffix="-blacklist.json") as config_path:
Expand Down

0 comments on commit 4e8201f

Please sign in to comment.