support RFC3339 style timestamps in arrow-json (#3449)
* Use array_value_to_string in arrow-json

* Update tests

* Add test to write timestamps with timezone in json

* Add use_z=true to to_rfc3339

* Fix the write_timestamps_tz test

* Add arrow-json/chrono-tz to top level Cargo.toml

* Remove chrono-tz requirements from arrow-json

* Fix linting errors
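
For orientation, the net effect on a written timestamp value, abridged from the test expectations further down in this diff (only one field is shown; "..." marks elided fields):

Before this commit (naive rendering, space separator):
    {"nanos":"2018-11-13 17:11:10.011375885", ...}

After this commit (RFC 3339 "T" separator, and a trailing "Z" once the column carries a "+00:00" timezone):
    {"nanos":"2018-11-13T17:11:10.011375885", ...}
    {"nanos":"2018-11-13T17:11:10.011375885Z", ...}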
JayjeetAtGithub authored Jan 6, 2023
1 parent 2d2d0a3 commit acefeef
Showing 2 changed files with 69 additions and 8 deletions.
arrow-cast/src/display.rs (3 changes: 2 additions & 1 deletion)

@@ -27,6 +27,7 @@ use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::ArrowNativeType;
 use arrow_schema::*;
+use chrono::prelude::SecondsFormat;
 
 macro_rules! make_string {
     ($array_type:ty, $column: ident, $row: ident) => {{
@@ -157,7 +158,7 @@ macro_rules! make_string_datetime_with_tz {
         let s = match $tz_string.parse::<Tz>() {
             Ok(tz) => array
                 .value_as_datetime_with_tz($row, tz)
-                .map(|d| format!("{}", d.to_rfc3339()))
+                .map(|d| format!("{}", d.to_rfc3339_opts(SecondsFormat::AutoSi, true)))
                 .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string()),
             Err(_) => array
                 .value_as_datetime($row)
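
The display.rs change above switches chrono's default RFC 3339 rendering to to_rfc3339_opts(SecondsFormat::AutoSi, true). A minimal standalone chrono sketch, not part of the commit, of what the two calls produce (the epoch value is chosen to match the instant used in the tests):

use chrono::{SecondsFormat, TimeZone, Utc};

fn main() {
    // 2018-11-13T17:11:10.011375885 UTC, the instant used in the arrow-json tests.
    let dt = Utc.timestamp_opt(1_542_129_070, 11_375_885).unwrap();

    // Default rendering: a zero offset is printed as +00:00.
    println!("{}", dt.to_rfc3339());
    // -> 2018-11-13T17:11:10.011375885+00:00

    // use_z = true replaces a zero offset with Z; AutoSi keeps only as many
    // sub-second digits (0, 3, 6 or 9) as the value needs.
    println!("{}", dt.to_rfc3339_opts(SecondsFormat::AutoSi, true));
    // -> 2018-11-13T17:11:10.011375885Z
}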
arrow-json/src/writer.rs (74 changes: 67 additions & 7 deletions)

@@ -105,6 +105,8 @@ use arrow_array::types::*;
 use arrow_array::*;
 use arrow_schema::*;
 
+use arrow_cast::display::array_value_to_string;
+
 fn primitive_array_to_json<T>(array: &ArrayRef) -> Result<Vec<Value>, ArrowError>
 where
     T: ArrowPrimitiveType,
@@ -217,17 +219,16 @@ macro_rules! set_column_by_array_type {
 
 macro_rules! set_temporal_column_by_array_type {
     ($array_type:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident, $cast_fn:ident) => {
-        let arr = $array.as_any().downcast_ref::<$array_type>().unwrap();
-
         $rows
             .iter_mut()
            .enumerate()
             .take($row_count)
             .for_each(|(i, row)| {
-                if !arr.is_null(i) {
-                    if let Some(v) = arr.$cast_fn(i) {
-                        row.insert($col_name.to_string(), v.to_string().into());
-                    }
+                if !$array.is_null(i) {
+                    row.insert(
+                        $col_name.to_string(),
+                        array_value_to_string($array, i).unwrap().to_string().into(),
+                    );
                 }
             });
     };
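
The macro rewrite above drops the downcast and the unit-specific cast function and instead routes every non-null value through arrow-cast's display code, which now yields RFC 3339 strings for timestamps. A hedged standalone sketch of that logic for a single column; the column_to_json helper and its serde_json row representation are illustrative rather than the writer's actual internals, and array_value_to_string takes &ArrayRef around this release (later releases take &dyn Array):

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, TimestampMillisecondArray};
use arrow_cast::display::array_value_to_string;
use serde_json::{Map, Value};

// Hypothetical helper mirroring the macro body: render every non-null row of
// the column through arrow-cast's formatter and store it on the JSON row.
fn column_to_json(col_name: &str, array: &ArrayRef, rows: &mut [Map<String, Value>]) {
    for (i, row) in rows.iter_mut().enumerate().take(array.len()) {
        if !array.is_null(i) {
            let s = array_value_to_string(array, i).unwrap();
            row.insert(col_name.to_string(), Value::String(s));
        }
    }
}

fn main() {
    // 2018-11-13T17:11:10.011 UTC, stored as milliseconds with a "+00:00" timezone.
    let ts = TimestampMillisecondArray::from(vec![Some(1_542_129_070_011), None])
        .with_timezone("+00:00".to_string());
    let array: ArrayRef = Arc::new(ts);

    let mut rows = vec![Map::new(), Map::new()];
    column_to_json("millis", &array, &mut rows);
    // rows[0]["millis"] should be "2018-11-13T17:11:10.011Z"; rows[1] stays empty.
    println!("{:?}", rows);
}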
@@ -925,7 +926,66 @@ mod tests {
 
         assert_json_eq(
             &buf,
-            r#"{"nanos":"2018-11-13 17:11:10.011375885","micros":"2018-11-13 17:11:10.011375","millis":"2018-11-13 17:11:10.011","secs":"2018-11-13 17:11:10","name":"a"}
+            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
 {"name":"b"}
 "#,
         );
+    }
+
+    #[test]
+    fn write_timestamps_with_tz() {
+        let ts_string = "2018-11-13T17:11:10.011375885995";
+        let ts_nanos = ts_string
+            .parse::<chrono::NaiveDateTime>()
+            .unwrap()
+            .timestamp_nanos();
+        let ts_micros = ts_nanos / 1000;
+        let ts_millis = ts_micros / 1000;
+        let ts_secs = ts_millis / 1000;
+
+        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
+        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
+        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
+        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
+        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
+
+        let tz = "+00:00".to_string();
+
+        let arr_nanos = arr_nanos.with_timezone(&tz);
+        let arr_micros = arr_micros.with_timezone(&tz);
+        let arr_millis = arr_millis.with_timezone(&tz);
+        let arr_secs = arr_secs.with_timezone(&tz);
+
+        let schema = Schema::new(vec![
+            Field::new("nanos", arr_nanos.data_type().clone(), true),
+            Field::new("micros", arr_micros.data_type().clone(), true),
+            Field::new("millis", arr_millis.data_type().clone(), true),
+            Field::new("secs", arr_secs.data_type().clone(), true),
+            Field::new("name", arr_names.data_type().clone(), true),
+        ]);
+        let schema = Arc::new(schema);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(arr_nanos),
+                Arc::new(arr_micros),
+                Arc::new(arr_millis),
+                Arc::new(arr_secs),
+                Arc::new(arr_names),
+            ],
+        )
+        .unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = LineDelimitedWriter::new(&mut buf);
+            writer.write_batches(&[batch]).unwrap();
+        }
+
+        assert_json_eq(
+            &buf,
+            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
+{"name":"b"}
+"#,
+        );
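
For completeness, a short hedged sketch of the same behavior from user code rather than from the test harness, mirroring the writer calls in the test above; the expected output line is taken from the secs expectation asserted there:

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, TimestampSecondArray};
use arrow_json::writer::LineDelimitedWriter;

fn main() {
    // One second-resolution timestamp, tagged with a "+00:00" timezone.
    let ts = TimestampSecondArray::from(vec![Some(1_542_129_070)])
        .with_timezone("+00:00".to_string());
    let batch =
        RecordBatch::try_from_iter(vec![("ts", Arc::new(ts) as ArrayRef)]).unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[batch]).unwrap();
    }

    // Expected NDJSON output: {"ts":"2018-11-13T17:11:10Z"}
    println!("{}", String::from_utf8(buf).unwrap());
}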
