-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathdatefmt.rs
402 lines (344 loc) · 15.6 KB
/
datefmt.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
/// Help text and command-line grammar for `qsv datefmt`.
///
/// `run` hands this string to `util::get_args` together with argv to populate `Args`,
/// so the option names and `[default: ...]` markers below are runtime behavior, not
/// just documentation. Each `--flag-name`/`<arg>` here corresponds to a `flag_*`/`arg_*`
/// field of `Args`.
// NOTE(review): presumably parsed docopt-style, in which case spacing between an
// option and its description is significant - confirm before reformatting.
static USAGE: &str = r#"
Formats recognized date fields (19 formats recognized) to a specified date format
using strftime date format specifiers.
See https://github.com/dathere/belt/tree/main/dateparser#accepted-date-formats for
recognized date formats.
See https://docs.rs/chrono/latest/chrono/format/strftime/ for
accepted date format specifiers for --formatstr.
Defaults to ISO 8601/RFC 3339 format when --formatstr is not specified.
( "%Y-%m-%dT%H:%M:%S%z" - e.g. 2001-07-08T00:34:60.026490+09:30 )
Examples:
Format dates in Open Date column to ISO 8601/RFC 3339 format:
$ qsv datefmt 'Open Date' file.csv
Format multiple date columns in file.csv to ISO 8601/RFC 3339 format:
$ qsv datefmt 'Open Date,Modified Date,Closed Date' file.csv
Format all columns that end with "_date" case-insensitive in file.csv to ISO 8601/RFC 3339 format:
$ qsv datefmt '/(?i)_date$/' file.csv
Format dates in OpenDate column using '%Y-%m-%d' format:
$ qsv datefmt OpenDate --formatstr '%Y-%m-%d' file.csv
Format multiple date columns using '%Y-%m-%d' format:
$ qsv datefmt OpenDate,CloseDate,ReopenDate --formatstr '%Y-%m-%d' file.csv
Get the week number for OpenDate and store it in the week_number column:
$ qsv datefmt OpenDate --formatstr '%V' --new-column week_number file.csv
Get the day of the week for several date columns and store it in the corresponding weekday columns:
$ qsv datefmt OpenDate,CloseDate --formatstr '%u' --rename Open_weekday,Close_weekday file.csv
For more extensive examples, see https://github.com/dathere/qsv/blob/master/tests/test_datefmt.rs.
Usage:
qsv datefmt [--formatstr=<string>] [options] <column> [<input>]
qsv datefmt --help
datefmt arguments:
<column> The column/s to apply the date formats to.
Note that the <column> argument supports multiple columns.
See 'qsv select --help' for the format details.
--formatstr=<string> The date format to use for the datefmt operation.
The date format to use. For formats, see
https://docs.rs/chrono/latest/chrono/format/strftime/
Default to ISO 8601 / RFC 3339 date & time format -
"%Y-%m-%dT%H:%M:%S%z" - e.g. 2001-07-08T00:34:60.026490+09:30
[default: %+]
<input> The input file to read from. If not specified, reads from stdin.
datefmt options:
-c, --new-column <name> Put the transformed values in a new column instead.
-r, --rename <name> New name for the transformed column.
--prefer-dmy Prefer to parse dates in dmy format. Otherwise, use mdy format.
--keep-zero-time If a formatted date ends with "T00:00:00+00:00", keep the time
instead of removing it.
--input-tz=<string> The timezone to use for the input date if the date does not have
timezone specified. The timezone must be a valid IANA timezone name or
the string "local" for the local timezone.
See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
for a list of valid timezone names.
[default: UTC]
--output-tz=<string> The timezone to use for the output date.
The timezone must be a valid IANA timezone name or the string "local".
[default: UTC]
--default-tz=<string> The timezone to use for BOTH input and output dates when they do have timezone.
Shortcut for --input-tz and --output-tz set to the same timezone.
The timezone must be a valid IANA timezone name or the string "local".
--utc Shortcut for --input-tz and --output-tz set to UTC.
--zulu Shortcut for --output-tz set to UTC and --formatstr set to "%Y-%m-%dT%H:%M:%SZ".
-R, --ts-resolution <res> The resolution to use when parsing Unix timestamps.
Valid values are "sec", "milli", "micro", "nano".
[default: sec]
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Automatically determined for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch. Set to 1 to force batch optimization
even for files with less than 50000 rows.
[default: 50000]
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will not be interpreted
as headers.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. (default: ,)
-p, --progressbar Show progress bars. Not valid for stdin.
"#;
use std::str::FromStr;
use chrono::{DateTime, TimeZone, Utc};
use chrono_tz::Tz;
#[cfg(any(feature = "feature_capable", feature = "lite"))]
use indicatif::{ProgressBar, ProgressDrawTarget};
use qsv_dateparser::parse_with_preference_and_timezone;
use rayon::{
iter::{IndexedParallelIterator, ParallelIterator},
prelude::IntoParallelRefIterator,
};
use serde::Deserialize;
use crate::{
config::{Config, Delimiter},
select::SelectColumns,
util,
util::replace_column_value,
CliResult,
};
/// Command-line arguments of `qsv datefmt`, filled in by `util::get_args` from `USAGE`.
///
/// Field names mirror the usage text: `arg_*` for positionals, `flag_*` for options.
// `flag_progressbar` is only read when a progress-bar feature is compiled in,
// which is presumably why dead_code is allowed here.
#[allow(dead_code)]
#[derive(Deserialize)]
struct Args {
    // --- positional arguments ---
    /// `<column>`: the column selection the date formatting is applied to.
    arg_column: SelectColumns,
    /// `<input>`: input file; `None` means read from stdin.
    arg_input: Option<String>,

    // --- formatting ---
    /// `--formatstr`: strftime format used to render parsed dates.
    flag_formatstr: String,
    /// `--keep-zero-time`: keep a trailing "T00:00:00+00:00" instead of trimming it.
    flag_keep_zero_time: bool,
    /// `--prefer-dmy`: prefer day-month-year over month-day-year when parsing.
    flag_prefer_dmy: bool,
    /// `--ts-resolution`: "sec", "milli", "micro" or "nano" for Unix timestamps.
    flag_ts_resolution: String,

    // --- timezones ---
    /// `--input-tz`: timezone assumed for input dates.
    flag_input_tz: String,
    /// `--output-tz`: timezone used when rendering output dates.
    flag_output_tz: String,
    /// `--default-tz`: optional timezone for both input and output.
    flag_default_tz: Option<String>,
    /// `--utc`: force both input and output timezones to UTC.
    flag_utc: bool,
    /// `--zulu`: force UTC output and the "%Y-%m-%dT%H:%M:%SZ" format.
    flag_zulu: bool,

    // --- output columns ---
    /// `--new-column`: write transformed values to a new column with this name.
    flag_new_column: Option<String>,
    /// `--rename`: new name(s) for the transformed column(s).
    flag_rename: Option<String>,

    // --- parallelism ---
    /// `--batch`: number of rows loaded per batch.
    flag_batch: usize,
    /// `--jobs`: number of parallel jobs; `None` lets `util::njobs` decide.
    flag_jobs: Option<usize>,

    // --- common options ---
    /// `--output`: output file; `None` means stdout.
    flag_output: Option<String>,
    /// `--no-headers`: do not treat the first row as headers.
    flag_no_headers: bool,
    /// `--delimiter`: field delimiter for reading CSV data.
    flag_delimiter: Option<Delimiter>,
    /// `--progressbar`: show a progress bar.
    flag_progressbar: bool,
}
/// Unit in which an integer Unix timestamp is expressed.
///
/// Defaults to [`TimestampResolution::Second`].
#[derive(Default, Clone, Copy)]
enum TimestampResolution {
    /// Whole seconds since the Unix epoch.
    #[default]
    Second,
    /// Milliseconds since the Unix epoch.
    Millisecond,
    /// Microseconds since the Unix epoch.
    Microsecond,
    /// Nanoseconds since the Unix epoch.
    Nanosecond,
}

impl FromStr for TimestampResolution {
    type Err = String;

    /// Parses "sec", "milli", "micro" or "nano" (in any letter case).
    ///
    /// # Errors
    /// Any other input yields `Invalid timestamp resolution: <input>`, echoing the
    /// input exactly as it was given.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Accepted spellings, compared against the lowercased input.
        const KNOWN: [(&str, TimestampResolution); 4] = [
            ("sec", TimestampResolution::Second),
            ("milli", TimestampResolution::Millisecond),
            ("micro", TimestampResolution::Microsecond),
            ("nano", TimestampResolution::Nanosecond),
        ];

        let wanted = s.to_lowercase();
        KNOWN
            .iter()
            .find(|(name, _)| *name == wanted)
            .map(|&(_, resolution)| resolution)
            .ok_or_else(|| format!("Invalid timestamp resolution: {s}"))
    }
}
/// Interprets `input` as an integer Unix timestamp expressed in `resolution` units.
///
/// Returns `None` when `input` does not parse as an `i64`, or when chrono does not
/// yield a single datetime for the value. The nanosecond branch always produces a
/// value once the integer has parsed.
#[inline]
fn unix_timestamp(input: &str, resolution: TimestampResolution) -> Option<DateTime<Utc>> {
    // Anything that is not a plain integer is not a timestamp.
    let ticks = atoi_simd::parse::<i64>(input.as_bytes()).ok()?;

    // Every constructor below is called on `Utc`, so the result is already a
    // `DateTime<Utc>` and needs no further timezone conversion.
    match resolution {
        TimestampResolution::Second => Utc.timestamp_opt(ticks, 0).single(),
        TimestampResolution::Millisecond => Utc.timestamp_millis_opt(ticks).single(),
        TimestampResolution::Microsecond => Utc.timestamp_micros(ticks).single(),
        TimestampResolution::Nanosecond => Some(Utc.timestamp_nanos(ticks)),
    }
}
/// Resolves the value of `--input-tz` / `--output-tz` to a concrete timezone.
///
/// * A value that parses as a `Tz` is used as-is.
/// * The string "local" (case-insensitive) resolves to the zone reported by
///   `localzone::get_local_zone()`; the resolved name is logged at info level,
///   prefixed with `flag_name`.
/// * Anything else - including "local" when no local zone can be determined -
///   silently yields `fallback`.
///
/// # Errors
/// Fails only when a local zone name was found but cannot be parsed as a `Tz`.
fn resolve_timezone(flag_value: &str, flag_name: &str, fallback: Tz) -> CliResult<Tz> {
    if let Ok(tz) = flag_value.parse::<Tz>() {
        return Ok(tz);
    }
    if flag_value.eq_ignore_ascii_case("local") {
        if let Some(local_name) = localzone::get_local_zone() {
            log::info!("{flag_name} local timezone: {local_name}");
            let local_tz = local_name.parse::<Tz>()?;
            return Ok(local_tz);
        }
    }
    Ok(fallback)
}

/// Entry point of the `datefmt` command.
///
/// Reads the input CSV in batches, reformats the selected columns of each batch in
/// parallel (Rayon), and writes the records out in their original order.
///
/// For every selected, non-empty cell the value is first tried as an integer Unix
/// timestamp (at `--ts-resolution`), then handed to
/// `parse_with_preference_and_timezone`. Cells that cannot be parsed are passed
/// through unchanged. Parsed dates are rendered with `--formatstr` in the output
/// timezone; unless `--keep-zero-time` is set, a trailing "T00:00:00+00:00" is
/// removed from the rendered value.
///
/// # Errors
/// Returns an error when arguments cannot be parsed, the timestamp resolution is
/// invalid, the number of `--rename` names differs from the number of selected
/// columns, `--default-tz` is not a valid timezone, or reading/writing CSV fails.
pub fn run(argv: &[&str]) -> CliResult<()> {
    // Rendered time-of-day + offset that is trimmed unless --keep-zero-time is set.
    const ZERO_TIME_SUFFIX: &str = "T00:00:00+00:00";

    let args: Args = util::get_args(USAGE, argv)?;
    let rconfig = Config::new(args.arg_input.as_ref())
        .delimiter(args.flag_delimiter)
        .no_headers(args.flag_no_headers)
        .select(args.arg_column);

    let mut rdr = rconfig.reader()?;
    let mut wtr = Config::new(args.flag_output.as_ref()).writer()?;

    let headers = rdr.byte_headers()?.clone();
    let sel = rconfig.selection(&headers)?;

    let tsres = args.flag_ts_resolution.parse::<TimestampResolution>()?;

    let mut headers = rdr.headers()?.clone();

    if let Some(new_name) = args.flag_rename {
        let new_col_names = util::ColumnNameParser::new(&new_name).parse()?;
        if new_col_names.len() != sel.len() {
            return fail_incorrectusage_clierror!(
                "Number of new columns does not match input column selection."
            );
        }
        for (i, col_index) in sel.iter().enumerate() {
            headers = replace_column_value(&headers, *col_index, &new_col_names[i]);
        }
    }

    // NOTE(review): the header gains ONE field for --new-column, but each record below
    // gains one field PER selected column - looks inconsistent for multi-column
    // selections; confirm intended behavior.
    if !rconfig.no_headers {
        if let Some(new_column) = &args.flag_new_column {
            headers.push_field(new_column);
        }
        wtr.write_record(&headers)?;
    }

    let mut flag_formatstr = args.flag_formatstr;
    // loop-invariant, so computed once instead of once per record
    let add_new_column = args.flag_new_column.is_some();

    // prep progress bar
    #[cfg(any(feature = "feature_capable", feature = "lite"))]
    let show_progress =
        (args.flag_progressbar || util::get_envvar_flag("QSV_PROGRESSBAR")) && !rconfig.is_stdin();
    #[cfg(any(feature = "feature_capable", feature = "lite"))]
    let progress = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(5));
    #[cfg(any(feature = "feature_capable", feature = "lite"))]
    if show_progress {
        util::prep_progress(&progress, util::count_rows(&rconfig)?);
    } else {
        progress.set_draw_target(ProgressDrawTarget::hidden());
    }

    let prefer_dmy = args.flag_prefer_dmy || rconfig.get_dmy_preference();
    let keep_zero_time = args.flag_keep_zero_time;

    // amortize memory allocation by reusing record
    #[allow(unused_assignments)]
    let mut batch_record = csv::StringRecord::new();

    let num_jobs = util::njobs(args.flag_jobs);

    // reuse batch buffers
    let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
    let mut batch = Vec::with_capacity(batchsize);
    let mut batch_results = Vec::with_capacity(batchsize);

    // set timezone variables
    // NOTE(review): default_tz is only consulted as a fallback when --input-tz /
    // --output-tz fail to resolve. Since both default to "UTC", --default-tz on its
    // own appears to have no effect - confirm against the documented behavior.
    let default_tz = match args.flag_default_tz.as_deref() {
        Some(tz) => {
            if tz.eq_ignore_ascii_case("local") {
                if let Some(tz) = localzone::get_local_zone() {
                    log::info!("default-tz local timezone: {tz}");
                    tz.parse::<Tz>()?
                } else {
                    log::warn!("default-tz local timezone {tz} not found. Defaulting to UTC.");
                    chrono_tz::UTC
                }
            } else {
                tz.parse::<Tz>()?
            }
        },
        None => chrono_tz::UTC,
    };

    let mut input_tz = resolve_timezone(&args.flag_input_tz, "input-tz", default_tz)?;
    #[allow(clippy::useless_let_if_seq)] // more readable this way
    let mut output_tz = resolve_timezone(&args.flag_output_tz, "output-tz", default_tz)?;

    // --utc and --zulu override whatever was resolved above
    if args.flag_utc {
        input_tz = chrono_tz::UTC;
        output_tz = chrono_tz::UTC;
    }
    if args.flag_zulu {
        output_tz = chrono_tz::UTC;
        flag_formatstr = "%Y-%m-%dT%H:%M:%SZ".to_string();
    }

    let is_output_utc = output_tz == chrono_tz::UTC;

    // main loop to read CSV and construct batches for parallel processing.
    // each batch is processed via Rayon parallel iterator.
    // loop exits when batch is empty.
    'batch_loop: loop {
        for _ in 0..batchsize {
            match rdr.read_record(&mut batch_record) {
                Ok(true) => batch.push(std::mem::take(&mut batch_record)),
                Ok(false) => break, // nothing else to add to batch
                Err(e) => {
                    return fail_clierror!("Error reading file: {e}");
                },
            }
        }

        if batch.is_empty() {
            // break out of infinite loop when at EOF
            break 'batch_loop;
        }

        // do actual datefmt via Rayon parallel iterator
        batch
            .par_iter()
            .map(|record_item| {
                let mut record = record_item.clone();
                let mut cell = String::new();

                for col_index in &*sel {
                    record[*col_index].clone_into(&mut cell);

                    if !cell.is_empty() {
                        // integer cells are treated as Unix timestamps before
                        // falling back to the general date parser
                        let parsed_date = if let Some(ts) = unix_timestamp(&cell, tsres) {
                            Ok(ts)
                        } else {
                            parse_with_preference_and_timezone(&cell, prefer_dmy, &input_tz)
                        };

                        // unparseable cells keep their original content
                        if let Ok(format_date) = parsed_date {
                            // don't need to call with_timezone() if output_tz is UTC
                            // as format_date is already in UTC
                            let formatted_date = if is_output_utc {
                                format_date.format(&flag_formatstr).to_string()
                            } else {
                                format_date
                                    .with_timezone(&output_tz)
                                    .format(&flag_formatstr)
                                    .to_string()
                            };

                            // Trim exactly the zero-time suffix rather than cutting at a
                            // fixed byte offset: the date part is not always 10 bytes
                            // (e.g. years beyond 9999 or custom format strings), and a
                            // fixed offset could also split a multi-byte character.
                            match formatted_date.strip_suffix(ZERO_TIME_SUFFIX) {
                                Some(date_only) if !keep_zero_time => {
                                    date_only.clone_into(&mut cell);
                                },
                                _ => formatted_date.clone_into(&mut cell),
                            }
                        }
                    }

                    if add_new_column {
                        record.push_field(&cell);
                    } else {
                        record = replace_column_value(&record, *col_index, &cell);
                    }
                }
                record
            })
            .collect_into_vec(&mut batch_results);

        // rayon collect() guarantees original order, so we can just append results each batch
        for result_record in &batch_results {
            wtr.write_record(result_record)?;
        }

        #[cfg(any(feature = "feature_capable", feature = "lite"))]
        if show_progress {
            progress.inc(batch.len() as u64);
        }

        batch.clear();
    } // end batch loop

    #[cfg(any(feature = "feature_capable", feature = "lite"))]
    if show_progress {
        util::finish_progress(&progress);
    }

    Ok(wtr.flush()?)
}