forked from segmentio/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdedupe.go
111 lines (95 loc) · 2.67 KB
/
dedupe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package parquet
// DedupeRowReader wraps reader in a RowReader which filters out every row that
// compares equal (compare returns zero) to the most recently kept row.
//
// Only consecutive duplicates are removed. The output is therefore guaranteed
// to contain unique rows only when the underlying reader yields rows sorted
// according to the same comparison function.
func DedupeRowReader(reader RowReader, compare func(Row, Row) int) RowReader {
	d := new(dedupeRowReader)
	d.reader = reader
	d.compare = compare
	return d
}
// dedupeRowReader is the RowReader implementation returned by DedupeRowReader.
//
// reader is the wrapped source of rows and compare is the function used to
// decide whether two rows are duplicates. The embedded dedupe value carries
// the deduplication state (including the last kept row) across ReadRows calls.
type dedupeRowReader struct {
reader RowReader
compare func(Row, Row) int
dedupe
}
// ReadRows reads a batch from the underlying reader into rows, removes the
// rows that duplicate the most recently kept row, and returns the number of
// unique rows, which are moved to the front of rows.
//
// When a batch contains nothing but duplicates the underlying reader is
// polled again, so a zero count is only returned together with an error or
// when rows is empty. The error is the one reported by the underlying reader.
func (d *dedupeRowReader) ReadRows(rows []Row) (int, error) {
	for {
		n, err := d.reader.ReadRows(rows)
		n = d.deduplicate(rows[:n], d.compare)

		// An empty buffer can never receive a row. Return after a single
		// attempt instead of spinning forever on a reader that answers
		// (0, nil) to a zero-length read.
		if n > 0 || err != nil || len(rows) == 0 {
			return n, err
		}
	}
}
// DedupeRowWriter wraps writer in a RowWriter which filters out every row that
// compares equal (compare returns zero) to the most recently kept row before
// forwarding the remaining rows to writer.
//
// Only consecutive duplicates are removed. The output is therefore guaranteed
// to contain unique rows only when the rows are written in an order sorted
// according to the same comparison function.
func DedupeRowWriter(writer RowWriter, compare func(Row, Row) int) RowWriter {
	d := new(dedupeRowWriter)
	d.writer = writer
	d.compare = compare
	return d
}
// dedupeRowWriter is the RowWriter implementation returned by DedupeRowWriter.
//
// writer is the wrapped destination and compare is the function used to decide
// whether two rows are duplicates. The embedded dedupe value carries the
// deduplication state across WriteRows calls. rows is a scratch buffer, reused
// between calls, into which WriteRows copies the rows it receives before
// deduplicating them.
type dedupeRowWriter struct {
writer RowWriter
compare func(Row, Row) int
dedupe
rows []Row
}
// WriteRows forwards rows to the underlying writer after removing the rows
// which duplicate the most recently kept row.
//
// On success it returns len(rows), the number of rows received rather than
// the number actually forwarded, in order to respect the RowWriter contract.
// If the underlying writer fails, its count and error are returned as is.
func (d *dedupeRowWriter) WriteRows(rows []Row) (int, error) {
	// Deduplication reorders the slice it is given, and the RowWriter
	// contract forbids modifying the slice received as argument, so the work
	// is done on a reusable private copy.
	d.rows = append(d.rows[:0], rows...)

	// Zero the scratch entries on the way out so the buffer does not keep
	// referencing the caller's rows between calls.
	defer func() {
		for i := range d.rows {
			d.rows[i] = Row{}
		}
	}()

	unique := d.deduplicate(d.rows, d.compare)
	if unique == 0 {
		// Every row was a duplicate: there is nothing to forward.
		return len(rows), nil
	}

	if written, err := d.writer.WriteRows(d.rows[:unique]); err != nil {
		return written, err
	}
	return len(rows), nil
}
// dedupe holds the deduplication state embedded in dedupeRowReader and
// dedupeRowWriter.
//
// lastRow is the last row kept by the previous call to deduplicate; the next
// batch is compared against it. An empty lastRow means there is no previous
// row. alloc is the allocator deduplicate uses to capture lastRow.
// NOTE(review): rowAllocator is defined elsewhere; presumably capturing lets
// the row's values outlive the caller's buffers — confirm.
//
// uniq and dupe are scratch slices used by deduplicate to partition a batch
// into kept rows and duplicates; they are emptied before deduplicate returns.
type dedupe struct {
alloc rowAllocator
lastRow Row
uniq []Row
dupe []Row
}
// reset clears the deduplication state: the allocator is reset and the
// remembered last row is truncated to zero length, keeping its backing array
// for reuse. deduplicate treats an empty last row as "no previous row", so the
// next row it sees is kept.
func (d *dedupe) reset() {
	d.alloc.reset()
	d.lastRow = d.lastRow[:0]
}
// deduplicate reorders rows in place so that the rows to keep come first,
// followed by the duplicates, and returns the number of rows to keep.
//
// A row is a duplicate when compare reports it equal (returns zero) to the
// most recently kept row. For the first row of the batch, that is the row
// remembered from the previous call, if any. The relative order of the kept
// rows is preserved, and the last kept row is remembered for the next call.
func (d *dedupe) deduplicate(rows []Row, compare func(Row, Row) int) int {
	// Empty the scratch slices on the way out, zeroing their entries so they
	// do not keep referencing the caller's rows between calls.
	defer func() {
		for i := range d.uniq {
			d.uniq[i] = Row{}
		}
		for i := range d.dupe {
			d.dupe[i] = Row{}
		}
		d.uniq = d.uniq[:0]
		d.dupe = d.dupe[:0]
	}()

	lastRow := d.lastRow

	for _, row := range rows {
		// An empty lastRow is treated as "no previous row": the row is kept
		// without calling compare.
		if len(lastRow) != 0 && compare(row, lastRow) == 0 {
			d.dupe = append(d.dupe, row)
		} else {
			lastRow = row
			d.uniq = append(d.uniq, row)
		}
	}

	// uniq and dupe together hold exactly len(rows) entries, so these appends
	// refill the caller's backing array without reallocating it.
	rows = rows[:0]
	rows = append(rows, d.uniq...)
	rows = append(rows, d.dupe...)

	// Only re-capture when this batch produced a new last row. Otherwise
	// lastRow still aliases d.lastRow, which an earlier call already captured;
	// resetting the allocator and capturing it again would read the row from
	// storage that has just been released, for no benefit.
	// NOTE(review): assumes capture copies the row's values into storage owned
	// by the allocator — confirm against rowAllocator.
	if len(d.uniq) > 0 {
		d.alloc.reset()
		d.alloc.capture(lastRow)
		d.lastRow = append(d.lastRow[:0], lastRow...)
	}
	return len(d.uniq)
}