Skip to content

Commit

Permalink
Support "values" orient (array of arrays) in Nested JSON reader (rapi…
Browse files Browse the repository at this point in the history
…dsai#12498)

Legacy GPU JSON reader can read "values" orient data in JSON string (only JSON lines).
With this PR change, Nested JSON reader can also reader "values" orient data for both JSON lines and non-line JSON string.

Examples:
```
import cudf
json="[[1, 2, 3], [4, 5], [7, 8, 9]]"
cudf.read_json(json, engine="cudf_experimental")
   0  1     2
0  1  2     3
1  4  5  <NA>
2  7  8     9
json="[1, 2, 3]\n [4, 5, null]\n [7, 8, [9]]"
cudf.read_json(json, engine="cudf_experimental", lines=True)
   0  1     2
0  1  2  None
1  4  5  None
2  7  8   [9]
```


Note that pandas passes "values" data but with `orient="records"` argument, but it is parsed as "values". Similar support is added here too. Passing values with `orient="records"` will still work).

closes rapidsai#12446

#### Changes in this PR:
- Added a lambda `is_non_list_parent` - added extra condition to skip array (level2 list) nodes.
- New algorithm to generate `list_indices` within each row.
- Use it for hashing.
- Use it for column names. (but generated again).

**Changes to tests:**
- added row orient experiemental reader testing param
- datetime, duration needs to be wrapped with double quotes if it has whitespace or colon. (limitation of nested json reader, also to be compliant with JSON specification).

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Elias Stehle (https://github.com/elstehle)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: rapidsai#12498
  • Loading branch information
karthikeyann authored Jan 20, 2023
1 parent c14b3c8 commit cc37c72
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 101 deletions.
10 changes: 2 additions & 8 deletions cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,8 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,

auto const buffer = get_record_range_raw_input(sources, reader_opts, stream);

try {
return cudf::io::json::detail::device_parse_nested_json(buffer, reader_opts, stream, mr);
} catch (cudf::logic_error const& err) {
#ifdef NJP_DEBUG_PRINT
std::cout << "Fall back to host nested json parser" << std::endl;
#endif
return cudf::io::json::detail::host_parse_nested_json(buffer, reader_opts, stream, mr);
}
return cudf::io::json::detail::device_parse_nested_json(buffer, reader_opts, stream, mr);
// For debug purposes, use host_parse_nested_json()
}

} // namespace cudf::io::detail::json::experimental
146 changes: 131 additions & 15 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void print_tree(host_span<SymbolT const> input,
* @param sorted_col_ids Sorted column ids of nodes
* @param ordered_node_ids Node ids of nodes sorted by column ids
* @param row_offsets Row offsets of nodes
* @param is_array_of_arrays Whether the tree is an array of arrays
* @param row_array_parent_col_id Column id of row array, if is_array_of_arrays is true
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of column tree representation of JSON string, column ids of columns, and
* max row offsets of columns
Expand All @@ -115,6 +117,8 @@ reduce_to_column_tree(tree_meta_t& tree,
device_span<NodeIndexT> sorted_col_ids,
device_span<NodeIndexT> ordered_node_ids,
device_span<size_type> row_offsets,
bool is_array_of_arrays,
NodeIndexT const row_array_parent_col_id,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
Expand Down Expand Up @@ -197,6 +201,15 @@ reduce_to_column_tree(tree_meta_t& tree,
: col_ids[parent_node_id];
});

// condition is true if parent is not a list, or sentinel/root
// Special case to return true if parent is a list and is_array_of_arrays is true
auto is_non_list_parent = [column_categories = column_categories.begin(),
is_array_of_arrays,
row_array_parent_col_id] __device__(auto parent_col_id) -> bool {
return !(parent_col_id == parent_node_sentinel ||
column_categories[parent_col_id] == NC_LIST &&
(!is_array_of_arrays || parent_col_id != row_array_parent_col_id));
};
// Mixed types in List children go to different columns,
// so all immediate children of list column should have same max_row_offsets.
// create list's children max_row_offsets array. (initialize to zero)
Expand Down Expand Up @@ -244,22 +257,23 @@ reduce_to_column_tree(tree_meta_t& tree,
unique_col_ids.end(),
max_row_offsets.begin(),
[column_categories = column_categories.begin(),
parent_col_ids = parent_col_ids.begin(),
max_row_offsets = max_row_offsets.begin()] __device__(size_type col_id) {
is_non_list_parent,
parent_col_ids = parent_col_ids.begin(),
max_row_offsets = max_row_offsets.begin()] __device__(size_type col_id) {
auto parent_col_id = parent_col_ids[col_id];
while (parent_col_id != parent_node_sentinel and
column_categories[parent_col_id] != node_t::NC_LIST) {
// condition is true if parent is not a list, or sentinel/root
while (is_non_list_parent(parent_col_id)) {
col_id = parent_col_id;
parent_col_id = parent_col_ids[parent_col_id];
}
return max_row_offsets[col_id];
},
[column_categories = column_categories.begin(),
parent_col_ids = parent_col_ids.begin()] __device__(size_type col_id) {
is_non_list_parent,
parent_col_ids = parent_col_ids.begin()] __device__(size_type col_id) {
auto parent_col_id = parent_col_ids[col_id];
return parent_col_id != parent_node_sentinel and
(column_categories[parent_col_id] != node_t::NC_LIST);
// Parent is not a list, or sentinel/root
// condition is true if parent is not a list, or sentinel/root
return is_non_list_parent(parent_col_id);
});

return std::tuple{tree_meta_t{std::move(column_categories),
Expand All @@ -271,6 +285,35 @@ reduce_to_column_tree(tree_meta_t& tree,
std::move(max_row_offsets)};
}

/**
* @brief Get the column indices for the values column for array of arrays rows
*
* @param row_array_children_level The level of the row array's children
* @param d_tree The tree metadata
* @param col_ids The column ids
* @param num_columns The number of columns
* @param stream The stream to use
* @return The value columns' indices
*/
rmm::device_uvector<NodeIndexT> get_values_column_indices(TreeDepthT const row_array_children_level,
tree_meta_t const& d_tree,
device_span<NodeIndexT> col_ids,
size_type const num_columns,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
auto [level2_nodes, level2_indices] = get_array_children_indices(
row_array_children_level, d_tree.node_levels, d_tree.parent_node_ids, stream);
auto col_id_location = thrust::make_permutation_iterator(col_ids.begin(), level2_nodes.begin());
rmm::device_uvector<NodeIndexT> values_column_indices(num_columns, stream);
thrust::scatter(rmm::exec_policy(stream),
level2_indices.begin(),
level2_indices.end(),
col_id_location,
values_column_indices.begin());
return values_column_indices;
}

/**
* @brief Copies strings specified by pair of begin, end offsets to host vector of strings.
*
Expand Down Expand Up @@ -347,6 +390,8 @@ struct json_column_data {
* @param col_ids Column ids of the nodes in the tree
* @param row_offsets Row offsets of the nodes in the tree
* @param root Root node of the `d_json_column` tree
* @param is_array_of_arrays Whether the tree is an array of arrays
* @param is_enabled_lines Whether the input is a line-delimited JSON
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the device memory
* of child_offets and validity members of `d_json_column`
Expand All @@ -356,6 +401,8 @@ void make_device_json_column(device_span<SymbolT const> input,
device_span<NodeIndexT> col_ids,
device_span<size_type> row_offsets,
device_json_column& root,
bool is_array_of_arrays,
bool is_enabled_lines,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -370,9 +417,29 @@ void make_device_json_column(device_span<SymbolT const> input,
thrust::stable_sort_by_key(
rmm::exec_policy(stream), sorted_col_ids.begin(), sorted_col_ids.end(), node_ids.begin());

NodeIndexT const row_array_parent_col_id = [&]() {
if (!is_array_of_arrays) return parent_node_sentinel;
auto const list_node_index = is_enabled_lines ? 0 : 1;
NodeIndexT value;
CUDF_CUDA_TRY(cudaMemcpyAsync(&value,
col_ids.data() + list_node_index,
sizeof(NodeIndexT),
cudaMemcpyDefault,
stream.value()));
stream.synchronize();
return value;
}();

// 1. gather column information.
auto [d_column_tree, d_unique_col_ids, d_max_row_offsets] =
reduce_to_column_tree(tree, col_ids, sorted_col_ids, node_ids, row_offsets, stream);
reduce_to_column_tree(tree,
col_ids,
sorted_col_ids,
node_ids,
row_offsets,
is_array_of_arrays,
row_array_parent_col_id,
stream);
auto num_columns = d_unique_col_ids.size();
auto unique_col_ids = cudf::detail::make_std_vector_async(d_unique_col_ids, stream);
auto column_categories =
Expand All @@ -384,6 +451,24 @@ void make_device_json_column(device_span<SymbolT const> input,
auto max_row_offsets = cudf::detail::make_std_vector_async(d_max_row_offsets, stream);
std::vector<std::string> column_names = copy_strings_to_host(
input, d_column_tree.node_range_begin, d_column_tree.node_range_end, stream);
// array of arrays column names
if (is_array_of_arrays) {
TreeDepthT const row_array_children_level = is_enabled_lines ? 1 : 2;
auto values_column_indices =
get_values_column_indices(row_array_children_level, tree, col_ids, num_columns, stream);
auto h_values_column_indices =
cudf::detail::make_std_vector_async(values_column_indices, stream);
std::transform(unique_col_ids.begin(),
unique_col_ids.end(),
column_names.begin(),
column_names.begin(),
[&h_values_column_indices, &column_parent_ids, row_array_parent_col_id](
auto col_id, auto name) mutable {
return column_parent_ids[col_id] == row_array_parent_col_id
? std::to_string(h_values_column_indices[col_id])
: name;
});
}

auto to_json_col_type = [](auto category) {
switch (category) {
Expand Down Expand Up @@ -440,7 +525,11 @@ void make_device_json_column(device_span<SymbolT const> input,
std::string name = "";
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
name = list_child_name;
if (is_array_of_arrays && parent_col_id == row_array_parent_col_id) {
name = column_names[this_col_id];
} else {
name = list_child_name;
}
} else if (column_categories[parent_col_id] == NC_FN) {
auto field_name_col_id = parent_col_id;
parent_col_id = column_parent_ids[parent_col_id];
Expand Down Expand Up @@ -478,7 +567,7 @@ void make_device_json_column(device_span<SymbolT const> input,
"A mix of lists and structs within the same column is not supported");
}
}
CUDF_EXPECTS(parent_col.child_columns.count(name) == 0, "duplicate column name");
CUDF_EXPECTS(parent_col.child_columns.count(name) == 0, "duplicate column name: " + name);
// move into parent
device_json_column col(stream, mr);
initialize_json_columns(this_col_id, col);
Expand Down Expand Up @@ -801,7 +890,21 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
print_tree(h_input, gpu_tree, stream);
#endif

auto [gpu_col_id, gpu_row_offsets] = records_orient_tree_traversal(d_input, gpu_tree, stream);
bool const is_array_of_arrays = [&]() {
std::array<node_t, 2> h_node_categories = {NC_ERR, NC_ERR};
auto const size_to_copy = std::min(size_t{2}, gpu_tree.node_categories.size());
CUDF_CUDA_TRY(cudaMemcpyAsync(h_node_categories.data(),
gpu_tree.node_categories.data(),
sizeof(node_t) * size_to_copy,
cudaMemcpyDefault,
stream.value()));
stream.synchronize();
if (options.is_enabled_lines()) return h_node_categories[0] == NC_LIST;
return h_node_categories[0] == NC_LIST and h_node_categories[1] == NC_LIST;
}();

auto [gpu_col_id, gpu_row_offsets] = records_orient_tree_traversal(
d_input, gpu_tree, is_array_of_arrays, options.is_enabled_lines(), stream);

device_json_column root_column(stream, mr);
root_column.type = json_col_t::ListColumn;
Expand All @@ -812,7 +915,15 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
0);

// Get internal JSON column
make_device_json_column(d_input, gpu_tree, gpu_col_id, gpu_row_offsets, root_column, stream, mr);
make_device_json_column(d_input,
gpu_tree,
gpu_col_id,
gpu_row_offsets,
root_column,
is_array_of_arrays,
options.is_enabled_lines(),
stream,
mr);

// data_root refers to the root column of the data represented by the given JSON string
auto& data_root =
Expand All @@ -828,8 +939,9 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
auto constexpr single_child_col_count = 1;
CUDF_EXPECTS(data_root.type == json_col_t::ListColumn and
data_root.child_columns.size() == single_child_col_count and
data_root.child_columns.begin()->second.type == json_col_t::StructColumn,
"Currently the nested JSON parser only supports an array of (nested) objects");
data_root.child_columns.begin()->second.type ==
(is_array_of_arrays ? json_col_t::ListColumn : json_col_t::StructColumn),
"Input needs to be an array of arrays or an array of (nested) objects");

// Slice off the root list column, which has only a single row that contains all the structs
auto& root_struct_col = data_root.child_columns.begin()->second;
Expand Down Expand Up @@ -890,6 +1002,10 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] = device_json_column_to_cudf_column(
json_col, d_input, parse_opt, child_schema_element, stream, mr);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));
Expand Down
Loading

0 comments on commit cc37c72

Please sign in to comment.