-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpgparse.py
262 lines (225 loc) · 9.61 KB
/
pgparse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""pgparse.py: Classes and function for parsing emails on a PDF page."""
from dataclasses import dataclass, field
from typing import ClassVar
from collections import defaultdict
@dataclass
class Header:
    """Container for the fields of one email header.

    Every field has a default, so ``Header()`` is valid.  Each list field
    gets its own fresh empty list per instance.  The declaration order
    below is also the positional order of the generated constructor.
    """

    from_email: str = field(default=None)
    to: list[str] = field(default_factory=list)
    subject: str = field(default=None)
    date: str = field(default=None)
    cc: list[str] = field(default_factory=list)
    bcc: list[str] = field(default_factory=list)
    attachments: list[str] = field(default_factory=list)
    importance: str = field(default=None)
    # Line number on the page where the header starts.
    begin_ln: int = field(default=0)
    # Line number on the page where the header finishes.
    end_ln: int = field(default=0)
    # Header lines that could not be matched to a known field.
    unprocessed: list[str] = field(default_factory=list)
@dataclass
class Page:
    """A PDF page that has no email header, only text."""

    # The text of the page.
    body: str
@dataclass
class Email(Page):
    """A page that starts an email: the inherited body plus its header."""

    header: Header
    page_number: int = field(default_factory=int)
    page_count: int = field(default_factory=int)
    # Column titles, in the same order as the values built by flatten().
    csv_header: ClassVar[list] = [
        'PDF page number',
        'page count',
        'subject',
        'date',
        'from',
        'to',
        'cc',
        'bcc',
        'attachments',
        'importance',
        'body',
        'hdr begin',
        'hdr end',
        'unprocessed',
    ]

    def flatten(self):
        """Return the email as one flat list, ordered like ``csv_header``."""
        hdr = self.header
        page_part = [self.page_number, self.page_count]
        header_part = [
            hdr.subject, hdr.date,
            hdr.from_email, hdr.to, hdr.cc,
            hdr.bcc, hdr.attachments,
            hdr.importance,
        ]
        tail_part = [self.body, hdr.begin_ln, hdr.end_ln, hdr.unprocessed]
        return page_part + header_part + tail_part

    def info(self):
        """Return a short dict summary: page position and key header fields."""
        hdr = self.header
        return dict(
            page_number=self.page_number,
            page_count=self.page_count,
            subject=hdr.subject,
            date=hdr.date,
            from_email=hdr.from_email,
            to=hdr.to,
        )
@dataclass
class HeaderParser:
    """Find and parse an email header in the lines of one page.

    ``pgarr`` is the page as a list of lines.  Call :meth:`parse` to get a
    ``Header`` object, or ``None`` when the page has no plausible header.
    The underscore-prefixed fields are working state used while parsing.
    """

    pgarr: list[str]
    # Field names (lower-case, spaces removed) recognised in a header.
    _FIELD_TOKENS: ClassVar[list[str]] = ['from', 'to', 'cc', 'bcc',
                                          'subject', 'date', 'sent',
                                          'importance', 'attachments']
    _MAX_START_LN: ClassVar[int] = 12   # max start line for header
    _MAX_COL_COLON: ClassVar[int] = 14  # rt most column for header colon
    # Parsed values keyed by field token, plus 'begin_ln', 'end_ln' and
    # 'unprocessed'.  Missing keys read as '' because of defaultdict(str).
    _header: defaultdict = field(
        default_factory=lambda: defaultdict(str))
    _ln: int = 0                              # line position in page
    _token: str = field(default_factory=str)  # last field token
    _lncnt: int = 0                           # len(pgarr)

    def _get_token(self, text):
        """Normalise a candidate field name: lower-case, spaces removed.

        Removing the spaces fixes erroneous spaces introduced by OCR.
        """
        return text.lower().replace(' ', '')

    def _line_at(self, index):
        """Return page line ``index``, or '' when it is off the page.

        Negative indexes are treated as off the page rather than wrapping
        around to the end of the page.
        """
        if 0 <= index < len(self.pgarr):
            return self.pgarr[index]
        return ''

    def _find_start(self):
        """Position ``_ln`` on the first header line; return True if found.

        Only the first ``_MAX_START_LN`` lines are examined, and a colon
        only counts if it is within the first ``_MAX_COL_COLON`` characters
        of the line.  On success ``begin_ln`` is stored 1-based.
        """
        limit = min(self._MAX_START_LN, self._lncnt)
        self._ln = 0
        while self._ln < limit:
            raw = self.pgarr[self._ln]
            loc = raw[:self._MAX_COL_COLON].find(':')
            if loc != -1:
                self._token = self._get_token(raw[:loc])
                if self._token in self._FIELD_TOKENS:
                    self._header['begin_ln'] = self._ln + 1  # human counting
                    return True
            self._ln += 1
        return False  # hit the line limit or the end of the page

    def _tokenize(self):
        """Process the current line as part of the header.

        A ``name: value`` line with a known name stores the value.  One
        with an unknown name is collected under ``'unprocessed'``.  A line
        without a colon is treated as the continuation of the previous
        known field and is appended to its value as-is.
        """
        line = self.pgarr[self._ln].strip()
        name, colon, value = line.partition(':')
        if colon:
            self._token = self._get_token(name)
            if self._token in self._FIELD_TOKENS:
                self._header[self._token] = value.strip()
            else:
                # .get() avoids creating a '' entry in the defaultdict.
                pending = self._header.get('unprocessed')
                if pending:
                    pending.append(line)
                else:
                    self._header['unprocessed'] = [line]
        elif self._token in self._FIELD_TOKENS:
            # existing token value carried onto next line
            self._header[self._token] += line

    def _next_line(self):
        """Advance one line; return True if that line is still header.

        Side effect: always increments ``_ln``.  The header ends at the end
        of the page or at the first blank (whitespace-only) line.
        """
        self._ln += 1
        return (self._ln < self._lncnt
                and self.pgarr[self._ln].strip() != '')

    def _convert_obj(self):
        """Build a Header object from the ``_header`` dictionary.

        Returns None when either the date or the from field is missing,
        since that is very likely a false-positive header.  Fields that
        were not found are passed to Header as None.
        """
        found = self._header
        if not (found.get('date') and found.get('from')):
            return None
        return Header(from_email=found.get('from'),
                      to=found.get('to'),
                      cc=found.get('cc'),
                      bcc=found.get('bcc'),
                      subject=found.get('subject'),
                      date=found.get('date'),
                      attachments=found.get('attachments'),
                      importance=found.get('importance'),
                      begin_ln=found.get('begin_ln'),
                      end_ln=found.get('end_ln'),
                      unprocessed=found.get('unprocessed'))

    def _gmail_parser(self):
        """Fallback for headers with no subject, date or from field.

        Looks at the lines just above the end of the header: a line
        holding ``...>`` followed by the date, and a subject line either
        one line (2021 or earlier) or three lines (2022 or later) above
        it.  Presumably this matches Gmail's print layout -- confirm
        against sample pages.  Lines that would fall before the start of
        the page are treated as empty.
        """
        end_ln = self._header['end_ln']
        from_sent = self._line_at(end_ln - 2)
        fe = from_sent.find('>')
        # old_subject - 2021 or earlier; new_subject - 2022 or later
        old_subject = self._line_at(end_ln - 3).strip()
        new_subject = self._line_at(end_ln - 5).strip()
        if fe > 0 and (old_subject or new_subject):
            self._header['from'] = from_sent[:fe+1]
            self._header['date'] = from_sent[fe+1:].strip()
            if old_subject:
                self._header['subject'] = old_subject
                self._header['begin_ln'] = self._header['begin_ln'] - 2
            else:
                self._header['subject'] = new_subject
                self._header['begin_ln'] = self._header['begin_ln'] - 4

    def parse(self):
        """Parse the email header if it exists.

        Returns a Header object, or None when no header start is found or
        the header lacks a date or a from field.
        """
        self._lncnt = len(self.pgarr)  # lines in page
        if not self._find_start():
            return None  # no header
        self._tokenize()
        while self._next_line():
            self._tokenize()
        self._header['end_ln'] = self._ln
        if not self._header['date']:
            # fall back to the 'sent' field when there is no 'date'
            self._header['date'] = self._header.get('sent')
        if not (self._header['subject'] or
                self._header['date'] or
                self._header['from']):
            self._gmail_parser()
        return self._convert_obj()
def parse(page):
    """Parse a string representation of a PDF page.

    Returns an Email object when the page has an email header, otherwise a
    Page object holding the whole page text as its body.

    For an Email, the body starts at the first non-blank line at or after
    the header's end line.  Whitespace-only lines count as blank, the same
    test the header parser uses to detect the end of the header.
    """
    pgarr = page.splitlines()
    header = HeaderParser(pgarr).parse()
    if not header:
        return Page(body='\n'.join(pgarr))

    # ignore blank lines between email header and text
    body_begin = header.end_ln
    while body_begin < len(pgarr) and pgarr[body_begin].strip() == '':
        body_begin += 1
    return Email(header=header, body='\n'.join(pgarr[body_begin:]))
def main():
    """Run as script for testing purposes.

    Parses two sample pages, printing the type and value of each result,
    then builds an Email by hand and prints it.
    """
    header_page = """
From: [email protected]
Subject: this afternoon
Date: Thursday, March 25, 2021 06:16:10 AM
Status: Urgent
Hi Booboo:
Let's go see the ranger this afternoon at 2 o'clock, ok?
Yogi
"""
    continuation_page = """
Up next:
This is an example of a continuation page, which occurs when an email
extends beyond a page.
Thanks,
Yogi
"""
    for example_page in (header_page, continuation_page):
        pg = parse(example_page)
        print(type(pg))
        print(pg)

    # No exit() here: it used to make the hand-built example below
    # unreachable.
    e1 = Email(header=Header(from_email='[email protected]',
                             to=['[email protected]'],
                             subject='this afternoon',
                             date='Thursday, March 25, 2021 06:16:10 AM'),
               body="""Hi Booboo:
Let's go see the ranger this afternoon at 2 o'clock, ok?
Yogi""")
    print(e1)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()