Data Engineering with Arrow Exercises

Schemas

library(arrow)
library(dplyr)
seattle_csv <- open_dataset(sources = "data/seattle-library-checkouts.csv",
                            format = "csv")
Data Types & Controlling the Schema
  1. The first few thousand rows of ISBN are blank in the Seattle Checkouts CSV file. Read in the Seattle Checkouts CSV file with open_dataset() and ensure the correct data type for ISBN is <string> (or the alias <utf8>) instead of the <null> interpreted by Arrow.

  2. Once you have a Dataset object with the correct data types, count the number of Checkouts by CheckoutYear and arrange the result by CheckoutYear.

seattle_csv <- open_dataset(sources = "data/seattle-library-checkouts.csv",
  format = "csv",
  schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = string(), #or utf8()
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  ),
    skip = 1,
)
seattle_csv
FileSystemDataset with 1 csv file
12 columns
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: string
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

or

seattle_csv <- open_dataset(sources = "data/seattle-library-checkouts.csv",
  format = "csv",
  col_types = schema(ISBN = string()) # or utf8()
)
seattle_csv
FileSystemDataset with 1 csv file
12 columns
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: string
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

The number of Checkouts by CheckoutYear arranged by CheckoutYear:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 Γ— 2
   CheckoutYear `sum(Checkouts)`
          <int>            <int>
 1         2005          3798685
 2         2006          6599318
 3         2007          7126627
 4         2008          8438486
 5         2009          9135167
 6         2010          8608966
 7         2011          8321732
 8         2012          8163046
 9         2013          9057096
10         2014          9136081
11         2015          9084179
12         2016          9021051
13         2017          9231648
14         2018          9149176
15         2019          9199083
16         2020          6053717
17         2021          7361031
18         2022          7001989

or

seattle_csv |> 
  count(CheckoutYear, wt = Checkouts) |> 
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 Γ— 2
   CheckoutYear       n
          <int>   <int>
 1         2005 3798685
 2         2006 6599318
 3         2007 7126627
 4         2008 8438486
 5         2009 9135167
 6         2010 8608966
 7         2011 8321732
 8         2012 8163046
 9         2013 9057096
10         2014 9136081
11         2015 9084179
12         2016 9021051
13         2017 9231648
14         2018 9149176
15         2019 9199083
16         2020 6053717
17         2021 7361031
18         2022 7001989

Timing the query:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |> 
  system.time()
   user  system elapsed 
 10.660   1.244  10.399 

Querying 42 million rows of data stored in a CSV on disk in ~10 seconds, not too bad.

Parquet

seattle_parquet <- "data/seattle-library-checkouts-parquet"

seattle_csv |>
  write_dataset(path = seattle_parquet,
                format = "parquet")
Parquet
  1. Re-run the query counting the number of Checkouts by CheckoutYear and arranging the result by CheckoutYear, this time using the Seattle Checkout data saved to disk as a single Parquet file. Did you notice a difference in compute time?
seattle_parquet <- "data/seattle-library-checkouts-parquet"

open_dataset(sources = seattle_parquet, 
             format = "parquet") |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |> 
  system.time()
   user  system elapsed 
  1.758   0.445   0.557 

A much faster compute time for the query when the on-disk data is stored in the Parquet format.

Partitioning

seattle_parquet_part <- "data/seattle-library-checkouts"

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet")
Partitioning
  1. Let’s write the Seattle Checkout CSV data to a multi-file dataset just one more time! This time, write the data partitioned by CheckoutType as Parquet files.

  2. Now compare the compute time between our Parquet data partitioned by CheckoutYear and our Parquet data partitioned by CheckoutType with a query of the total number of checkouts in September of 2019. Did you find a difference in compute time?

Writing the data:

seattle_checkouttype <- "data/seattle-library-checkouts-type"

seattle_csv |>
  group_by(CheckoutType) |>
  write_dataset(path = seattle_checkouttype,
                format = "parquet")

Total number of Checkouts in September of 2019 using partitioned Parquet data by CheckoutType:

open_dataset(sources = "data/seattle-library-checkouts-type") |> 
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |> 
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |> 
  system.time()
   user  system elapsed 
  0.781   0.085   0.297 

Total number of Checkouts in September of 2019 using partitioned Parquet data by CheckoutYear and CheckoutMonth:

open_dataset("data/seattle-library-checkouts") |> 
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |> 
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |> 
  system.time()
   user  system elapsed 
  0.033   0.006   0.031 

Faster compute time because the filter() call is based on the partitions.