forked from brianray/mm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpsycopg2_tests.py
81 lines (67 loc) · 2.47 KB
/
psycopg2_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import unittest
import mm
import os
from xlrd_helper import XLSReader
# Directory part of this module's own path.
path = os.path.dirname(__file__)

# psycopg2 is an optional dependency: record whether it could be imported
# so the test case below can react when it is missing.
try:
    import psycopg2
    import psycopg2.extras
except ImportError:
    no_psycopg = True
    # Parenthesised single-argument form: prints the same text on Python 2
    # and is also valid syntax on Python 3 (the statement form is not).
    print("could not import psycopg2")
else:
    no_psycopg = False
class PsycopgTestSuite(unittest.TestCase):
    """Round-trip test for rows fetched through psycopg2.

    Rows are inserted into a scratch table, read back with a ``DictCursor``,
    rendered with ``mm.Document`` and written to an ``.xls`` file, which is
    then re-read with ``XLSReader`` and compared cell by cell.

    The test connects to the PostgreSQL server described by the connection
    string in ``setUp``. When psycopg2 could not be imported the test is
    reported as skipped.
    """

    def setUp(self):
        """Connect to the database and create the ``marmir_test`` table."""
        if no_psycopg:
            # Report an explicit skip instead of a silent pass. unittest
            # only calls tearDown when setUp succeeds, so nothing below
            # needs to re-check the flag.
            self.skipTest(
                'Skipping psycopg2 test (psycopg2 package not installed)')
        self.conn = psycopg2.connect(
            "dbname='template1' user='testuser' "
            "host='localhost' password='password'")
        # Registered immediately so the connection is closed even when the
        # CREATE TABLE below (or the test itself) fails; cleanups run after
        # tearDown.
        self.addCleanup(self.conn.close)
        cursor = self.conn.cursor()
        cursor.execute("CREATE TABLE marmir_test (num INT, data CHAR(12))")
        cursor.close()

    def tearDown(self):
        """Drop the ``marmir_test`` table created by ``setUp``."""
        cursor = self.conn.cursor()
        cursor.execute("DROP TABLE marmir_test;")
        cursor.close()

    def test_psycopg(self):
        """Write DictCursor rows to an .xls file and verify its contents."""
        dict_cur = self.conn.cursor(
            cursor_factory=psycopg2.extras.DictCursor)
        dict_cur.execute("INSERT INTO marmir_test (num, data) VALUES(%s, %s)",
                         (100, "abc'def"))
        dict_cur.execute("INSERT INTO marmir_test (num, data) VALUES(%s, %s)",
                         (200, "mmmmaaarrrr"))
        dict_cur.execute("SELECT * FROM marmir_test")
        my_data = dict_cur.fetchall()
        dict_cur.close()

        mm_doc = mm.Document(my_data)
        # Named so it does not shadow the ``str`` builtin.
        xls_content = mm_doc.writestr()
        self.assertTrue(
            len(xls_content) > 10,
            msg="String should be longer than 10 characters, got %s"
                % len(xls_content))
        with open("tests/generated_files/test_psycopg.xls", "wb") as f:
            f.write(xls_content)
        self.check("tests/generated_files/test_psycopg.xls", my_data)

    def check(self, filename, my_data):
        """Compare the spreadsheet at *filename* against *my_data*.

        Sheet row 0 is read as the header row; sheet row ``i + 1`` is then
        compared, value and type, with ``my_data[i]`` for every entry of
        *my_data*. Each entry is looked up by the header text found in the
        sheet, so the column order of the sheet does not need to match the
        iteration order of the rows.

        Raises ``AssertionError`` (through ``assertEqual``) on the first
        cell whose value or type differs. Nothing is checked when *my_data*
        is empty.
        """
        if not my_data:
            return
        xls = XLSReader(filename)
        column_count = len(my_data[0])
        headers = [xls.get_value(0, col) for col in range(column_count)]
        # Data starts on sheet row 1. Enumerating from 1 visits every data
        # row, including the last one.
        for row, data in enumerate(my_data, 1):
            for col, column_header in enumerate(headers):
                expected = data[column_header]
                self.assertEqual(xls.get_value(row, col), expected)
                self.assertEqual(xls.get_type(row, col), type(expected))
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()