6  Intro to R UDFs

Catch up

library(sparklyr)
library(dplyr)
sc <- spark_connect(method = "databricks_connect")
#> ! Changing host URL to: https://rstudio-partner-posit-default.cloud.databricks.com
#> ℹ Retrieving info for cluster:'1026-175310-7cpsh3g8'
#> ✔ Cluster: '1026-175310-7cpsh3g8' | DBR: '14.1' [432ms]
#> 
#> ℹ Attempting to load 'r-sparklyr-databricks-14.1'
#> ✔ Python environment: 'r-sparklyr-databricks-14.1' [991ms]
#> 
#> ℹ Connecting to '14.1 cluster'
#> ✔ Connected to: '14.1 cluster' [6ms]
#> 

6.1 Simple operations

Trying out very simple operation to become familiar with the process

  1. Use copy_to() to send mtcars to the cluster. Load it to a variable called tbl_mtcars
tbl_mtcars <- copy_to(sc, mtcars)
  1. Pipe tbl_mtcars to spark_apply(). Use nrow as the function to run
tbl_mtcars |> 
  spark_apply(nrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> The repository you retrieved Arrow from did not include all of Arrow's features.
#> You can install a fully-featured version by running:
#> `install.packages('arrow', repos = 'https://apache.r-universe.dev')`.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:lubridate':
#> 
#>     duration
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source:   table<`sparklyr_tmp_table_867dadbd_1724_452e_a30b_0aea96ec858f`> [4 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2     8
#> 3     8
#> 4     8
  1. Switch the function to use in spark_apply() to dim. Notice how it returns more rows, because coercing the size 2 vector creates a 2 row data frame
tbl_mtcars |> 
  spark_apply(dim)
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source:   table<`sparklyr_tmp_table_816793c9_b4cc_420b_a8a8_d2c20a5b07e5`> [8 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2    11
#> 3     8
#> 4    11
#> 5     8
#> 6    11
#> 7     8
#> 8    11

6.2 Group by variable

Write and run simple grouping commands

  1. Go back to using nrow again for spark_apply(). Remember to pass columns = "x long"
tbl_mtcars |> 
  spark_apply(nrow, columns = "x long")
#> # Source:   table<`sparklyr_tmp_table_067200fb_e916_4f85_888b_cb302e55e1d9`> [4 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2     8
#> 3     8
#> 4     8
  1. Add the group_by argument, with the value of "am". There should be an error. This is because there are 2 variables in the result, instead of one, and we defined x only in columns
tbl_mtcars |> 
  spark_apply(nrow, group_by = "am", columns = "x long")
  1. Insert am long, at the beginning of columns
tbl_mtcars |> 
  spark_apply(nrow, group_by = "am", columns = "am long, x long")
#> # Source:   table<`sparklyr_tmp_table_4321bb30_d09b_4b23_8f37_6dadd518ce53`> [2 x 2]
#> # Database: spark_connection
#>      am     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13
  1. To see how the name we pass does not have to match the variable name, change am to notam in columns
tbl_mtcars |> 
  spark_apply(nrow, group_by = "am", columns = "notam long, x long")
#> # Source:   table<`sparklyr_tmp_table_48a8b521_0011_4622_99b8_94b701463acf`> [2 x 2]
#> # Database: spark_connection
#>   notam     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13
  1. Change the grouping variable to “cyl”, make sure to update that in the columns argument as well
tbl_mtcars |> 
  spark_apply(nrow, group_by = "cyl", columns = "cyl long, x long")
#> # Source:   table<`sparklyr_tmp_table_7b98282a_0246_4786_b252_4b0cfa037313`> [3 x 2]
#> # Database: spark_connection
#>     cyl     x
#>   <dbl> <dbl>
#> 1     4    11
#> 2     6     7
#> 3     8    14

6.3 Custom functions

Create simple custom functions to send to Spark

  1. In spark_apply(), pass function(x) x as the function. This will return the entire mtcars data set
tbl_mtcars |> 
  spark_apply(function(x) x)
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double, hp double, drat double, wt
#> double, qsec double, vs double, am double, gear double, carb double"
#> # Source:   table<`sparklyr_tmp_table_0f95d585_c5a5_4b6a_b4dd_afeacd7812c5`> [?? x 11]
#> # Database: spark_connection
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
#>  2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
#>  3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
#>  4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
#>  5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
#>  6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
#>  7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
#>  8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
#>  9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
#> 10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
#> # ℹ more rows
  1. Modify the function to return only the “mpg”, “cyl”, and “disp” variables
tbl_mtcars |> 
  spark_apply(function(x) x[, c("mpg", "cyl", "disp")])
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double"
#> # Source:   table<`sparklyr_tmp_table_6d668022_b215_4b66_a904_b8df93f0814c`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
  1. Add the recommended columns spec from
tbl_mtcars |> 
  spark_apply(
    function(x) x[, c("mpg", "cyl", "disp")], 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_384a32ad_fcb8_468f_886b_142130ee6656`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
  1. Make your custom function into a ‘multi-line’ function
tbl_mtcars |> 
  spark_apply(
    function(x){ 
      x[, c("mpg", "cyl", "disp")]
      }, 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_9ad6389e_7f88_455d_bb44_26373948e69a`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
  1. Assign the data selection step to a variable called out, and then use it as the output of the function
tbl_mtcars |> 
  spark_apply(
    function(x){ 
      out <- x[, c("mpg", "cyl", "disp")]
      out
      }, 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_729c736f_c544_4c37_81b4_3067a08c321a`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
  1. Add a filter step that returns the highest “mpg”. Notice that instead of 1 record, it returns several. That is because the filter is being processed per partition.
tbl_mtcars |> 
  spark_apply(
    function(x){ 
      out <- x[, c("mpg", "cyl", "disp")]
      out <- out[out$mpg == max(out$mpg), ]
      out
      }, 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_bfa02dd9_be66_4637_bf95_a699570246f3`> [4 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  24.4     4 147. 
#> 2  22.8     4 141. 
#> 3  33.9     4  71.1
#> 4  30.4     4  95.1
  1. Change the filter to display any records with an “mpg” over 25
tbl_mtcars |> 
  spark_apply(
    function(x){ 
      out <- x[, c("mpg", "cyl", "disp")]
      out <- out[out$mpg > 25, ]
      out
      }, 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_ec9b86e6_1803_4e7a_90d8_fe9fb7910e9f`> [6 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  32.4     4  78.7
#> 2  30.4     4  75.7
#> 3  33.9     4  71.1
#> 4  27.3     4  79  
#> 5  26       4 120. 
#> 6  30.4     4  95.1
  1. Insert a step that modifies cyl. It should make it add 1 to the value.
tbl_mtcars |> 
  spark_apply(
    function(x){ 
      out <- x[, c("mpg", "cyl", "disp")]
      out$cyl <- out$cyl + 1
      out <- out[out$mpg > 25, ]
      out
      }, 
    columns = "mpg double, cyl double, disp double"
    )
#> # Source:   table<`sparklyr_tmp_table_7121314e_2bb9_4de1_ba18_6926a6c9cb6a`> [6 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  32.4     5  78.7
#> 2  30.4     5  75.7
#> 3  33.9     5  71.1
#> 4  27.3     5  79  
#> 5  26       5 120. 
#> 6  30.4     5  95.1

6.4 R packages

Simple example that uses an R package

  1. Load the broom package into your R session
library(broom)
  1. Create a function that, creates an lm model against the one, and only, argument passed to the function. Then use tidy() to return the results of the model as a data frame. The lm() call should assume that the data will always have the same columns as mtcars, and it will create a linear model of the “mpg” against all the other variables. Name it model_function
model_function <- function(x) {
  model <- lm(mpg ~ ., x)
  tidy(model)
}
  1. Test model_function by passing mtcars to it
model_function(mtcars)
#> # A tibble: 11 × 5
#>    term        estimate std.error statistic p.value
#>    <chr>          <dbl>     <dbl>     <dbl>   <dbl>
#>  1 (Intercept)  12.3      18.7        0.657  0.518 
#>  2 cyl          -0.111     1.05      -0.107  0.916 
#>  3 disp          0.0133    0.0179     0.747  0.463 
#>  4 hp           -0.0215    0.0218    -0.987  0.335 
#>  5 drat          0.787     1.64       0.481  0.635 
#>  6 wt           -3.72      1.89      -1.96   0.0633
#>  7 qsec          0.821     0.731      1.12   0.274 
#>  8 vs            0.318     2.10       0.151  0.881 
#>  9 am            2.52      2.06       1.23   0.234 
#> 10 gear          0.655     1.49       0.439  0.665 
#> 11 carb         -0.199     0.829     -0.241  0.812
  1. Pass model_function to spark_apply(), against tbl_mtcars. The call should fail, because broom is not explicitly referred to in model_function
tbl_mtcars |> 
  spark_apply(model_function)
  1. Modify model_function by either adding a library() call, or using :: to explicitly refer to broom inside it
model_function <- function(x) {
  model <- lm(mpg ~ ., x)
  broom::tidy(model)
}
  1. Test model_function again against tbl_mtcars
tbl_mtcars |> 
  spark_apply(model_function)
#> To increase performance, use the following schema:
#> columns = "term string, estimate double, std_error double, statistic double,
#> p_value double"
#> # Source:   table<`sparklyr_tmp_table_7147ff59_4759_4f01_a920_3184b005d1f3`> [?? x 5]
#> # Database: spark_connection
#>    term        estimate std_error statistic p_value
#>    <chr>          <dbl>     <dbl>     <dbl>   <dbl>
#>  1 (Intercept) -18.6          NaN       NaN     NaN
#>  2 cyl           0.517        NaN       NaN     NaN
#>  3 disp          0.0356       NaN       NaN     NaN
#>  4 hp           -0.0579       NaN       NaN     NaN
#>  5 drat          7.98         NaN       NaN     NaN
#>  6 wt           -1.24         NaN       NaN     NaN
#>  7 qsec          0.565        NaN       NaN     NaN
#>  8 vs            2.51         NaN       NaN     NaN
#>  9 am          NaN            NaN       NaN     NaN
#> 10 gear        NaN            NaN       NaN     NaN
#> # ℹ more rows
  1. Add a group_by argument, use “am” as the grouping variable
tbl_mtcars |> 
  spark_apply(model_function, group_by = "am")
#> To increase performance, use the following schema:
#> columns = "am double, term string, estimate double, std_error double, statistic
#> double, p_value double"
#> # Source:   table<`sparklyr_tmp_table_b2796b5a_7ab9_4279_b1be_54b3268fb568`> [?? x 6]
#> # Database: spark_connection
#>       am term        estimate std_error statistic  p_value
#>    <dbl> <chr>          <dbl>     <dbl>     <dbl>    <dbl>
#>  1     0 (Intercept)   8.64     21.5        0.402   0.697 
#>  2     0 cyl          -0.534     1.13      -0.474   0.647 
#>  3     0 disp         -0.0203    0.0174    -1.16    0.275 
#>  4     0 hp            0.0622    0.0461     1.35    0.210 
#>  5     0 drat          0.592     3.01       0.196   0.849 
#>  6     0 wt            1.95      2.23       0.876   0.404 
#>  7     0 qsec         -0.884     0.758     -1.17    0.274 
#>  8     0 vs            0.739     2.51       0.294   0.775 
#>  9     0 am          NaN       NaN        NaN     NaN     
#> 10     0 gear          8.65      3.90       2.22    0.0534
#> # ℹ more rows