22  Arrow

22.1 引言

CSV 文件被设计为易于人类阅读。它们是一种很好的交换格式,因为它们非常简单,并且几乎所有工具都能读取它们。但 CSV 文件效率不高:你需要做相当多的工作才能将数据读入 R。在本章中,你将学习一种强大的替代方案:parquet 格式,这是一种被大数据系统广泛使用的基于开放标准的格式。

我们将把 parquet 文件与 Apache Arrow 配对使用,这是一个为高效分析和传输大型数据集而设计的多语言工具箱。我们将通过 arrow 包 来使用 Apache Arrow,它提供了一个 dplyr 后端,允许你使用熟悉的 dplyr 语法来分析大于内存的数据集。另外一个好处是,arrow 非常快:你将在本章后面看到一些例子。

arrow 和 dbplyr 都提供了 dplyr 后端,所以你可能会想知道何时使用哪一个。在很多情况下,选择已经为你做好了,因为数据已经存在于数据库或 parquet 文件中,而你希望直接使用它。但如果你是从自己的数据(也许是 CSV 文件)开始,你可以将其加载到数据库中或转换为 parquet。总的来说,很难知道哪种方法效果最好,所以在你分析的早期阶段,我们鼓励你尝试两者,并选择最适合你的那一个。

(非常感谢 Danielle Navarro,她贡献了本章的初版。)

22.1.1 前提条件

在本章中,我们将继续使用 tidyverse,特别是 dplyr,但我们会将其与专门为处理大数据而设计的 arrow 包配对使用。

在本章的后面,我们还会看到 arrow 和 duckdb 之间的一些联系,所以我们还需要 dbplyr 和 duckdb。

library(dbplyr, warn.conflicts = FALSE)
library(duckdb)
#> 载入需要的程序包:DBI

22.2 获取数据

我们首先获取一个值得使用这些工具的数据集:西雅图公共图书馆的图书借阅数据集,可在线获取:data.seattle.gov/Community/Checkouts-by-Title/tmmm-ytt6。这个数据集包含 41,389,465 行,告诉你从 2005 年 4 月到 2022 年 10 月,每本书每月被借阅了多少次。

以下代码将为你获取该数据的缓存副本。数据是一个 9GB 的 CSV 文件,所以下载需要一些时间。我强烈推荐使用 curl::multi_download() 来获取非常大的文件,因为它正是为此目的而构建的:它会给你一个进度条,并且如果下载中断,它可以恢复下载。

# eval: !expr "!file.exists('data/seattle-library-checkouts.csv')"
dir.create("data", showWarnings = FALSE)

curl::multi_download(
    "https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
    "data/seattle-library-checkouts.csv",
    resume = TRUE
)
#> # A tibble: 1 × 10
#>   success status_code resumefrom url                    destfile        error
#>   <lgl>         <dbl>      <dbl> <chr>                  <chr>           <chr>
#> 1 TRUE            416          0 https://r4ds.s3.us-we… "E:\\r4ds-cn\\… <NA> 
#> # ℹ 4 more variables: type <chr>, modified <dttm>, time <dbl>,
#> #   headers <list>

22.3 打开数据集

让我们先来看看数据。这个文件有 9 GB,足够大,我们可能不想把整个文件都加载到内存中。一个好的经验法则是,你通常需要至少是数据大小两倍的内存,而许多笔记本电脑的内存上限是 16 GB。这意味着我们想避免使用 read_csv(),而是使用 arrow::open_dataset()

seattle_csv <- open_dataset(
    sources = "data/seattle-library-checkouts.csv",
    col_types = schema(ISBN = string()),
    format = "csv"
)

当这段代码运行时发生了什么?open_dataset() 会扫描几千行来确定数据集的结构。ISBN 列在前 80,000 行包含空白值,所以我们必须指定列类型来帮助 arrow 确定数据结构。一旦数据被 open_dataset() 扫描过,它会记录下它所发现的内容然后停止;它只会在你明确请求时才读取更多的行。这个元数据就是我们打印 seattle_csv 时看到的内容:

seattle_csv
#> FileSystemDataset with 1 csv file
#> 12 columns
#> UsageClass: string
#> CheckoutType: string
#> MaterialType: string
#> CheckoutYear: int64
#> CheckoutMonth: int64
#> Checkouts: int64
#> Title: string
#> ISBN: string
#> Creator: string
#> Subjects: string
#> Publisher: string
#> PublicationYear: string

输出的第一行告诉你 seattle_csv 是作为单个 CSV 文件存储在本地磁盘上的;它只会在需要时才被加载到内存中。输出的其余部分告诉了你 arrow 为每一列推断出的列类型。

我们可以用 glimpse() 来看实际内容。这揭示了有大约 4100 万行和 12 列,并向我们展示了几个值。

seattle_csv |> glimpse()
#> FileSystemDataset with 1 csv file
#> 41,389,465 rows x 12 columns
#> $ UsageClass      <string> "Physical", "Physical", "Digital", "Physical", "Ph…
#> $ CheckoutType    <string> "Horizon", "Horizon", "OverDrive", "Horizon", "Hor…
#> $ MaterialType    <string> "BOOK", "BOOK", "EBOOK", "BOOK", "SOUNDDISC", "BOO…
#> $ CheckoutYear     <int64> 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 20…
#> $ CheckoutMonth    <int64> 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,…
#> $ Checkouts        <int64> 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 3, 2, 1, 3, 2,…
#> $ Title           <string> "Super rich : a guide to having it all / Russell S…
#> $ ISBN            <string> "", "", "", "", "", "", "", "", "", "", "", "", ""…
#> $ Creator         <string> "Simmons, Russell", "Barclay, James, 1965-", "Tim …
#> $ Subjects        <string> "Self realization, Conduct of life, Attitude Psych…
#> $ Publisher       <string> "Gotham Books,", "Pyr,", "Random House, Inc.", "Di…
#> $ PublicationYear <string> "c2011.", "2010.", "2015", "2005.", "c2004.", "c20…

我们可以开始用 dplyr 动词来使用这个数据集,并用 collect() 来强制 arrow 执行计算并返回一些数据。例如,这段代码告诉我们每年的总借阅量:

seattle_csv |>
    group_by(CheckoutYear) |>
    summarise(Checkouts = sum(Checkouts)) |>
    arrange(CheckoutYear) |>
    collect()
#> # A tibble: 18 × 2
#>   CheckoutYear Checkouts
#>          <int>     <int>
#> 1         2005   3798685
#> 2         2006   6599318
#> 3         2007   7126627
#> 4         2008   8438486
#> 5         2009   9135167
#> 6         2010   8608966
#> # ℹ 12 more rows

多亏了 arrow,无论底层数据集有多大,这段代码都能工作。但它目前相当慢:在 Hadley 的电脑上,它运行了大约 10 秒。考虑到我们拥有的数据量,这不算太糟,但我们可以通过切换到更好的格式来使其快得多。

22.4 parquet 格式

为了让这个数据更容易处理,让我们切换到 parquet 文件格式,并将其分割成多个文件。接下来的部分将首先向你介绍 parquet 和分区 (partitioning),然后将我们学到的知识应用到西雅图图书馆的数据上。

22.4.1 parquet 的优势

像 CSV 一样,parquet 用于矩形数据,但它不是一个你可以用任何文件编辑器读取的文本格式,而是一个专为大数据需求设计的自定义二进制格式。这意味着:

  • Parquet 文件通常比等效的 CSV 文件小。Parquet 依赖于高效的编码方式来减小文件大小,并支持文件压缩。这有助于使 parquet 文件快速,因为从磁盘移动到内存的数据更少。

  • Parquet 文件有丰富的类型系统。正如我们在 Section 7.3 中讨论的,CSV 文件不提供任何关于列类型的信息。例如,CSV 读取器必须猜测 "08-10-2022" 应该被解析为字符串还是日期。相比之下,parquet 文件以一种记录了类型和数据的方式存储数据。

  • Parquet 文件是“列式”(column-oriented) 的。这意味着它们是按列组织的,很像 R 的数据框。与按行组织的 CSV 文件相比,这通常为数据分析任务带来更好的性能。

  • Parquet 文件是“分块”(chunked) 的,这使得可以同时处理文件的不同部分,并且,如果你幸运的话,可以完全跳过一些块。

parquet 文件有一个主要缺点:它们不再是“人类可读的”,也就是说,如果你用 readr::read_file() 查看一个 parquet 文件,你只会看到一堆乱码。

22.4.2 分区 (Partitioning)

随着数据集变得越来越大,将所有数据存储在单个文件中变得越来越痛苦,将大数据集分割到多个文件中通常很有用。当这种结构化做得巧妙时,这种策略可以带来显著的性能提升,因为许多分析只会需要文件的一个子集。

关于如何对你的数据集进行分区,没有硬性规定:结果将取决于你的数据、访问模式以及读取数据的系统。你可能需要做一些实验才能找到适合你情况的理想分区方式。作为一个粗略的指南,arrow 建议你避免小于 20MB 和大于 2GB 的文件,并避免产生超过 10,000 个文件的分区。你还应该尝试按你过滤时使用的变量进行分区;正如你稍后将看到的,这允许 arrow 通过只读取相关文件来跳过大量工作。

22.4.3 重写西雅图图书馆数据

让我们将这些想法应用到西雅图图书馆的数据上,看看它们在实践中是如何发挥作用的。我们将按 CheckoutYear 进行分区,因为很可能一些分析只想看最近的数据,而按年分区会产生 18 个大小合理的块。

为了重写数据,我们使用 dplyr::group_by() 定义分区,然后用 arrow::write_dataset() 将分区保存到一个目录中。write_dataset() 有两个重要的参数:一个是我们将在其中创建文件的目录,另一个是我们使用的格式。

pq_path <- "data/seattle-library-checkouts"
## | eval: !expr "!file.exists(pq_path)"

seattle_csv |>
    group_by(CheckoutYear) |>
    write_dataset(path = pq_path, format = "parquet")

这需要大约一分钟的时间来运行;正如我们稍后将看到的,这是一项初步的投资,它通过使未来的操作快得多来得到回报。

让我们看看我们刚刚产生了什么:

tibble(
    files = list.files(pq_path, recursive = TRUE),
    size_MB = file.size(file.path(pq_path, files)) / 1024^2
)
#> # A tibble: 18 × 2
#>   files                            size_MB
#>   <chr>                              <dbl>
#> 1 CheckoutYear=2005/part-0.parquet    109.
#> 2 CheckoutYear=2006/part-0.parquet    164.
#> 3 CheckoutYear=2007/part-0.parquet    177.
#> 4 CheckoutYear=2008/part-0.parquet    194.
#> 5 CheckoutYear=2009/part-0.parquet    214.
#> 6 CheckoutYear=2010/part-0.parquet    222.
#> # ℹ 12 more rows

我们单个 9GB 的 CSV 文件被重写成了 18 个 parquet 文件。文件名使用了 Apache Hive 项目使用的“自描述”约定。Hive 风格的分区 (Hive-style partitions) 使用“键=值”的约定来命名文件夹,所以你可能猜到,CheckoutYear=2005 目录包含了所有 CheckoutYear 是 2005 的数据。每个文件在 100 到 300 MB 之间,总大小现在约为 4 GB,略多于原始 CSV 文件的一半大小。这正如我们预期的那样,因为 parquet 是一种更高效的格式。

22.5 结合使用 dplyr 和 arrow

现在我们已经创建了这些 parquet 文件,我们需要再次读取它们。我们再次使用 open_dataset(),但这次我们给它一个目录:

seattle_pq <- open_dataset(pq_path)

现在我们可以编写我们的 dplyr 管道了。例如,我们可以计算过去五年中每个月借出的图书总数:

query <- seattle_pq |>
    filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
    group_by(CheckoutYear, CheckoutMonth) |>
    summarize(TotalCheckouts = sum(Checkouts)) |>
    arrange(CheckoutYear, CheckoutMonth)

为 arrow 数据编写 dplyr 代码在概念上与 dbplyr 类似,Chapter 21:你编写 dplyr 代码,它被自动转换为 Apache Arrow C++ 库能理解的查询,然后在你调用 collect() 时执行。如果我们打印出 query 对象,我们可以看到一些关于我们期望 Arrow 在执行发生时返回什么的信息:

query
#> FileSystemDataset (query)
#> CheckoutYear: int32
#> CheckoutMonth: int64
#> TotalCheckouts: int64
#> 
#> * Grouped by CheckoutYear
#> * Sorted by CheckoutYear [asc], CheckoutMonth [asc]
#> See $.data for the source Arrow object

我们可以通过调用 collect() 来得到结果:

#> # A tibble: 58 × 3
#> # Groups:   CheckoutYear [5]
#>   CheckoutYear CheckoutMonth TotalCheckouts
#>          <int>         <int>          <int>
#> 1         2018             1         355101
#> 2         2018             2         309813
#> 3         2018             3         344487
#> 4         2018             4         330988
#> 5         2018             5         318049
#> 6         2018             6         341825
#> # ℹ 52 more rows

像 dbplyr 一样,arrow 只理解一些 R 表达式,所以你可能无法编写与平时完全相同的代码。然而,支持的操作和函数列表相当广泛,并且在不断增长;在 ?acero 中可以找到当前支持的函数的完整列表。

22.5.1 性能

让我们快速看一下从 CSV 切换到 parquet 对性能的影响。首先,让我们计时计算 2021 年每个月借出的图书数量需要多长时间,当数据存储为单个大型 csv 时:

seattle_csv |>
    filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
    group_by(CheckoutMonth) |>
    summarize(TotalCheckouts = sum(Checkouts)) |>
    arrange(desc(CheckoutMonth)) |>
    collect() |>
    system.time()
#>  用户  系统  流逝 
#> 11.53  1.61 11.09

现在让我们使用我们新版本的数据集,其中西雅图图书馆的借阅数据被分成了 18 个较小的 parquet 文件:

seattle_pq |>
    filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
    group_by(CheckoutMonth) |>
    summarize(TotalCheckouts = sum(Checkouts)) |>
    arrange(desc(CheckoutMonth)) |>
    collect() |>
    system.time()
#> 用户 系统 流逝 
#> 0.29 0.00 0.11

性能上约 100 倍的提速归因于两个因素:多文件分区和单个文件的格式:

  • 分区提高了性能,因为这个查询使用 CheckoutYear == 2021 来过滤数据,而 arrow 足够聪明,能识别出它只需要读取 18 个 parquet 文件中的 1 个。
  • parquet 格式通过以二进制格式存储数据来提高性能,这种格式可以更直接地读入内存。列式格式和丰富的元数据意味着 arrow 只需要读取查询中实际使用的四列(CheckoutYearMaterialTypeCheckoutMonthCheckouts)。

这种巨大的性能差异就是为什么将大型 CSV 转换为 parquet 是值得的!

22.5.2 结合使用 duckdb 和 arrow

parquet 和 arrow 还有一个最后的优势——通过调用 arrow::to_duckdb(),将 arrow 数据集转换为 DuckDB 数据库(Chapter 21)非常容易:

seattle_pq |>
    to_duckdb() |>
    filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
    group_by(CheckoutYear) |>
    summarize(TotalCheckouts = sum(Checkouts)) |>
    arrange(desc(CheckoutYear)) |>
    collect()
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> # A tibble: 5 × 2
#>   CheckoutYear TotalCheckouts
#>          <int>          <dbl>
#> 1         2022        2431502
#> 2         2021        2266438
#> 3         2020        1241999
#> 4         2019        3931688
#> 5         2018        3987569

to_duckdb() 的妙处在于传输不涉及任何内存复制,这体现了 arrow 生态系统的目标:实现从一个计算环境到另一个计算环境的无缝过渡。

22.5.3 练习

  1. 找出每年最受欢迎的书。
  2. 哪位作者在西雅图图书馆系统中的图书最多?
  3. 在过去 10 年中,实体书与电子书的借阅情况是如何变化的?

22.6 总结

在本章中,你初步了解了 arrow 包,它提供了一个用于处理大型磁盘数据集的 dplyr 后端。它可以处理 CSV 文件,但如果你将数据转换为 parquet,速度会快得多。Parquet 是一种专为现代计算机上的数据分析而设计的二进制数据格式。与 CSV 相比,能处理 parquet 文件的工具要少得多,但其分区、压缩和列式结构使其分析效率高得多。

接下来,你将学习你的第一个非矩形数据源,你将使用 tidyr 包提供的工具来处理它。我们将专注于来自 JSON 文件的数据,但通用原则适用于任何树状数据,无论其来源如何。