-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathsnappy.rs
285 lines (254 loc) · 10.5 KB
/
snappy.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#![allow(clippy::cast_precision_loss)]
static USAGE: &str = r#"
Does streaming compression/decompression of the input using the Snappy framing format.
https://github.com/google/snappy/blob/main/framing_format.txt
It has four subcommands:
compress: Compress the input (multithreaded).
decompress: Decompress the input (single-threaded).
check: Quickly check if the input is a Snappy file by inspecting the
first 50 bytes of the input is valid Snappy data.
Returns exitcode 0 if the first 50 bytes is valid Snappy data,
exitcode 1 otherwise.
validate: Validate if the ENTIRE input is a valid Snappy file.
Returns exitcode 0 if valid, exitcode 1 otherwise.
Note that most qsv commands already automatically decompresses Snappy files if the
input file has an ".sz" extension. It will also automatically compress the output
file (though only single-threaded) if the --output file has an ".sz" extension.
This command's multithreaded compression is 5-6x faster than qsv's automatic
single-threaded compression.
Also, this command is not specific to CSV data, it can compress/decompress ANY file.
For examples, see https://github.com/dathere/qsv/blob/master/tests/test_snappy.rs.
Usage:
qsv snappy compress [options] [<input>]
qsv snappy decompress [options] [<input>]
qsv snappy check [options] [<input>]
qsv snappy validate [options] [<input>]
qsv snappy --help
snappy arguments:
<input> The input file to compress/decompress. This can be a local file, stdin,
or a URL (http and https schemes supported).
snappy options:
--user-agent <agent> Specify custom user agent to use when the input is a URL.
It supports the following variables -
$QSV_VERSION, $QSV_TARGET, $QSV_BIN_NAME, $QSV_KIND and $QSV_COMMAND.
Try to follow the syntax here -
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
--timeout <secs> Timeout for downloading URLs in seconds.
[default: 60]
Common options:
-h, --help Display this message
-o, --output <file> Write output to <output> instead of stdout.
-j, --jobs <arg> The number of jobs to run in parallel when compressing.
When not set, its set to the number of CPUs - 1
-Q, --quiet Suppress status messages to stderr.
-p, --progressbar Show download progress bars. Only valid for URL input.
"#;
use std::{
fs,
io::{self, stdin, BufRead, Read, Write},
};
use gzp::{par::compress::ParCompressBuilder, snap::Snap, ZWriter};
use serde::Deserialize;
use tempfile::NamedTempFile;
use url::Url;
use crate::{config, util, CliError, CliResult};
#[derive(Deserialize)]
struct Args {
arg_input: Option<String>,
flag_output: Option<String>,
cmd_compress: bool,
cmd_decompress: bool,
cmd_check: bool,
cmd_validate: bool,
flag_user_agent: Option<String>,
flag_timeout: u16,
flag_jobs: Option<usize>,
flag_quiet: bool,
flag_progressbar: bool,
}
impl From<snap::Error> for CliError {
fn from(err: snap::Error) -> CliError {
CliError::Other(format!("Snap error: {err:?}"))
}
}
impl From<gzp::GzpError> for CliError {
fn from(err: gzp::GzpError) -> CliError {
CliError::Other(format!("Gzp error: {err:?}"))
}
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
let input_bytes;
// create a temporary file to write the download file to
// this is automatically deleted when temp_download goes out of scope
let temp_download = NamedTempFile::new()?;
let input_reader: Box<dyn BufRead> = if let Some(uri) = &args.arg_input {
let path = if Url::parse(uri).is_ok() && uri.starts_with("http") {
// its a remote file, download it first
let future = util::download_file(
uri,
temp_download.path().to_path_buf(),
args.flag_progressbar && !args.cmd_check && !args.flag_quiet,
args.flag_user_agent,
Some(args.flag_timeout),
if args.cmd_check {
Some(50) // only download 50 bytes when checking for a snappy header
} else {
None
},
);
tokio::runtime::Runtime::new()?.block_on(future)?;
// safety: temp_download is a NamedTempFile, so we know that it can be converted
let temp_download_path = temp_download.path().to_str().unwrap().to_string();
temp_download_path
} else {
// its a local file
uri.to_string()
};
let file = fs::File::open(path)?;
input_bytes = file.metadata()?.len();
Box::new(io::BufReader::with_capacity(
config::DEFAULT_RDR_BUFFER_CAPACITY,
file,
))
} else {
input_bytes = 0;
Box::new(io::BufReader::new(stdin().lock()))
};
let output_writer: Box<dyn Write + Send + 'static> = match &args.flag_output {
Some(output_path) => Box::new(io::BufWriter::with_capacity(
config::DEFAULT_WTR_BUFFER_CAPACITY,
fs::File::create(output_path)?,
)),
None => Box::new(io::BufWriter::with_capacity(
config::DEFAULT_WTR_BUFFER_CAPACITY,
io::stdout(),
)),
};
if args.cmd_compress {
let mut jobs = util::njobs(args.flag_jobs);
if jobs > 1 {
jobs -= 1; // save one thread for other tasks
}
compress(input_reader, output_writer, jobs, gzp::BUFSIZE * 2)?;
let compressed_bytes = if let Some(path) = &args.flag_output {
fs::metadata(path)?.len()
} else {
0
};
if !args.flag_quiet && compressed_bytes > 0 {
let compression_ratio = input_bytes as f64 / compressed_bytes as f64;
winfo!(
"Compression successful. Compressed bytes: {}, Decompressed bytes: {}, \
Compression ratio: {:.3}:1, Space savings: {} - {:.2}%",
indicatif::HumanBytes(compressed_bytes),
indicatif::HumanBytes(input_bytes),
compression_ratio,
indicatif::HumanBytes(
input_bytes
.checked_sub(compressed_bytes)
.unwrap_or_default()
),
(1.0 - (compressed_bytes as f64 / input_bytes as f64)) * 100.0
);
}
} else if args.cmd_decompress {
let decompressed_bytes = decompress(input_reader, output_writer)?;
if !args.flag_quiet {
let compression_ratio = decompressed_bytes as f64 / input_bytes as f64;
winfo!(
"Decompression successful. Compressed bytes: {}, Decompressed bytes: {}, \
Compression ratio: {:.3}:1",
indicatif::HumanBytes(input_bytes),
indicatif::HumanBytes(decompressed_bytes),
compression_ratio,
);
}
} else if args.cmd_validate {
if args.arg_input.is_none() {
return fail_incorrectusage_clierror!(
"stdin is not supported by the snappy validate subcommand."
);
}
let Ok(decompressed_bytes) = validate(input_reader) else {
return fail_clierror!("Not a valid snappy file.");
};
if !args.flag_quiet {
let compression_ratio = decompressed_bytes as f64 / input_bytes as f64;
winfo!(
"Valid snappy file. Compressed bytes: {}, Decompressed bytes: {}, Compression \
ratio: {:.3}:1, Space savings: {} - {:.2}%",
indicatif::HumanBytes(input_bytes),
indicatif::HumanBytes(decompressed_bytes),
compression_ratio,
indicatif::HumanBytes(
decompressed_bytes
.checked_sub(input_bytes)
.unwrap_or_default()
),
(1.0 - (input_bytes as f64 / decompressed_bytes as f64)) * 100.0
);
}
} else if args.cmd_check {
let check_ok = check(input_reader);
if args.flag_quiet {
if check_ok {
return Ok(());
}
return fail!("Not a snappy file.");
} else if check_ok {
winfo!("Snappy file.");
} else {
return fail!("Not a snappy file.");
}
}
Ok(())
}
// multithreaded streaming snappy compression
pub fn compress<R: Read, W: Write + Send + 'static>(
mut src: R,
dst: W,
jobs: usize,
buf_size: usize,
) -> CliResult<()> {
let mut writer = ParCompressBuilder::<Snap>::new()
.num_threads(jobs)?
// the buffer size must be at least gzp::DICT_SIZE
.buffer_size(if buf_size < gzp::DICT_SIZE {
gzp::DICT_SIZE
} else {
buf_size
})?
.pin_threads(Some(0))
.from_writer(dst);
io::copy(&mut src, &mut writer)?;
writer.finish()?;
Ok(())
}
// single-threaded streaming snappy decompression
fn decompress<R: Read, W: Write>(src: R, mut dst: W) -> CliResult<u64> {
let mut src = snap::read::FrameDecoder::new(src);
let decompressed_bytes = io::copy(&mut src, &mut dst)?;
Ok(decompressed_bytes)
}
// quickly check if a file is a snappy file
// note that the fn only reads the first 50 bytes of the file
// and does not check the entire file for validity
fn check<R: Read>(src: R) -> bool {
let src = snap::read::FrameDecoder::new(src);
// read the first 50 or less bytes of a file. The snap decoder will return an error
// if the file does not start with a valid snappy header
let mut buffer = Vec::with_capacity(50);
src.take(50).read_to_end(&mut buffer).is_ok()
}
// validate an entire snappy file by decompressing it to sink (i.e. /dev/null). This is useful for
// checking if a snappy file is corrupted.
// Note that this is more expensive than check() as it has to decompress the entire file.
fn validate<R: Read>(src: R) -> CliResult<u64> {
let mut src = snap::read::FrameDecoder::new(src);
let mut sink = io::sink();
match io::copy(&mut src, &mut sink) {
Ok(decompressed_bytes) => Ok(decompressed_bytes),
Err(err) => fail_clierror!("Error validating snappy file: {err:?}"),
}
}