-
Notifications
You must be signed in to change notification settings - Fork 95
/
Copy pathwritefile.go
226 lines (203 loc) · 5.34 KB
/
writefile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package gws
import (
"bytes"
"encoding/binary"
"errors"
"io"
"math"
"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
)
const segmentSize = 128 * 1024
// 获取大文件压缩器
// Get bigDeflater
func (c *Conn) getBigDeflater() *bigDeflater {
if c.isServer {
return c.config.bdPool.Get()
}
return (*bigDeflater)(c.deflater.cpsWriter)
}
// 回收大文件压缩器
// Recycle bigDeflater
func (c *Conn) putBigDeflater(d *bigDeflater) {
if c.isServer {
c.config.bdPool.Put(d)
}
}
// 拆分io.Reader为小切片
// Split io.Reader into small slices
func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)
var p = buf.Bytes()[:segmentSize]
var n, index = 0, 0
var err error
for n, err = r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = r.Read(p) {
eof := errors.Is(err, io.EOF)
if err = f(index, eof, p[:n]); err != nil {
return err
}
index++
if eof {
break
}
}
return err
}
// WriteFile 大文件写入
// 采用分段写入技术, 减少写入过程中的内存占用
// Segmented write technology to reduce memory usage during write process
func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error {
err := c.doWriteFile(opcode, payload)
c.emitError(false, err)
return err
}
func (c *Conn) doWriteFile(opcode Opcode, payload io.Reader) error {
c.mu.Lock()
defer c.mu.Unlock()
var cb = func(index int, eof bool, p []byte) error {
if index > 0 {
opcode = OpcodeContinuation
}
frame, err := c.genFrame(opcode, internal.Bytes(p), frameConfig{
fin: eof,
compress: false,
broadcast: false,
checkEncoding: false,
})
if err != nil {
return err
}
if c.pd.Enabled && index == 0 {
frame.Bytes()[0] |= uint8(64)
}
if c.isClosed() {
return ErrConnClosed
}
err = internal.WriteN(c.conn, frame.Bytes())
binaryPool.Put(frame)
return err
}
if c.pd.Enabled {
var deflater = c.getBigDeflater()
var fw = &flateWriter{cb: cb}
var reader = &readerWrapper{r: payload, sw: &c.cpsWindow}
err := deflater.Compress(reader, fw, c.cpsWindow.dict)
c.putBigDeflater(deflater)
return err
} else {
return c.splitReader(payload, cb)
}
}
// 大文件压缩器
type bigDeflater flate.Writer
// 创建大文件压缩器
// Create a bigDeflater
func newBigDeflater(isServer bool, options PermessageDeflate) *bigDeflater {
windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits)
if windowBits == 15 {
cpsWriter, _ := flate.NewWriter(nil, options.Level)
return (*bigDeflater)(cpsWriter)
} else {
cpsWriter, _ := flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
return (*bigDeflater)(cpsWriter)
}
}
func (c *bigDeflater) FlateWriter() *flate.Writer { return (*flate.Writer)(c) }
// Compress 压缩
func (c *bigDeflater) Compress(r io.WriterTo, w *flateWriter, dict []byte) error {
if err := compressTo(c.FlateWriter(), r, w, dict); err != nil {
return err
}
return w.Flush()
}
// 写入代理
// 将切片透传给回调函数, 以实现分段写入功能
// Write proxy
// Passthrough slices to the callback function for segmented writes.
type flateWriter struct {
index int
buffers []*bytes.Buffer
cb func(index int, eof bool, p []byte) error
}
// 是否可以执行回调函数
// Whether the callback function can be executed
func (c *flateWriter) shouldCall() bool {
var n = len(c.buffers)
if n < 2 {
return false
}
var sum = 0
for i := 1; i < n; i++ {
sum += c.buffers[i].Len()
}
return sum >= 4
}
// 聚合写入, 减少syscall.write调用次数
// Aggregate writes, reducing the number of syscall.write calls
func (c *flateWriter) write(p []byte) {
var size = internal.Max(segmentSize, len(p))
if len(c.buffers) == 0 {
c.buffers = append(c.buffers, binaryPool.Get(size))
}
var n = len(c.buffers)
var tail = c.buffers[n-1]
if tail.Len()+len(p)+frameHeaderSize > tail.Cap() {
tail = binaryPool.Get(size)
c.buffers = append(c.buffers, tail)
}
tail.Write(p)
}
func (c *flateWriter) Write(p []byte) (n int, err error) {
c.write(p)
if c.shouldCall() {
err = c.cb(c.index, false, c.buffers[0].Bytes())
binaryPool.Put(c.buffers[0])
c.buffers = c.buffers[1:]
c.index++
}
return n, err
}
func (c *flateWriter) Flush() error {
var buf = c.buffers[0]
for i := 1; i < len(c.buffers); i++ {
buf.Write(c.buffers[i].Bytes())
binaryPool.Put(c.buffers[i])
}
if n := buf.Len(); n >= 4 {
if tail := buf.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
buf.Truncate(n - 4)
}
}
var err = c.cb(c.index, true, buf.Bytes())
c.index++
binaryPool.Put(buf)
return err
}
// 将io.Reader包装为io.WriterTo
// Wrapping io.Reader as io.WriterTo
type readerWrapper struct {
r io.Reader
sw *slideWindow
}
// WriteTo 写入内容, 并更新字典
// Write the contents, and update the dictionary
func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)
var p = buf.Bytes()[:segmentSize]
var sum, n = 0, 0
var err error
for n, err = c.r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = c.r.Read(p) {
eof := errors.Is(err, io.EOF)
if _, err = w.Write(p[:n]); err != nil {
return int64(sum), err
}
sum += n
_, _ = c.sw.Write(p[:n])
if eof {
break
}
}
return int64(sum), err
}