diff --git a/webapp/graphite/whitelist/views.py b/webapp/graphite/whitelist/views.py index 721832b00..f5ccafdb9 100644 --- a/webapp/graphite/whitelist/views.py +++ b/webapp/graphite/whitelist/views.py @@ -18,7 +18,6 @@ from django.conf import settings from graphite.compat import HttpResponse -from graphite.util import unpickle def add(request): @@ -42,7 +41,7 @@ def show(request): def load_whitelist(): fh = open(settings.WHITELIST_FILE, 'rb') - whitelist = unpickle.load(fh) + whitelist = pickle.load(fh) fh.close() return whitelist diff --git a/webapp/tests/test_whitelist.py b/webapp/tests/test_whitelist.py new file mode 100644 index 000000000..4702c88cb --- /dev/null +++ b/webapp/tests/test_whitelist.py @@ -0,0 +1,115 @@ +import errno +import mock +import os +import pickle + +from . import DATA_DIR + +from django.conf import settings +from django.core.urlresolvers import reverse +from django.test import TestCase + +from graphite.whitelist.views import load_whitelist, save_whitelist + +class WhitelistTester(TestCase): + settings.WHITELIST_FILE = os.path.join(DATA_DIR, 'lists/whitelist') + + def wipe_whitelist(self): + try: + os.remove(settings.WHITELIST_FILE) + except OSError: + pass + + def create_whitelist(self): + try: + os.makedirs(settings.WHITELIST_FILE.replace('whitelist', '')) + except OSError: + pass + fh = open(settings.WHITELIST_FILE, 'wb') + pickle.dump({'a.b.c.d', 'e.f.g.h'}, fh) + fh.close() + + def test_whitelist_show_no_whitelist(self): + url = reverse('whitelist_show') + with self.assertRaises(IOError): + response = self.client.get(url) + + def test_whitelist_show(self): + url = reverse('whitelist_show') + self.create_whitelist() + self.addCleanup(self.wipe_whitelist) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "a.b.c.d\ne.f.g.h") + + def test_whitelist_add(self): + self.create_whitelist() + self.addCleanup(self.wipe_whitelist) + + url = reverse('whitelist_add') + response = self.client.post(url, {'metrics': ['i.j.k.l']}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "OK") + + url = reverse('whitelist_show') + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "a.b.c.d\ne.f.g.h\ni.j.k.l") + + def test_whitelist_add_existing(self): + self.create_whitelist() + self.addCleanup(self.wipe_whitelist) + + url = reverse('whitelist_add') + response = self.client.post(url, {'metrics': ['a.b.c.d']}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "OK") + + url = reverse('whitelist_show') + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "a.b.c.d\ne.f.g.h") + + def test_whitelist_remove(self): + self.create_whitelist() + self.addCleanup(self.wipe_whitelist) + + url = reverse('whitelist_remove') + response = self.client.post(url, {'metrics': ['a.b.c.d']}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "OK") + + url = reverse('whitelist_show') + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "e.f.g.h") + + def test_whitelist_remove_missing(self): + self.create_whitelist() + self.addCleanup(self.wipe_whitelist) + + url = reverse('whitelist_remove') + response = self.client.post(url, {'metrics': ['i.j.k.l']}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "OK") + + url = reverse('whitelist_show') + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, "a.b.c.d\ne.f.g.h") + + def test_save_whitelist(self): + try: + os.makedirs(settings.WHITELIST_FILE.replace('whitelist', '')) + except OSError: + pass + self.addCleanup(self.wipe_whitelist) + self.assertEqual(save_whitelist({'a.b.c.d','e.f.g.h'}), None) + self.assertEqual(load_whitelist(), {'a.b.c.d','e.f.g.h'}) + + @mock.patch('os.rename') + def test_save_whitelist_rename_failure(self, rename): + self.addCleanup(self.wipe_whitelist) + rename.side_effect = OSError(errno.EPERM, 'Operation not permitted') + with self.assertRaises(OSError): + save_whitelist({'a.b.c.d','e.f.g.h'})