# Attach the Spark interface first and dplyr second; packages attached later
# take precedence on name clashes, so this order is kept on purpose.
library("sparklyr")
library("dplyr")

# Open the Databricks Connect session; `sc` is the connection handle that
# copy_to() receives further down.
sc <- spark_connect(method = "databricks_connect")
#> ! Changing host URL to: https://rstudio-partner-posit-default.cloud.databricks.com
#> ℹ Retrieving info for cluster:'1026-175310-7cpsh3g8'
#> ✔ Cluster: '1026-175310-7cpsh3g8' | DBR: '14.1' [432ms]
#> 
#> ℹ Attempting to load 'r-sparklyr-databricks-14.1'
#> ✔ Python environment: 'r-sparklyr-databricks-14.1' [991ms]
#> 
#> ℹ Connecting to '14.1 cluster'
#> ✔ Connected to: '14.1 cluster' [6ms]
6 Intro to R UDFs
Catch up
6.1 Simple operations
Try out some very simple operations to become familiar with the process.
- Use `copy_to()` to send `mtcars` to the cluster. Load it into a variable called `tbl_mtcars`
# Send `mtcars` to the cluster and keep a reference to the remote table.
tbl_mtcars <- copy_to(sc, mtcars)

# Pipe `tbl_mtcars` to `spark_apply()`. Use `nrow` as the function to run.
# `nrow` is evaluated once per partition, so one row comes back per partition.
tbl_mtcars |>
  spark_apply(nrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> The repository you retrieved Arrow from did not include all of Arrow's features.
#> You can install a fully-featured version by running:
#> `install.packages('arrow', repos = 'https://apache.r-universe.dev')`.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:lubridate':
#> 
#>     duration
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source:   table<`sparklyr_tmp_table_867dadbd_1724_452e_a30b_0aea96ec858f`> [4 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2     8
#> 3     8
#> 4     8
- Switch the function used in `spark_apply()` to `dim`. Notice how it returns more rows: coercing the size-2 vector creates a 2-row data frame.
# `dim()` yields two numbers (rows, columns) for every partition, so each
# partition contributes two rows to the result.
tbl_mtcars |>
  spark_apply(f = dim)
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source:   table<`sparklyr_tmp_table_816793c9_b4cc_420b_a8a8_d2c20a5b07e5`> [8 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2    11
#> 3     8
#> 4    11
#> 5     8
#> 6    11
#> 7     8
#> 8    11
6.2 Group by variable
Write and run simple grouping commands
- Go back to using `nrow` for `spark_apply()`. Remember to pass `columns = "x long"`
# Declaring the output schema up front silences the
# "To increase performance" schema hint.
tbl_mtcars |>
  spark_apply(f = nrow, columns = "x long")
#> # Source:   table<`sparklyr_tmp_table_067200fb_e916_4f85_888b_cb302e55e1d9`> [4 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     8
#> 2     8
#> 3     8
#> 4     8
- Add the `group_by` argument, with the value of `"am"`. There should be an error, because the result now has 2 variables instead of one, and `columns` defines only `x`.
# Expected to error: grouping adds an `am` column to the result, but
# `columns` declares only `x`.
tbl_mtcars |>
  spark_apply(nrow, group_by = "am", columns = "x long")

# Insert `am long,` at the beginning of `columns`.
tbl_mtcars |>
  spark_apply(nrow, group_by = "am", columns = "am long, x long")
#> # Source:   table<`sparklyr_tmp_table_4321bb30_d09b_4b23_8f37_6dadd518ce53`> [2 x 2]
#> # Database: spark_connection
#>      am     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13
- To see that the name we pass does not have to match the variable name, change `am` to `notam` in `columns`.
# The label in `columns` need not match the grouping variable: `notam`
# still receives the `am` values.
tbl_mtcars |>
  spark_apply(f = nrow, group_by = "am", columns = "notam long, x long")
#> # Source:   table<`sparklyr_tmp_table_48a8b521_0011_4622_99b8_94b701463acf`> [2 x 2]
#> # Database: spark_connection
#>   notam     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13
- Change the grouping variable to “cyl”, and make sure to update it in the `columns` argument as well.
# Group by `cyl`; its column is declared first in `columns`, ahead of the count.
tbl_mtcars |>
  spark_apply(f = nrow, group_by = "cyl", columns = "cyl long, x long")
#> # Source:   table<`sparklyr_tmp_table_7b98282a_0246_4786_b252_4b0cfa037313`> [3 x 2]
#> # Database: spark_connection
#>     cyl     x
#>   <dbl> <dbl>
#> 1     4    11
#> 2     6     7
#> 3     8    14
6.3 Custom functions
Create simple custom functions to send to Spark
- In `spark_apply()`, pass `function(x) x` as the function. This will return the entire `mtcars` data set
# Identity function: every partition comes back unchanged.
tbl_mtcars |>
  spark_apply(f = function(x) x)
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double, hp double, drat double, wt
#> double, qsec double, vs double, am double, gear double, carb double"
#> # Source:   table<`sparklyr_tmp_table_0f95d585_c5a5_4b6a_b4dd_afeacd7812c5`> [?? x 11]
#> # Database: spark_connection
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
#>  2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
#>  3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
#>  4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
#>  5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
#>  6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
#>  7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
#>  8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
#>  9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
#> 10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
#> # ℹ more rows
- Modify the function to return only the “mpg”, “cyl”, and “disp” variables
# Subset each partition to three columns. No schema is given, so Spark prints
# a suggested `columns` value.
tbl_mtcars |>
  spark_apply(f = function(x) x[, c("mpg", "cyl", "disp")])
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double"
#> # Source:   table<`sparklyr_tmp_table_6d668022_b215_4b66_a904_b8df93f0814c`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
- Add the recommended `columns` spec from the previous output
# Same column selection, now with the suggested schema passed explicitly.
tbl_mtcars |>
  spark_apply(
    f = function(x) x[, c("mpg", "cyl", "disp")],
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_384a32ad_fcb8_468f_886b_142130ee6656`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
- Make your custom function into a ‘multi-line’ function
# Braced body: identical selection, but room for more steps.
tbl_mtcars |>
  spark_apply(
    f = function(x) {
      x[, c("mpg", "cyl", "disp")]
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_9ad6389e_7f88_455d_bb44_26373948e69a`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
- Assign the data selection step to a variable called `out`, and then use it as the output of the function
# The selection is stored in `out`, which is returned as the last expression.
tbl_mtcars |>
  spark_apply(
    f = function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_729c736f_c544_4c37_81b4_3067a08c321a`> [?? x 3]
#> # Database: spark_connection
#>      mpg   cyl  disp
#>    <dbl> <dbl> <dbl>
#>  1  21       6  160 
#>  2  21       6  160 
#>  3  22.8     4  108 
#>  4  21.4     6  258 
#>  5  18.7     8  360 
#>  6  18.1     6  225 
#>  7  14.3     8  360 
#>  8  24.4     4  147.
#>  9  22.8     4  141.
#> 10  19.2     6  168.
#> # ℹ more rows
- Add a filter step that returns the highest “mpg”. Notice that instead of 1 record, it returns several. That is because the filter is processed per partition.
# max() is evaluated inside each partition, so the top row(s) of every
# partition come back rather than a single global maximum.
tbl_mtcars |>
  spark_apply(
    f = function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      keep <- out$mpg == max(out$mpg)
      out <- out[keep, ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_bfa02dd9_be66_4637_bf95_a699570246f3`> [4 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  24.4     4 147. 
#> 2  22.8     4 141. 
#> 3  33.9     4  71.1
#> 4  30.4     4  95.1
- Change the filter to display any records with an “mpg” over 25
# Keep every row whose mpg exceeds 25; this threshold is the same on every
# partition.
tbl_mtcars |>
  spark_apply(
    f = function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      keep <- out$mpg > 25
      out <- out[keep, ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_ec9b86e6_1803_4e7a_90d8_fe9fb7910e9f`> [6 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  32.4     4  78.7
#> 2  30.4     4  75.7
#> 3  33.9     4  71.1
#> 4  27.3     4  79  
#> 5  26       4 120. 
#> 6  30.4     4  95.1
- Insert a step that modifies `cyl` by adding 1 to its value.
# Increment `cyl`, then filter on `mpg`. The filter never reads `cyl`, so the
# increment only changes the reported values.
tbl_mtcars |>
  spark_apply(
    f = function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out[["cyl"]] <- out[["cyl"]] + 1
      keep <- out$mpg > 25
      out <- out[keep, ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source:   table<`sparklyr_tmp_table_7121314e_2bb9_4de1_ba18_6926a6c9cb6a`> [6 x 3]
#> # Database: spark_connection
#>     mpg   cyl  disp
#>   <dbl> <dbl> <dbl>
#> 1  32.4     5  78.7
#> 2  30.4     5  75.7
#> 3  33.9     5  71.1
#> 4  27.3     5  79  
#> 5  26       5 120. 
#> 6  30.4     5  95.1
6.4 R packages
Simple example that uses an R package
- Load the `broom` package into your R session
library(broom)

# Create a function that creates an `lm` model against the one, and only,
# argument passed to the function. Then use `tidy()` to return the results of
# the model as a data frame. The `lm()` call should assume that the data will
# always have the same columns as `mtcars`, and it will create a linear model
# of "mpg" against all the other variables. Name it `model_function`.
# `tidy()` is deliberately left unqualified here; the Spark exercise that
# follows relies on that.
model_function <- function(x) {
  model <- lm(mpg ~ ., data = x)
  tidy(model)
}

# Test `model_function` by passing `mtcars` to it.
model_function(mtcars)
#> # A tibble: 11 × 5
#>    term        estimate std.error statistic p.value
#>    <chr>          <dbl>     <dbl>     <dbl>   <dbl>
#>  1 (Intercept)  12.3      18.7        0.657  0.518 
#>  2 cyl          -0.111     1.05      -0.107  0.916 
#>  3 disp          0.0133    0.0179     0.747  0.463 
#>  4 hp           -0.0215    0.0218    -0.987  0.335 
#>  5 drat          0.787     1.64       0.481  0.635 
#>  6 wt           -3.72      1.89      -1.96   0.0633
#>  7 qsec          0.821     0.731      1.12   0.274 
#>  8 vs            0.318     2.10       0.151  0.881 
#>  9 am            2.52      2.06       1.23   0.234 
#> 10 gear          0.655     1.49       0.439  0.665 
#> 11 carb         -0.199     0.829     -0.241  0.812
- Pass `model_function` to `spark_apply()`, against `tbl_mtcars`. The call should fail, because `broom` is not explicitly referred to in `model_function`
# Expected to fail: `broom` is not explicitly referred to in `model_function`.
tbl_mtcars |>
  spark_apply(model_function)

# Modify `model_function` by either adding a `library()` call, or using `::`
# to explicitly refer to `broom` inside it.
model_function <- function(x) {
  model <- lm(mpg ~ ., data = x)
  broom::tidy(model)
}

# Test `model_function` again against `tbl_mtcars`.
# Each partition holds only 8 rows (see the earlier `nrow` results), fewer
# than the 11 model coefficients. The per-partition fits are therefore
# rank-deficient and report NaN standard errors.
tbl_mtcars |>
  spark_apply(model_function)
#> To increase performance, use the following schema:
#> columns = "term string, estimate double, std_error double, statistic double,
#> p_value double"
#> # Source:   table<`sparklyr_tmp_table_7147ff59_4759_4f01_a920_3184b005d1f3`> [?? x 5]
#> # Database: spark_connection
#>    term        estimate std_error statistic p_value
#>    <chr>          <dbl>     <dbl>     <dbl>   <dbl>
#>  1 (Intercept) -18.6          NaN       NaN     NaN
#>  2 cyl           0.517        NaN       NaN     NaN
#>  3 disp          0.0356       NaN       NaN     NaN
#>  4 hp           -0.0579       NaN       NaN     NaN
#>  5 drat          7.98         NaN       NaN     NaN
#>  6 wt           -1.24         NaN       NaN     NaN
#>  7 qsec          0.565        NaN       NaN     NaN
#>  8 vs            2.51         NaN       NaN     NaN
#>  9 am          NaN            NaN       NaN     NaN
#> 10 gear        NaN            NaN       NaN     NaN
#> # ℹ more rows
- Add a `group_by` argument, using “am” as the grouping variable
# Fit one model per `am` group instead of per partition. `am` is constant
# within each group, so its coefficient comes back as NaN.
tbl_mtcars |>
  spark_apply(f = model_function, group_by = "am")
#> To increase performance, use the following schema:
#> columns = "am double, term string, estimate double, std_error double, statistic
#> double, p_value double"
#> # Source:   table<`sparklyr_tmp_table_b2796b5a_7ab9_4279_b1be_54b3268fb568`> [?? x 6]
#> # Database: spark_connection
#>       am term        estimate std_error statistic  p_value
#>    <dbl> <chr>          <dbl>     <dbl>     <dbl>    <dbl>
#>  1     0 (Intercept)   8.64     21.5        0.402   0.697 
#>  2     0 cyl          -0.534     1.13      -0.474   0.647 
#>  3     0 disp         -0.0203    0.0174    -1.16    0.275 
#>  4     0 hp            0.0622    0.0461     1.35    0.210 
#>  5     0 drat          0.592     3.01       0.196   0.849 
#>  6     0 wt            1.95      2.23       0.876   0.404 
#>  7     0 qsec         -0.884     0.758     -1.17    0.274 
#>  8     0 vs            0.739     2.51       0.294   0.775 
#>  9     0 am          NaN       NaN        NaN     NaN     
#> 10     0 gear          8.65      3.90       2.22    0.0534
#> # ℹ more rows