Data Manipulation—Part 1

Goals

Avoiding these! But…don’t worry!

An Arrow Dataset

library(arrow)

nyc_taxi <- open_dataset("data/nyc-taxi/")
nyc_taxi
FileSystemDataset with 120 Parquet files
24 columns
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
...
4 more columns
Use `schema()` to see entire schema

Arrow Datasets

Constructing queries

library(dplyr)

shared_rides <- nyc_taxi |>
  group_by(year) |>
  summarize(
    all_trips = n(),
    shared_trips = sum(passenger_count > 1, na.rm = TRUE)
  ) |>
  mutate(pct_shared = shared_trips / all_trips * 100) 

class(shared_rides)
[1] "arrow_dplyr_query"

arrow dplyr queries

  • query has been constructed but not evaluated
  • nothing has been pulled into memory
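
The same laziness can be seen on a tiny in-memory table. This is a minimal sketch: `toy_trips` and its values are made up for illustration and simply stand in for `nyc_taxi`.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data standing in for nyc_taxi
toy_trips <- arrow_table(data.frame(
  year = c(2019L, 2019L, 2020L, 2020L),
  passenger_count = c(1L, 3L, 2L, 1L)
))

toy_query <- toy_trips |>
  group_by(year) |>
  summarize(
    all_trips = n(),
    shared_trips = sum(passenger_count > 1, na.rm = TRUE)
  )

# The pipeline above only describes the work to be done;
# the result is a query object, not a data frame
class(toy_query)
is.data.frame(toy_query)
```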

Running the query

  • collect() evaluates the query and returns the result to R as an in-memory tibble

collect()

collect(shared_rides)
# A tibble: 10 × 4
    year all_trips shared_trips pct_shared
   <int>     <int>        <int>      <dbl>
 1  2012 178544324     53313752       29.9
 2  2013 173179759     51215013       29.6
 3  2014 165114361     48816505       29.6
 4  2015 146112989     43081091       29.5
 5  2016 131165043     38163870       29.1
 6  2017 113495512     32296166       28.5
 7  2018 102797401     28796633       28.0
 8  2019  84393604     23515989       27.9
 9  2020  24647055      5837960       23.7
10  2021  30902618      7221844       23.4

Calling nrow() to see how much data a query returns

nyc_taxi |>
  filter(year %in% 2017:2021) |>
  nrow()
[1] 356236190
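
A small sketch of the same pattern on made-up data (the `toy_trips` table is hypothetical): `nrow()` counts the rows a query would return without pulling those rows into R.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data: five trips across different years
toy_trips <- arrow_table(data.frame(
  year = c(2016L, 2017L, 2018L, 2021L, 2022L),
  trip_distance = c(1.2, 3.4, 0.8, 5.0, 2.1)
))

# Count the matching rows; only the count comes back to R
n_recent <- toy_trips |>
  filter(year %in% 2017:2021) |>
  nrow()

n_recent
```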

Your Turn

Use the function nrow() to work out the answer to this question:

  1. How many taxi fares in the dataset had a total amount greater than $100?

➡️ Data Manipulation Part I Exercises Page

Previewing output for large queries

How much were fares in GBP (£)?

fares_pounds <- nyc_taxi |>
  mutate(
    fare_amount_pounds = fare_amount * 0.79
  )

How many rows?

fares_pounds |>
  nrow()
[1] 1150352666

Use head() and collect() to preview results

nyc_taxi |>
  filter(year == 2020) |>
  mutate(fare_pounds = fare_amount * 0.79) |>
  select(fare_amount, fare_pounds) |>
  head() |>
  collect()
# A tibble: 6 × 2
  fare_amount fare_pounds
        <dbl>       <dbl>
1        16.5       13.0 
2        21.5       17.0 
3         5          3.95
4        10.5        8.30
5        11          8.69
6         5.5        4.35

Use across() to transform data in multiple columns

nyc_taxi |>
  mutate(across(ends_with("amount"), list(pounds = ~.x * 0.79))) |>
  select(contains("amount")) |>
  head() |>
  collect()
# A tibble: 6 × 8
  fare_amount tip_amount tolls_amount total_amount fare_amount_pounds
        <dbl>      <dbl>        <dbl>        <dbl>              <dbl>
1        29.7       6.04            0        36.2               23.5 
2         9.3       0               0         9.8                7.35
3         4.1       1.38            0         5.98               3.24
4         4.5       1               0         6                  3.56
5         4.5       0               0         5.5                3.56
6         4.1       0               0         5.6                3.24
# ℹ 3 more variables: tip_amount_pounds <dbl>, tolls_amount_pounds <dbl>,
#   total_amount_pounds <dbl>
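
To see how across() names its output columns, here is a minimal sketch on a made-up table (`toy_fares` is hypothetical; 0.79 is the illustrative exchange rate used above). Each matched column gets a new companion column with the `_pounds` suffix taken from the name in the list.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data with two "amount" columns and one other column
toy_fares <- arrow_table(data.frame(
  fare_amount = c(16.5, 21.5, 5),
  tip_amount = c(2, 0, 1.5),
  passenger_count = c(1L, 2L, 1L)
))

converted <- toy_fares |>
  mutate(across(ends_with("amount"), list(pounds = ~ .x * 0.79))) |>
  select(contains("amount")) |>
  head() |>
  collect()

names(converted)
```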

Summary

  • Use nrow() to work out how many rows of data your analyses will return
  • Use collect() to pull all of the data into your R session
  • Use head() and collect() to preview results
  • Use across() to manipulate data in multiple columns at once

dplyr verbs API in arrow - alternatives

Example - slice()

First three trips in the dataset in 2021 where distance > 100 miles

long_rides_2021 <- nyc_taxi |>
  filter(year == 2021 & trip_distance > 100) |>
  select(pickup_datetime, year, trip_distance)

long_rides_2021 |>
  slice(1:3)
Error in UseMethod("slice"): no applicable method for 'slice' applied to an object of class "arrow_dplyr_query"

Head to the docs!

?acero

or view them at https://arrow.apache.org/docs/r/reference/acero.html

A different function

long_rides_2021 |>
  slice_max(n = 3, order_by = trip_distance, with_ties = FALSE) |>
  collect()
# A tibble: 3 × 3
  pickup_datetime      year trip_distance
  <dttm>              <int>         <dbl>
1 2021-11-16 04:55:00  2021       351613.
2 2021-10-27 09:46:00  2021       345124.
3 2021-12-11 02:48:00  2021       335094.

Or call collect() first

long_rides_2021 |>
  collect() |>
  slice(1:3)
# A tibble: 3 × 3
  pickup_datetime      year trip_distance
  <dttm>              <int>         <dbl>
1 2021-01-03 01:01:26  2021          216.
2 2021-01-03 03:36:52  2021          268.
3 2021-10-02 07:04:53  2021          188.
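
If "first" is meant in a well-defined order, such as earliest pickup time, one option that stays in Arrow is arrange() followed by head(). This is a sketch on made-up data (`toy_rides` is hypothetical), and it assumes that ordering by pickup time is what is wanted, which is not necessarily what slice(1:3) returns above.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data: five rides, four of them longer than 100 miles
toy_rides <- arrow_table(data.frame(
  pickup_datetime = as.POSIXct(
    c("2021-03-01 10:00:00", "2021-01-03 01:01:26", "2021-10-02 07:04:53",
      "2021-01-03 03:36:52", "2021-02-14 09:30:00"),
    tz = "UTC"
  ),
  trip_distance = c(150, 216, 188, 268, 12)
))

# Sort inside Arrow, keep the first three rows, then collect
first_three <- toy_rides |>
  filter(trip_distance > 100) |>
  arrange(pickup_datetime) |>
  head(3) |>
  collect()

first_three
```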

tidyr functions - pivot

library(tidyr)

nyc_taxi |> 
  group_by(vendor_name) |>
  summarise(max_fare = max(fare_amount)) |>
  pivot_longer(!vendor_name, names_to = "metric") |> 
  collect()
Error in UseMethod("pivot_longer"): no applicable method for 'pivot_longer' applied to an object of class "arrow_dplyr_query"

duckdb

  • in-memory database
  • columnar
  • understands Arrow format

sharing data with duckdb and arrow

tidyr functions - pivot with duckdb!

library(duckdb)

nyc_taxi |> 
  group_by(vendor_name) |>
  summarise(max_fare = max(fare_amount)) |>
  to_duckdb() |> # send data to duckdb
  pivot_longer(!vendor_name, names_to = "metric") |> 
  to_arrow() |> # return data back to arrow
  collect()
# A tibble: 3 × 3
  vendor_name metric     value
  <chr>       <chr>      <dbl>
1 CMT         max_fare 998310.
2 VTS         max_fare  10000.
3 <NA>        max_fare   3555.
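
When the summarised result is small enough to fit in memory, as a per-vendor summary is, a simpler alternative to the round trip through duckdb is to call collect() first and pivot the resulting tibble in R. A minimal sketch on made-up data (`toy_fares` is hypothetical):

```r
library(arrow)
library(dplyr)
library(tidyr)

# Hypothetical toy data standing in for nyc_taxi
toy_fares <- arrow_table(data.frame(
  vendor_name = c("CMT", "CMT", "VTS", "VTS"),
  fare_amount = c(10, 25.5, 7, 12)
))

long_summary <- toy_fares |>
  group_by(vendor_name) |>
  summarise(max_fare = max(fare_amount)) |> # runs in Arrow
  collect() |>                              # small result, now a tibble
  pivot_longer(!vendor_name, names_to = "metric") # runs in R

long_summary
```

The duckdb route is the better fit when the data being pivoted is still too large to collect.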

Using functions inside verbs

  • lots of the lubridate and stringr APIs supported!
  • base R and others too - always good to check the docs

Morning vs afternoon with namespacing

nyc_taxi |>
  group_by(
    time_of_day = ifelse(lubridate::am(pickup_datetime), "morning", "afternoon")
  ) |>
  count() |>
  collect()
# A tibble: 2 × 2
# Groups:   time_of_day [2]
  time_of_day         n
  <chr>           <int>
1 afternoon   736491676
2 morning     413860990

Morning vs afternoon - without namespacing

library(lubridate)

nyc_taxi |>
  group_by(
    time_of_day = ifelse(am(pickup_datetime), "morning", "afternoon")
  ) |>
  count() |>
  collect()
# A tibble: 2 × 2
# Groups:   time_of_day [2]
  time_of_day         n
  <chr>           <int>
1 afternoon   736491676
2 morning     413860990

How does this work?

Acero

  • arrow’s query execution engine
  • use Arrow functions on Arrow Datasets

arrow dplyr queries

What if a function isn’t implemented?

nyc_taxi |>
  mutate(vendor_name = na_if(vendor_name, "CMT")) |>
  head() |>
  collect()
Error in `na_if()`:
! Expression not supported in Arrow
→ Call collect() first to pull data into R.

Head to the docs again to see what’s implemented!

?acero

or view them at https://arrow.apache.org/docs/r/reference/acero.html

Option 1 - find a workaround!

nyc_taxi |>
  mutate(vendor_name = ifelse(vendor_name == "CMT", NA, vendor_name)) |>
  head() |>
  collect()
# A tibble: 6 × 24
  vendor_name pickup_datetime     dropoff_datetime    passenger_count
  <chr>       <dttm>              <dttm>                        <int>
1 <NA>        2012-01-20 06:09:36 2012-01-20 06:42:25               1
2 <NA>        2012-01-20 06:54:10 2012-01-20 07:06:55               1
3 <NA>        2012-01-20 00:08:01 2012-01-20 00:11:02               1
4 <NA>        2012-01-20 00:36:22 2012-01-20 00:39:44               1
5 <NA>        2012-01-20 12:58:32 2012-01-20 13:03:04               1
6 <NA>        2012-01-20 11:40:20 2012-01-20 11:43:43               2
# ℹ 20 more variables: trip_distance <dbl>, pickup_longitude <dbl>,
#   pickup_latitude <dbl>, rate_code <chr>, store_and_fwd <chr>,
#   dropoff_longitude <dbl>, dropoff_latitude <dbl>, payment_type <chr>,
#   fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, tip_amount <dbl>,
#   tolls_amount <dbl>, total_amount <dbl>, improvement_surcharge <dbl>,
#   congestion_surcharge <dbl>, pickup_location_id <int>,
#   dropoff_location_id <int>, year <int>, month <int>
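
The sketch below compares the workaround with the route suggested by the error message, on a made-up table (`toy_vendors` is hypothetical). Collecting first is only reasonable when the data fits in memory; the ifelse() version keeps the work in Arrow.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data with one missing vendor
toy_vendors <- arrow_table(data.frame(
  vendor_name = c("CMT", "VTS", NA, "CMT")
))

# Workaround that stays in Arrow
in_arrow <- toy_vendors |>
  mutate(vendor_name = ifelse(vendor_name == "CMT", NA, vendor_name)) |>
  collect()

# Following the error message: collect first, then use na_if() in R
in_r <- toy_vendors |>
  collect() |>
  mutate(vendor_name = na_if(vendor_name, "CMT"))

# Both approaches should mark the same number of values as missing
sum(is.na(in_arrow$vendor_name))
sum(is.na(in_r$vendor_name))
```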

Option 2

  • In data manipulation part 2!

Your Turn

  1. Use the dplyr::filter() and stringr::str_ends() functions to return a subset of the data which is a) from September 2020, and b) the value in vendor_name ends with the letter “S”.

  2. Try to use the stringr function str_replace_na() to replace any NA values in the vendor_name column with the string “No vendor” instead. What happens, and why?

  3. Bonus question: see if you can find a different way of completing the task in question 2.

➡️ Data Manipulation Part I Exercises Page

Working with custom functions

Requires arrow 17.0.0 or later!

time_text <- function(time){
  day_of_week <- wday(time, label = TRUE, abbr = FALSE)
  time_of_day <- ifelse(lubridate::am(time), "AM", "PM")
  paste(day_of_week, time_of_day)
}

nyc_taxi |>
  mutate(pickup_text = time_text(pickup_datetime)) |>
  select(pickup_datetime, pickup_text) |>
  head() |>
  collect() 
# A tibble: 6 × 2
  pickup_datetime     pickup_text
  <dttm>              <chr>      
1 2012-01-08 12:50:38 Sunday PM  
2 2012-01-08 12:52:01 Sunday PM  
3 2012-01-07 18:39:26 Sunday AM  
4 2012-01-07 18:40:49 Sunday AM  
5 2012-01-08 19:42:37 Monday AM  
6 2012-01-08 12:51:47 Sunday PM  

How did that work?

The custom function is converted to an Arrow Expression; the query doesn’t contain any reference to the time_text() function.

nyc_taxi |>
  mutate(pickup_text = time_text(pickup_datetime)) |>
  select(pickup_datetime, pickup_text)
FileSystemDataset (query)
pickup_datetime: timestamp[ms]
pickup_text: string (binary_join_element_wise(cast(strftime(pickup_datetime, {format="%A"}), {to_type=string, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), cast(if_else((hour(pickup_datetime) < 12), "AM", "PM"), {to_type=string, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), " ", {null_handling=REPLACE, null_replacement="NA"}))

See $.data for the source Arrow object

Anything else to be aware of?

  • arrow 17.0.0 or later
  • this will only work for functions which have Arrow bindings
  • use ?acero to see which ones do

Summary

  • Working with Arrow Datasets allows you to manipulate data which is larger-than-memory
  • You can use many dplyr functions with arrow - run ?acero to view the docs
  • You can pass data to duckdb to use functions implemented in duckdb but not arrow