22 Arrow
22.1 引言
CSV 文件被设计为易于人类阅读。它们是一种很好的交换格式,因为它们非常简单,并且几乎所有工具都能读取它们。但 CSV 文件效率不高:你需要做相当多的工作才能将数据读入 R。在本章中,你将学习一种强大的替代方案:parquet 格式,这是一种被大数据系统广泛使用的基于开放标准的格式。
我们将把 parquet 文件与 Apache Arrow 配对使用,这是一个为高效分析和传输大型数据集而设计的多语言工具箱。我们将通过 arrow 包 来使用 Apache Arrow,它提供了一个 dplyr 后端,允许你使用熟悉的 dplyr 语法来分析大于内存的数据集。另外一个好处是,arrow 非常快:你将在本章后面看到一些例子。
arrow 和 dbplyr 都提供了 dplyr 后端,所以你可能会想知道何时使用哪一个。在很多情况下,选择已经为你做好了,因为数据已经存在于数据库或 parquet 文件中,而你希望直接使用它。但如果你是从自己的数据(也许是 CSV 文件)开始,你可以将其加载到数据库中或转换为 parquet。总的来说,很难知道哪种方法效果最好,所以在你分析的早期阶段,我们鼓励你尝试两者,并选择最适合你的那一个。
(非常感谢 Danielle Navarro,她贡献了本章的初版。)
22.1.1 前提条件
在本章中,我们将继续使用 tidyverse,特别是 dplyr,但我们会将其与专门为处理大数据而设计的 arrow 包配对使用。
在本章的后面,我们还会看到 arrow 和 duckdb 之间的一些联系,所以我们还需要 dbplyr 和 duckdb。
22.2 获取数据
我们首先获取一个值得使用这些工具的数据集:西雅图公共图书馆的图书借阅数据集,可在线获取:data.seattle.gov/Community/Checkouts-by-Title/tmmm-ytt6。这个数据集包含 41,389,465 行,告诉你从 2005 年 4 月到 2022 年 10 月,每本书每月被借阅了多少次。
以下代码将为你获取该数据的缓存副本。数据是一个 9GB 的 CSV 文件,所以下载需要一些时间。我强烈推荐使用 curl::multi_download()
来获取非常大的文件,因为它正是为此目的而构建的:它会给你一个进度条,并且如果下载中断,它可以恢复下载。
# eval: !expr "!file.exists('data/seattle-library-checkouts.csv')"
dir.create("data", showWarnings = FALSE)
curl::multi_download(
"https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
"data/seattle-library-checkouts.csv",
resume = TRUE
)
#> # A tibble: 1 × 10
#> success status_code resumefrom url destfile error
#> <lgl> <dbl> <dbl> <chr> <chr> <chr>
#> 1 TRUE 416 0 https://r4ds.s3.us-we… "E:\\r4ds-cn\\… <NA>
#> # ℹ 4 more variables: type <chr>, modified <dttm>, time <dbl>,
#> # headers <list>
22.3 打开数据集
让我们先来看看数据。这个文件有 9 GB,足够大,我们可能不想把整个文件都加载到内存中。一个好的经验法则是,你通常需要至少是数据大小两倍的内存,而许多笔记本电脑的内存上限是 16 GB。这意味着我们想避免使用 read_csv()
,而是使用 arrow::open_dataset()
:
seattle_csv <- open_dataset(
sources = "data/seattle-library-checkouts.csv",
col_types = schema(ISBN = string()),
format = "csv"
)
当这段代码运行时发生了什么?open_dataset()
会扫描几千行来确定数据集的结构。ISBN
列在前 80,000 行包含空白值,所以我们必须指定列类型来帮助 arrow 确定数据结构。一旦数据被 open_dataset()
扫描过,它会记录下它所发现的内容然后停止;它只会在你明确请求时才读取更多的行。这个元数据就是我们打印 seattle_csv
时看到的内容:
seattle_csv
#> FileSystemDataset with 1 csv file
#> 12 columns
#> UsageClass: string
#> CheckoutType: string
#> MaterialType: string
#> CheckoutYear: int64
#> CheckoutMonth: int64
#> Checkouts: int64
#> Title: string
#> ISBN: string
#> Creator: string
#> Subjects: string
#> Publisher: string
#> PublicationYear: string
输出的第一行告诉你 seattle_csv
是作为单个 CSV 文件存储在本地磁盘上的;它只会在需要时才被加载到内存中。输出的其余部分告诉了你 arrow 为每一列推断出的列类型。
我们可以用 glimpse()
来看实际内容。这揭示了有大约 4100 万行和 12 列,并向我们展示了几个值。
seattle_csv |> glimpse()
#> FileSystemDataset with 1 csv file
#> 41,389,465 rows x 12 columns
#> $ UsageClass <string> "Physical", "Physical", "Digital", "Physical", "Ph…
#> $ CheckoutType <string> "Horizon", "Horizon", "OverDrive", "Horizon", "Hor…
#> $ MaterialType <string> "BOOK", "BOOK", "EBOOK", "BOOK", "SOUNDDISC", "BOO…
#> $ CheckoutYear <int64> 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 20…
#> $ CheckoutMonth <int64> 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,…
#> $ Checkouts <int64> 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 3, 2, 1, 3, 2,…
#> $ Title <string> "Super rich : a guide to having it all / Russell S…
#> $ ISBN <string> "", "", "", "", "", "", "", "", "", "", "", "", ""…
#> $ Creator <string> "Simmons, Russell", "Barclay, James, 1965-", "Tim …
#> $ Subjects <string> "Self realization, Conduct of life, Attitude Psych…
#> $ Publisher <string> "Gotham Books,", "Pyr,", "Random House, Inc.", "Di…
#> $ PublicationYear <string> "c2011.", "2010.", "2015", "2005.", "c2004.", "c20…
我们可以开始用 dplyr 动词来使用这个数据集,并用 collect()
来强制 arrow 执行计算并返回一些数据。例如,这段代码告诉我们每年的总借阅量:
seattle_csv |>
group_by(CheckoutYear) |>
summarise(Checkouts = sum(Checkouts)) |>
arrange(CheckoutYear) |>
collect()
#> # A tibble: 18 × 2
#> CheckoutYear Checkouts
#> <int> <int>
#> 1 2005 3798685
#> 2 2006 6599318
#> 3 2007 7126627
#> 4 2008 8438486
#> 5 2009 9135167
#> 6 2010 8608966
#> # ℹ 12 more rows
多亏了 arrow,无论底层数据集有多大,这段代码都能工作。但它目前相当慢:在 Hadley 的电脑上,它运行了大约 10 秒。考虑到我们拥有的数据量,这不算太糟,但我们可以通过切换到更好的格式来使其快得多。
22.4 parquet 格式
为了让这个数据更容易处理,让我们切换到 parquet 文件格式,并将其分割成多个文件。接下来的部分将首先向你介绍 parquet 和分区 (partitioning),然后将我们学到的知识应用到西雅图图书馆的数据上。
22.4.1 parquet 的优势
像 CSV 一样,parquet 用于矩形数据,但它不是一个你可以用任何文件编辑器读取的文本格式,而是一个专为大数据需求设计的自定义二进制格式。这意味着:
Parquet 文件通常比等效的 CSV 文件小。Parquet 依赖于高效的编码方式来减小文件大小,并支持文件压缩。这有助于使 parquet 文件快速,因为从磁盘移动到内存的数据更少。
Parquet 文件有丰富的类型系统。正如我们在 Section 7.3 中讨论的,CSV 文件不提供任何关于列类型的信息。例如,CSV 读取器必须猜测
"08-10-2022"
应该被解析为字符串还是日期。相比之下,parquet 文件以一种记录了类型和数据的方式存储数据。Parquet 文件是“列式”(column-oriented) 的。这意味着它们是按列组织的,很像 R 的数据框。与按行组织的 CSV 文件相比,这通常为数据分析任务带来更好的性能。
Parquet 文件是“分块”(chunked) 的,这使得可以同时处理文件的不同部分,并且,如果你幸运的话,可以完全跳过一些块。
parquet 文件有一个主要缺点:它们不再是“人类可读的”,也就是说,如果你用 readr::read_file()
查看一个 parquet 文件,你只会看到一堆乱码。
22.4.2 分区 (Partitioning)
随着数据集变得越来越大,将所有数据存储在单个文件中变得越来越痛苦,将大数据集分割到多个文件中通常很有用。当这种结构化做得巧妙时,这种策略可以带来显著的性能提升,因为许多分析只会需要文件的一个子集。
关于如何对你的数据集进行分区,没有硬性规定:结果将取决于你的数据、访问模式以及读取数据的系统。你可能需要做一些实验才能找到适合你情况的理想分区方式。作为一个粗略的指南,arrow 建议你避免小于 20MB 和大于 2GB 的文件,并避免产生超过 10,000 个文件的分区。你还应该尝试按你过滤时使用的变量进行分区;正如你稍后将看到的,这允许 arrow 通过只读取相关文件来跳过大量工作。
22.4.3 重写西雅图图书馆数据
让我们将这些想法应用到西雅图图书馆的数据上,看看它们在实践中是如何发挥作用的。我们将按 CheckoutYear
进行分区,因为很可能一些分析只想看最近的数据,而按年分区会产生 18 个大小合理的块。
为了重写数据,我们使用 dplyr::group_by()
定义分区,然后用 arrow::write_dataset()
将分区保存到一个目录中。write_dataset()
有两个重要的参数:一个是我们将在其中创建文件的目录,另一个是我们使用的格式。
pq_path <- "data/seattle-library-checkouts"
## | eval: !expr "!file.exists(pq_path)"
seattle_csv |>
group_by(CheckoutYear) |>
write_dataset(path = pq_path, format = "parquet")
这需要大约一分钟的时间来运行;正如我们稍后将看到的,这是一项初步的投资,它通过使未来的操作快得多来得到回报。
让我们看看我们刚刚产生了什么:
tibble(
files = list.files(pq_path, recursive = TRUE),
size_MB = file.size(file.path(pq_path, files)) / 1024^2
)
#> # A tibble: 18 × 2
#> files size_MB
#> <chr> <dbl>
#> 1 CheckoutYear=2005/part-0.parquet 109.
#> 2 CheckoutYear=2006/part-0.parquet 164.
#> 3 CheckoutYear=2007/part-0.parquet 177.
#> 4 CheckoutYear=2008/part-0.parquet 194.
#> 5 CheckoutYear=2009/part-0.parquet 214.
#> 6 CheckoutYear=2010/part-0.parquet 222.
#> # ℹ 12 more rows
我们单个 9GB 的 CSV 文件被重写成了 18 个 parquet 文件。文件名使用了 Apache Hive 项目使用的“自描述”约定。Hive 风格的分区 (Hive-style partitions) 使用“键=值”的约定来命名文件夹,所以你可能猜到,CheckoutYear=2005
目录包含了所有 CheckoutYear
是 2005 的数据。每个文件在 100 到 300 MB 之间,总大小现在约为 4 GB,略多于原始 CSV 文件的一半大小。这正如我们预期的那样,因为 parquet 是一种更高效的格式。
22.5 结合使用 dplyr 和 arrow
现在我们已经创建了这些 parquet 文件,我们需要再次读取它们。我们再次使用 open_dataset()
,但这次我们给它一个目录:
seattle_pq <- open_dataset(pq_path)
现在我们可以编写我们的 dplyr 管道了。例如,我们可以计算过去五年中每个月借出的图书总数:
为 arrow 数据编写 dplyr 代码在概念上与 dbplyr 类似,Chapter 21:你编写 dplyr 代码,它被自动转换为 Apache Arrow C++ 库能理解的查询,然后在你调用 collect()
时执行。如果我们打印出 query
对象,我们可以看到一些关于我们期望 Arrow 在执行发生时返回什么的信息:
query
#> FileSystemDataset (query)
#> CheckoutYear: int32
#> CheckoutMonth: int64
#> TotalCheckouts: int64
#>
#> * Grouped by CheckoutYear
#> * Sorted by CheckoutYear [asc], CheckoutMonth [asc]
#> See $.data for the source Arrow object
我们可以通过调用 collect()
来得到结果:
#> # A tibble: 58 × 3
#> # Groups: CheckoutYear [5]
#> CheckoutYear CheckoutMonth TotalCheckouts
#> <int> <int> <int>
#> 1 2018 1 355101
#> 2 2018 2 309813
#> 3 2018 3 344487
#> 4 2018 4 330988
#> 5 2018 5 318049
#> 6 2018 6 341825
#> # ℹ 52 more rows
像 dbplyr 一样,arrow 只理解一些 R 表达式,所以你可能无法编写与平时完全相同的代码。然而,支持的操作和函数列表相当广泛,并且在不断增长;在 ?acero
中可以找到当前支持的函数的完整列表。
22.5.1 性能
让我们快速看一下从 CSV 切换到 parquet 对性能的影响。首先,让我们计时计算 2021 年每个月借出的图书数量需要多长时间,当数据存储为单个大型 csv 时:
现在让我们使用我们新版本的数据集,其中西雅图图书馆的借阅数据被分成了 18 个较小的 parquet 文件:
性能上约 100 倍的提速归因于两个因素:多文件分区和单个文件的格式:
- 分区提高了性能,因为这个查询使用
CheckoutYear == 2021
来过滤数据,而 arrow 足够聪明,能识别出它只需要读取 18 个 parquet 文件中的 1 个。 - parquet 格式通过以二进制格式存储数据来提高性能,这种格式可以更直接地读入内存。列式格式和丰富的元数据意味着 arrow 只需要读取查询中实际使用的四列(
CheckoutYear
、MaterialType
、CheckoutMonth
和Checkouts
)。
这种巨大的性能差异就是为什么将大型 CSV 转换为 parquet 是值得的!
22.5.2 结合使用 duckdb 和 arrow
parquet 和 arrow 还有一个最后的优势——通过调用 arrow::to_duckdb()
,将 arrow 数据集转换为 DuckDB 数据库(Chapter 21)非常容易:
seattle_pq |>
to_duckdb() |>
filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
group_by(CheckoutYear) |>
summarize(TotalCheckouts = sum(Checkouts)) |>
arrange(desc(CheckoutYear)) |>
collect()
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> # A tibble: 5 × 2
#> CheckoutYear TotalCheckouts
#> <int> <dbl>
#> 1 2022 2431502
#> 2 2021 2266438
#> 3 2020 1241999
#> 4 2019 3931688
#> 5 2018 3987569
to_duckdb()
的妙处在于传输不涉及任何内存复制,这体现了 arrow 生态系统的目标:实现从一个计算环境到另一个计算环境的无缝过渡。
22.5.3 练习
- 找出每年最受欢迎的书。
- 哪位作者在西雅图图书馆系统中的图书最多?
- 在过去 10 年中,实体书与电子书的借阅情况是如何变化的?
22.6 总结
在本章中,你初步了解了 arrow 包,它提供了一个用于处理大型磁盘数据集的 dplyr 后端。它可以处理 CSV 文件,但如果你将数据转换为 parquet,速度会快得多。Parquet 是一种专为现代计算机上的数据分析而设计的二进制数据格式。与 CSV 相比,能处理 parquet 文件的工具要少得多,但其分区、压缩和列式结构使其分析效率高得多。
接下来,你将学习你的第一个非矩形数据源,你将使用 tidyr 包提供的工具来处理它。我们将专注于来自 JSON 文件的数据,但通用原则适用于任何树状数据,无论其来源如何。