6 Intro to R UDFs

Catch up

library(sparklyr)
library(dplyr)

sc <- spark_connect(method = "databricks_connect")
#> ! Changing host URL to: https://rstudio-partner-posit-default.cloud.databricks.com
#> ℹ Retrieving info for cluster:'1026-175310-7cpsh3g8'
#> ✔ Cluster: '1026-175310-7cpsh3g8' | DBR: '14.1' [432ms]
#>
#> ℹ Attempting to load 'r-sparklyr-databricks-14.1'
#> ✔ Python environment: 'r-sparklyr-databricks-14.1' [991ms]
#>
#> ℹ Connecting to '14.1 cluster'
#> ✔ Connected to: '14.1 cluster' [6ms]
6.1 Simple operations

Try out very simple operations to become familiar with the process
- Use copy_to() to send mtcars to the cluster. Load it into a variable called tbl_mtcars

tbl_mtcars <- copy_to(sc, mtcars)
- Pipe tbl_mtcars to spark_apply(). Use nrow as the function to run

tbl_mtcars |>
  spark_apply(nrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> The repository you retrieved Arrow from did not include all of Arrow's features.
#> You can install a fully-featured version by running:
#> `install.packages('arrow', repos = 'https://apache.r-universe.dev')`.
#>
#> Attaching package: 'arrow'
#> The following object is masked from 'package:lubridate':
#>
#> duration
#> The following object is masked from 'package:utils':
#>
#> timestamp
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source: table<`sparklyr_tmp_table_867dadbd_1724_452e_a30b_0aea96ec858f`> [4 x 1]
#> # Database: spark_connection
#> x
#> <dbl>
#> 1 8
#> 2 8
#> 3 8
#> 4 8
- Switch the function used in spark_apply() to dim. Notice how it returns more rows: coercing the length-2 result of dim() creates a two-row data frame for each partition (a partition check follows the output below)

tbl_mtcars |>
  spark_apply(dim)
#> To increase performance, use the following schema:
#> columns = "x long"
#> # Source: table<`sparklyr_tmp_table_816793c9_b4cc_420b_a8a8_d2c20a5b07e5`> [8 x 1]
#> # Database: spark_connection
#> x
#> <dbl>
#> 1 8
#> 2 11
#> 3 8
#> 4 11
#> 5 8
#> 6 11
#> 7 8
#> 8 11
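Both results come from the same mechanism: spark_apply() runs the R function once per partition, so nrow produced 4 rows (one per partition) and dim produced 8 (a length-2 result coerced into two rows, times 4 partitions). As a quick check, a minimal sketch using sparklyr's sdf_num_partitions(), which should line up with the 4 partitions assumed here:

# Number of partitions backing the Spark data frame; spark_apply()
# invokes the supplied R function once for each of them
sdf_num_partitions(tbl_mtcars)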
6.2 Group by variable
Write and run simple grouping commands
- Go back to using nrow again for spark_apply(). Remember to pass columns = "x long"

tbl_mtcars |>
  spark_apply(nrow, columns = "x long")
#> # Source: table<`sparklyr_tmp_table_067200fb_e916_4f85_888b_cb302e55e1d9`> [4 x 1]
#> # Database: spark_connection
#> x
#> <dbl>
#> 1 8
#> 2 8
#> 3 8
#> 4 8
- Add the group_by argument, with the value of "am". There should be an error, because there are now 2 variables in the result instead of one, and we defined only x in columns

tbl_mtcars |>
  spark_apply(nrow, group_by = "am", columns = "x long")
- Insert am long, at the beginning of columns

tbl_mtcars |>
  spark_apply(nrow, group_by = "am", columns = "am long, x long")
#> # Source: table<`sparklyr_tmp_table_4321bb30_d09b_4b23_8f37_6dadd518ce53`> [2 x 2]
#> # Database: spark_connection
#> am x
#> <dbl> <dbl>
#> 1 0 19
#> 2 1 13
- To see that the name we pass does not have to match the variable name, change am to notam in columns

tbl_mtcars |>
  spark_apply(nrow, group_by = "am", columns = "notam long, x long")
#> # Source: table<`sparklyr_tmp_table_48a8b521_0011_4622_99b8_94b701463acf`> [2 x 2]
#> # Database: spark_connection
#> notam x
#> <dbl> <dbl>
#> 1 0 19
#> 2 1 13
- Change the grouping variable to “cyl”; make sure to update it in the columns argument as well

tbl_mtcars |>
  spark_apply(nrow, group_by = "cyl", columns = "cyl long, x long")
#> # Source: table<`sparklyr_tmp_table_7b98282a_0246_4786_b252_4b0cfa037313`> [3 x 2]
#> # Database: spark_connection
#> cyl x
#> <dbl> <dbl>
#> 1 4 11
#> 2 6 7
#> 3 8 14
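As a sanity check, the same per-group counts can be produced without a UDF, since dplyr verbs are translated to Spark SQL by dbplyr; a minimal sketch:

# count() is translated to Spark SQL, so no R process runs on the workers;
# the resulting n column should match the x column returned by the UDF above
tbl_mtcars |>
  count(cyl)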
6.3 Custom functions
Create simple custom functions to send to Spark
- In spark_apply(), pass function(x) x as the function. This will return the entire mtcars data set

tbl_mtcars |>
  spark_apply(function(x) x)
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double, hp double, drat double, wt
#> double, qsec double, vs double, am double, gear double, carb double"
#> # Source: table<`sparklyr_tmp_table_0f95d585_c5a5_4b6a_b4dd_afeacd7812c5`> [?? x 11]
#> # Database: spark_connection
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4
#> 2 21 6 160 110 3.9 2.88 17.0 0 1 4 4
#> 3 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
#> 4 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
#> 5 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
#> 6 18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
#> 7 14.3 8 360 245 3.21 3.57 15.8 0 0 3 4
#> 8 24.4 4 147. 62 3.69 3.19 20 1 0 4 2
#> 9 22.8 4 141. 95 3.92 3.15 22.9 1 0 4 2
#> 10 19.2 6 168. 123 3.92 3.44 18.3 1 0 4 4
#> # ℹ more rows
- Modify the function to return only the “mpg”, “cyl”, and “disp” variables

tbl_mtcars |>
  spark_apply(function(x) x[, c("mpg", "cyl", "disp")])
#> To increase performance, use the following schema:
#> columns = "mpg double, cyl double, disp double"
#> # Source: table<`sparklyr_tmp_table_6d668022_b215_4b66_a904_b8df93f0814c`> [?? x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 21 6 160
#> 2 21 6 160
#> 3 22.8 4 108
#> 4 21.4 6 258
#> 5 18.7 8 360
#> 6 18.1 6 225
#> 7 14.3 8 360
#> 8 24.4 4 147.
#> 9 22.8 4 141.
#> 10 19.2 6 168.
#> # ℹ more rows
- Add the recommended columns spec from the schema hint in the previous output

tbl_mtcars |>
  spark_apply(
    function(x) x[, c("mpg", "cyl", "disp")],
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_384a32ad_fcb8_468f_886b_142130ee6656`> [?? x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 21 6 160
#> 2 21 6 160
#> 3 22.8 4 108
#> 4 21.4 6 258
#> 5 18.7 8 360
#> 6 18.1 6 225
#> 7 14.3 8 360
#> 8 24.4 4 147.
#> 9 22.8 4 141.
#> 10 19.2 6 168.
#> # ℹ more rows
- Make your custom function into a ‘multi-line’ function

tbl_mtcars |>
  spark_apply(
    function(x) {
      x[, c("mpg", "cyl", "disp")]
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_9ad6389e_7f88_455d_bb44_26373948e69a`> [?? x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 21 6 160
#> 2 21 6 160
#> 3 22.8 4 108
#> 4 21.4 6 258
#> 5 18.7 8 360
#> 6 18.1 6 225
#> 7 14.3 8 360
#> 8 24.4 4 147.
#> 9 22.8 4 141.
#> 10 19.2 6 168.
#> # ℹ more rows
- Assign the data selection step to a variable called out, and then use it as the output of the function

tbl_mtcars |>
  spark_apply(
    function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_729c736f_c544_4c37_81b4_3067a08c321a`> [?? x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 21 6 160
#> 2 21 6 160
#> 3 22.8 4 108
#> 4 21.4 6 258
#> 5 18.7 8 360
#> 6 18.1 6 225
#> 7 14.3 8 360
#> 8 24.4 4 147.
#> 9 22.8 4 141.
#> 10 19.2 6 168.
#> # ℹ more rows
- Add a filter step that returns the highest “mpg”. Notice that instead of 1 record, it returns several. That is because the filter is being processed per partition (a UDF-free alternative follows the output below)

tbl_mtcars |>
  spark_apply(
    function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out <- out[out$mpg == max(out$mpg), ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_bfa02dd9_be66_4637_bf95_a699570246f3`> [4 x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 24.4 4 147.
#> 2 22.8 4 141.
#> 3 33.9 4 71.1
#> 4 30.4 4 95.1
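If a single, global maximum is what you need, a translated aggregation avoids the per-partition behavior entirely; a minimal sketch:

# max() translates to Spark SQL and aggregates across all partitions,
# returning one row instead of one row per partition
tbl_mtcars |>
  summarise(top_mpg = max(mpg, na.rm = TRUE))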
- Change the filter to display any records with an “mpg” over 25

tbl_mtcars |>
  spark_apply(
    function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out <- out[out$mpg > 25, ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_ec9b86e6_1803_4e7a_90d8_fe9fb7910e9f`> [6 x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 32.4 4 78.7
#> 2 30.4 4 75.7
#> 3 33.9 4 71.1
#> 4 27.3 4 79
#> 5 26 4 120.
#> 6 30.4 4 95.1
- Insert a step that modifies cyl by adding 1 to its value

tbl_mtcars |>
  spark_apply(
    function(x) {
      out <- x[, c("mpg", "cyl", "disp")]
      out$cyl <- out$cyl + 1
      out <- out[out$mpg > 25, ]
      out
    },
    columns = "mpg double, cyl double, disp double"
  )
#> # Source: table<`sparklyr_tmp_table_7121314e_2bb9_4de1_ba18_6926a6c9cb6a`> [6 x 3]
#> # Database: spark_connection
#> mpg cyl disp
#> <dbl> <dbl> <dbl>
#> 1 32.4 5 78.7
#> 2 30.4 5 75.7
#> 3 33.9 5 71.1
#> 4 27.3 5 79
#> 5 26 5 120.
#> 6 30.4 5 95.1
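For comparison, this final UDF has a direct dplyr translation that runs as Spark SQL, which is usually the better choice when no R-only functionality is involved; a sketch that should return the same rows:

# select/mutate/filter all translate to Spark SQL, so no UDF is required
tbl_mtcars |>
  select(mpg, cyl, disp) |>
  mutate(cyl = cyl + 1) |>
  filter(mpg > 25)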
6.4 R packages
Simple example that uses an R package
- Load the broom package into your R session

library(broom)
- Create a function that fits an lm model against the one, and only, argument passed to the function. Then use tidy() to return the results of the model as a data frame. The lm() call should assume that the data will always have the same columns as mtcars, and it will create a linear model of “mpg” against all the other variables. Name it model_function

model_function <- function(x) {
  model <- lm(mpg ~ ., x)
  tidy(model)
}
- Test model_function by passing mtcars to it

model_function(mtcars)
#> # A tibble: 11 × 5
#> term estimate std.error statistic p.value
#> <chr> <dbl> <dbl> <dbl> <dbl>
#> 1 (Intercept) 12.3 18.7 0.657 0.518
#> 2 cyl -0.111 1.05 -0.107 0.916
#> 3 disp 0.0133 0.0179 0.747 0.463
#> 4 hp -0.0215 0.0218 -0.987 0.335
#> 5 drat 0.787 1.64 0.481 0.635
#> 6 wt -3.72 1.89 -1.96 0.0633
#> 7 qsec 0.821 0.731 1.12 0.274
#> 8 vs 0.318 2.10 0.151 0.881
#> 9 am 2.52 2.06 1.23 0.234
#> 10 gear 0.655 1.49 0.439 0.665
#> 11 carb -0.199 0.829 -0.241 0.812
- Pass model_function to spark_apply(), against tbl_mtcars. The call should fail, because broom is not explicitly referred to in model_function

tbl_mtcars |>
  spark_apply(model_function)
- Modify model_function by either adding a library() call, or using :: to explicitly refer to broom inside it

model_function <- function(x) {
  model <- lm(mpg ~ ., x)
  broom::tidy(model)
}
- Test model_function again against tbl_mtcars

tbl_mtcars |>
  spark_apply(model_function)
#> To increase performance, use the following schema:
#> columns = "term string, estimate double, std_error double, statistic double,
#> p_value double"
#> # Source: table<`sparklyr_tmp_table_7147ff59_4759_4f01_a920_3184b005d1f3`> [?? x 5]
#> # Database: spark_connection
#> term estimate std_error statistic p_value
#> <chr> <dbl> <dbl> <dbl> <dbl>
#> 1 (Intercept) -18.6 NaN NaN NaN
#> 2 cyl 0.517 NaN NaN NaN
#> 3 disp 0.0356 NaN NaN NaN
#> 4 hp -0.0579 NaN NaN NaN
#> 5 drat 7.98 NaN NaN NaN
#> 6 wt -1.24 NaN NaN NaN
#> 7 qsec 0.565 NaN NaN NaN
#> 8 vs 2.51 NaN NaN NaN
#> 9 am NaN NaN NaN NaN
#> 10 gear NaN NaN NaN NaN
#> # ℹ more rows
- Add a group_by argument, using “am” as the grouping variable

tbl_mtcars |>
  spark_apply(model_function, group_by = "am")
#> To increase performance, use the following schema:
#> columns = "am double, term string, estimate double, std_error double, statistic
#> double, p_value double"
#> # Source: table<`sparklyr_tmp_table_b2796b5a_7ab9_4279_b1be_54b3268fb568`> [?? x 6]
#> # Database: spark_connection
#> am term estimate std_error statistic p_value
#> <dbl> <chr> <dbl> <dbl> <dbl> <dbl>
#> 1 0 (Intercept) 8.64 21.5 0.402 0.697
#> 2 0 cyl -0.534 1.13 -0.474 0.647
#> 3 0 disp -0.0203 0.0174 -1.16 0.275
#> 4 0 hp 0.0622 0.0461 1.35 0.210
#> 5 0 drat 0.592 3.01 0.196 0.849
#> 6 0 wt 1.95 2.23 0.876 0.404
#> 7 0 qsec -0.884 0.758 -1.17 0.274
#> 8 0 vs 0.739 2.51 0.294 0.775
#> 9 0 am NaN NaN NaN NaN
#> 10 0 gear 8.65 3.90 2.22 0.0534
#> # ℹ more rows
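When you are done experimenting, close the connection as usual:

spark_disconnect(sc)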