Skip to content

Commit

Permalink
ARROW-17178: [R] Support head() in arrow_dplyr_query with user-define…
Browse files Browse the repository at this point in the history
…d function (apache#13706)

This PR adds support for more types of queries that include calls to R code (i.e., `map_batches(..., .lazy = TRUE)`, user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to `RunWithCapturedR()` where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call `plan->StartProducing()` until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., `reader$read_table()`) the calls into R will work. 

The R code calls within an exec plan *won't* work with `reader$read_next_batch()` or the C data interface because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability to the ExecPlan since we can check a StopToken after apache#13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).

An example of something that didn't work before that now does:

``` r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352
```

<sup>Created on 2022-07-25 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
  • Loading branch information
paleolimbot and paleolimbot authored Sep 16, 2022
1 parent 557acf5 commit 5988363
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 266 deletions.
32 changes: 22 additions & 10 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions r/R/compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,6 @@ register_scalar_function <- function(name, fun, in_type, out_type,
update_cache = TRUE
)

# User-defined functions require some special handling
# in the query engine which currently require an opt-in using
# the R_ARROW_COLLECT_WITH_UDF environment variable while this
# behaviour is stabilized.
# TODO(ARROW-17178) remove the need for this!
Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")

invisible(NULL)
}

Expand Down
9 changes: 6 additions & 3 deletions r/R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,19 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
#' show_exec_plan()
show_exec_plan <- function(x) {
adq <- as_adq(x)
plan <- ExecPlan$create()

# do not show the plan if we have a nested query (as this will force the
# evaluation of the inner query/queries)
# TODO see if we can remove after ARROW-16628
if (is_collapsed(x) && has_head_tail(x$.data)) {
warn("The `ExecPlan` cannot be printed for a nested query.")
return(invisible(x))
}
final_node <- plan$Build(adq)
cat(plan$BuildAndShow(final_node))

result <- as_record_batch_reader(adq)
cat(result$Plan()$ToString())
result$Close()

invisible(x)
}

Expand Down
84 changes: 25 additions & 59 deletions r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,11 @@ ExecPlan <- R6Class("ExecPlan",
}
node
},
Run = function(node, as_table = FALSE) {
# a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync
# Start of chunk used in `BuildAndShow()`
Run = function(node) {
assert_is(node, "ExecNode")

# Sorting and head/tail (if sorted) are handled in the SinkNode,
# created in ExecPlan_run
# created in ExecPlan_build
sorting <- node$extras$sort %||% list()
select_k <- node$extras$head %||% -1L
has_sorting <- length(sorting) > 0
Expand All @@ -214,16 +212,7 @@ ExecPlan <- R6Class("ExecPlan",
sorting$orders <- as.integer(sorting$orders)
}

# End of chunk used in `BuildAndShow()`

# If we are going to return a Table anyway, we do this in one step and
# entirely in one C++ call to ensure that we can execute user-defined
# functions from the worker threads spawned by the ExecPlan. If not, we
# use ExecPlan_run which returns a RecordBatchReader that can be
# manipulated in R code (but that right now won't work with
# user-defined functions).
exec_fun <- if (as_table) ExecPlan_read_table else ExecPlan_run
out <- exec_fun(
out <- ExecPlan_run(
self,
node,
sorting,
Expand All @@ -240,18 +229,13 @@ ExecPlan <- R6Class("ExecPlan",
slice_size <- node$extras$head %||% node$extras$tail
if (!is.null(slice_size)) {
out <- head(out, slice_size)
# We already have everything we need for the head, so StopProducing
self$Stop()
}
} else if (!is.null(node$extras$tail)) {
# TODO(ARROW-16630): proper BottomK support
# Reverse the row order to get back what we expect
out <- as_arrow_table(out)
out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
# Put back into RBR
if (!as_table) {
out <- as_record_batch_reader(out)
}
out <- as_record_batch_reader(out)
}

# If arrange() created $temp_columns, make sure to omit them from the result
Expand All @@ -261,11 +245,7 @@ ExecPlan <- R6Class("ExecPlan",
if (length(node$extras$sort$temp_columns) > 0) {
tab <- as_arrow_table(out)
tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE]
if (!as_table) {
out <- as_record_batch_reader(tab)
} else {
out <- tab
}
out <- as_record_batch_reader(tab)
}

out
Expand All @@ -279,40 +259,9 @@ ExecPlan <- R6Class("ExecPlan",
...
)
},
# SinkNodes (involved in arrange and/or head/tail operations) are created in
# ExecPlan_run and are not captured by the regulat print method. We take a
# similar approach to expose them before calling the print method.
BuildAndShow = function(node) {
# a section of this code is copied from `Run()` - the 2 need to be in sync
# Start of chunk copied from `Run()`

assert_is(node, "ExecNode")

# Sorting and head/tail (if sorted) are handled in the SinkNode,
# created in ExecPlan_run
sorting <- node$extras$sort %||% list()
select_k <- node$extras$head %||% -1L
has_sorting <- length(sorting) > 0
if (has_sorting) {
if (!is.null(node$extras$tail)) {
# Reverse the sort order and take the top K, then after we'll reverse
# the resulting rows so that it is ordered as expected
sorting$orders <- !sorting$orders
select_k <- node$extras$tail
}
sorting$orders <- as.integer(sorting$orders)
}

# End of chunk copied from `Run()`

ExecPlan_BuildAndShow(
self,
node,
sorting,
select_k
)
},
Stop = function() ExecPlan_StopProducing(self)
ToString = function() {
ExecPlan_ToString(self)
}
)
)
# nolint end.
Expand Down Expand Up @@ -396,6 +345,23 @@ ExecNode <- R6Class("ExecNode",
)
)

ExecPlanReader <- R6Class("ExecPlanReader",
inherit = RecordBatchReader,
public = list(
batches = function() ExecPlanReader__batches(self),
read_table = function() Table__from_ExecPlanReader(self),
Plan = function() ExecPlanReader__Plan(self),
PlanStatus = function() ExecPlanReader__PlanStatus(self),
ToString = function() {
sprintf(
"<Status: %s>\n\n%s\n\nSee $Plan() for details.",
self$PlanStatus(),
super$ToString()
)
}
)
)

do_exec_plan_substrait <- function(substrait_plan) {
if (is.string(substrait_plan)) {
substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan)
Expand Down
1 change: 1 addition & 0 deletions r/R/record-batch-reader.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ RecordBatchReader <- R6Class("RecordBatchReader",
read_next_batch = function() RecordBatchReader__ReadNext(self),
batches = function() RecordBatchReader__batches(self),
read_table = function() Table__from_RecordBatchReader(self),
Close = function() RecordBatchReader__Close(self),
export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr),
ToString = function() self$schema$ToString()
),
Expand Down
12 changes: 1 addition & 11 deletions r/R/table.R
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,5 @@ as_arrow_table.RecordBatchReader <- function(x, ...) {
#' @rdname as_arrow_table
#' @export
as_arrow_table.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)

run_with_event_loop <- identical(
Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
"true"
)

result <- plan$Run(final_node, as_table = run_with_event_loop)
as_arrow_table(result)
as_arrow_table(as_record_batch_reader(x))
}
78 changes: 49 additions & 29 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ExecNode;
} // namespace compute
} // namespace arrow

class ExecPlanReader;

#if defined(ARROW_R_WITH_PARQUET)
#include <parquet/type_fwd.h>
#endif
Expand Down
Loading

0 comments on commit 5988363

Please sign in to comment.