Arrow in R: In-Memory Workflows

arrow 📦

Arrow & Single Files


library(arrow)

  • read_parquet()
  • read_csv_arrow()
  • read_feather()
  • read_json_arrow()

Value: a tibble (the default), or an Arrow Table if as_data_frame = FALSE; both are in-memory objects
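
For example, a minimal sketch (the file path here is hypothetical):

library(arrow)

# returns a tibble by default
flights_df <- read_csv_arrow("data/flights.csv")

# keep the result as an Arrow Table instead
flights_tbl <- read_csv_arrow("data/flights.csv", as_data_frame = FALSE)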

Your Turn

  1. Read in a single NYC Taxi Parquet file as an Arrow Table using read_parquet()

  2. Convert your Arrow Table object to a data.frame or a tibble

Read a Parquet File (tibble)

library(arrow)

parquet_file <- "data/nyc-taxi/year=2019/month=9/part-0.parquet"

taxi_df <- read_parquet(file = parquet_file)
taxi_df
# A tibble: 6,567,396 × 22
   vendor_name pickup_datetime     dropoff_datetime    passenger_count
   <chr>       <dttm>              <dttm>                        <int>
 1 CMT         2019-08-31 18:09:30 2019-08-31 18:15:42               1
 2 CMT         2019-08-31 18:26:30 2019-08-31 18:44:31               1
 3 CMT         2019-08-31 18:39:35 2019-08-31 19:15:55               2
 4 VTS         2019-08-31 18:12:26 2019-08-31 18:15:17               4
 5 VTS         2019-08-31 18:43:16 2019-08-31 18:53:50               1
 6 VTS         2019-08-31 18:26:13 2019-08-31 18:45:35               1
 7 CMT         2019-08-31 18:34:52 2019-08-31 18:42:03               1
 8 CMT         2019-08-31 18:50:02 2019-08-31 18:58:16               1
 9 CMT         2019-08-31 18:08:02 2019-08-31 18:14:44               0
10 VTS         2019-08-31 18:11:38 2019-08-31 18:26:47               1
# ℹ 6,567,386 more rows
# ℹ 18 more variables: trip_distance <dbl>, pickup_longitude <dbl>,
#   pickup_latitude <dbl>, rate_code <chr>, store_and_fwd <chr>,
#   dropoff_longitude <dbl>, dropoff_latitude <dbl>, payment_type <chr>,
#   fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, tip_amount <dbl>,
#   tolls_amount <dbl>, total_amount <dbl>, improvement_surcharge <dbl>,
#   congestion_surcharge <dbl>, pickup_location_id <int>, …

Read a Parquet File (Table)

taxi_table <- read_parquet(file = parquet_file, as_data_frame = FALSE)
taxi_table
Table
6567396 rows x 22 columns
$vendor_name <string>
$pickup_datetime <timestamp[ms]>
$dropoff_datetime <timestamp[ms]>
$passenger_count <int64>
$trip_distance <double>
$pickup_longitude <double>
$pickup_latitude <double>
$rate_code <string>
$store_and_fwd <string>
$dropoff_longitude <double>
$dropoff_latitude <double>
$payment_type <string>
$fare_amount <double>
$extra <double>
$mta_tax <double>
$tip_amount <double>
$tolls_amount <double>
$total_amount <double>
$improvement_surcharge <double>
$congestion_surcharge <double>
...
2 more columns
Use `schema()` to see entire schema

tibble <-> Table <-> data.frame

library(dplyr)

# convert a data frame to an Arrow Table
arrow_table(taxi_df)

# convert a Table to a tibble
taxi_table |> collect()
as_tibble(taxi_table)

# convert a Table to a data.frame
as.data.frame(taxi_table)


  • data.frame & tibble are R objects in-memory
  • Table is an Arrow object in-memory
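
A quick way to see the difference (a sketch):

class(taxi_df)     # a tibble/data.frame, managed by R
class(taxi_table)  # an Arrow Table (an R6 object), backed by Arrow C++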

Watch Your Schemas 👀

schema(taxi_df)
Schema
vendor_name: string
pickup_datetime: timestamp[us, tz=America/Vancouver]
dropoff_datetime: timestamp[us, tz=America/Vancouver]
passenger_count: int32
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
pickup_location_id: int32
dropoff_location_id: int32
schema(taxi_table)
Schema
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
pickup_location_id: int64
dropoff_location_id: int64
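
The tibble's schema reflects R's native types (32-bit integers, POSIXct with the session time zone), while the Table keeps the types stored in the Parquet file. Round-tripping makes this visible (a sketch; output not shown):

# converting the tibble back to a Table picks up the R-native
# types (int32, timestamp[us] with a time zone), not the file's
# original int64 / timestamp[ms]
arrow_table(taxi_df)$schema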

[Figure: data frames vs. Arrow Tables]

Table | Dataset: A dplyr pipeline

parquet_file |>
  read_parquet(as_data_frame = FALSE) |>
  group_by(vendor_name) |>
  summarise(all_trips = n(),
            shared_trips = sum(passenger_count > 1, na.rm = TRUE)) |>
  mutate(pct_shared = shared_trips / all_trips * 100) |>
  collect()
# A tibble: 3 × 4
  vendor_name all_trips shared_trips pct_shared
  <chr>           <int>        <int>      <dbl>
1 VTS           4238808      1339478       31.6
2 CMT           2294473       470344       20.5
3 <NA>            34115            0        0  


Functions available in Arrow dplyr queries: https://arrow.apache.org/docs/r/reference/acero.html
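
Until collect() is called, dplyr verbs on a Table only build up a lazy query; collect() runs it in Arrow's engine and returns a tibble. A sketch:

query <- taxi_table |>
  group_by(vendor_name) |>
  summarise(all_trips = n())

class(query)    # "arrow_dplyr_query": nothing has been computed yet
collect(query)  # executes the query and returns a tibble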

Arrow for Efficient In-Memory Processing

parquet_file |>
  read_parquet() |>
  nrow()
[1] 6567396


parquet_file |>
  read_parquet() |>
  group_by(vendor_name) |>
  summarise(all_trips = n(),
            shared_trips = sum(passenger_count > 1, na.rm = TRUE)) |>
  mutate(pct_shared = shared_trips / all_trips * 100) |>
  collect() |>
  system.time()
   user  system elapsed 
  1.157   0.261   0.509 

Arrow for Efficient In-Memory Processing

parquet_file |>
  read_parquet(as_data_frame = FALSE) |>
  nrow()
[1] 6567396


parquet_file |>
  read_parquet(as_data_frame = FALSE) |>
  group_by(vendor_name) |>
  summarise(all_trips = n(),
            shared_trips = sum(passenger_count > 1, na.rm = TRUE)) |>
  mutate(pct_shared = shared_trips / all_trips * 100) |>
  collect() |>
  system.time()
   user  system elapsed 
  1.047   0.203   0.220 
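
The gap comes from where the work happens: with as_data_frame = FALSE, the grouping and aggregation run in Arrow's C++ engine, and only the three-row result becomes a tibble at collect(). Pulling the data into R first gives that advantage up (a sketch; expect timings closer to the tibble version):

parquet_file |>
  read_parquet(as_data_frame = FALSE) |>
  collect() |>   # converts all 6.5 million rows to a tibble up front
  group_by(vendor_name) |>
  summarise(all_trips = n()) |>
  system.time()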

Read a Parquet File Selectively

parquet_file |>
  read_parquet(
    col_select = c("vendor_name", "passenger_count"),
    as_data_frame = FALSE
  )
Table
6567396 rows x 2 columns
$vendor_name <string>
$passenger_count <int64>
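
Because Parquet stores data by column, columns left out of col_select are never read from disk at all. col_select also accepts tidyselect helpers (a sketch):

parquet_file |>
  read_parquet(
    col_select = starts_with("pickup"),
    as_data_frame = FALSE
  )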

Selective Reads Are Faster

parquet_file |>
  read_parquet(
    col_select = c("vendor_name", "passenger_count"),
    as_data_frame = FALSE
  ) |> 
  group_by(vendor_name) |>
  summarise(all_trips = n(),
            shared_trips = sum(passenger_count > 1, na.rm = TRUE)) |>
  mutate(pct_shared = shared_trips / all_trips * 100) |>
  collect() |>
  system.time()
   user  system elapsed 
  0.258   0.011   0.131 

Arrow Table or Dataset?

https://francoismichonneau.net/2022/10/import-big-csv/
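
A rough rule of thumb: read_parquet() loads one file into memory (tibble or Table), while open_dataset() only scans metadata, so it can handle many files and larger-than-memory data. A sketch, reusing the NYC Taxi directory:

# no rows are read into memory yet; queries on taxi_ds are
# evaluated lazily across all files in the directory
taxi_ds <- open_dataset("data/nyc-taxi")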

Arrow for Improving Those Sluggish Workflows

  • a “drop-in” for many dplyr workflows (Arrow Table or Dataset)
  • works when your tabular data gets too big for your RAM (Arrow Dataset)
  • provides tools for re-engineering data storage for better performance (arrow::write_dataset(); see the sketch below)
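
A minimal sketch of that last point, assuming taxi is a data frame or Table and a hypothetical output directory:

# write a Parquet dataset partitioned by vendor_name; downstream
# queries filtering on vendor_name can then skip whole files
taxi |>
  group_by(vendor_name) |>
  write_dataset("data/taxi-by-vendor", format = "parquet")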