In the previous chapter, you learned the basic concepts of object-oriented programming: classes and methods, and how they are connected by generic functions through method dispatch in R. You learned the basic usage of S3, S4, RC, and R6, including defining classes and generic functions as well as implementing methods for particular classes.
Now that we have covered most of the important features of R, it is time to move on to more practical topics. In this chapter, we will begin by discussing how R can be used to work with databases, which is often the first step of a data-analysis project: extracting data from a database.
In the previous chapters, we used a family of built-in functions such as read.csv
and read.table
to import data from delimiter-separated files, such as those in the csv format. Using text formats to store data is handy and portable. When the data file is large, however, such a storage method may not be the best choice.
There are several reasons why text formats may no longer be easy to use. Most importantly, functions such as read.csv() load the whole file into memory, that is, into a data frame in R. If the data is too large to fit into the computer's memory, we simply cannot do this.

Using a database is the best solution for these scenarios. A database makes it much easier to store data that may exceed the computer's memory. Data in a database can be queried subject to user-supplied conditions, which also makes it easier to update existing records and insert new ones.
A relational database is a collection of tables and relations between tables. A table in a relational database has the same representation as a data frame in R. Tables can have relations that make it easier to join the information of multiple tables.
In this section, we will start from the simplest database, SQLite (http://sqlite.org/), a portable, lightweight database engine.
To work with SQLite databases in R, we will use the RSQLite
package. To install it from CRAN, run the following code:
install.packages("RSQLite")
First, let's see how to create a SQLite database. If we want to create an example database at data/example.sqlite
, we need to ensure that the directory is available. If the directory does not exist, we need to create it first:
if (!dir.exists("data")) dir.create("data")
Now, the data/
directory is available. Next, we will load the RSQLite
package and create a connection by supplying a database driver (SQLite()
) and database file (data/example.sqlite
). Although the file does not exist yet, the driver will create an empty file, which is an empty SQLite database:
library(RSQLite)
## Loading required package: DBI
con <- dbConnect(SQLite(), "data/example.sqlite")
The database connection, con
, is a layer between the user and the system. We can create a connection to a relational database and query, fetch, or update data through it. The connection will be used in all subsequent operations until we close the connection. In a typical relational database, we can create tables with a name and columns of certain names and data types, insert records as rows to a table, and update existing records. A table in a relational database looks very similar to a data frame in R.
Now, we will create a simple data frame that is to be inserted as a table to the database:
example1 <- data.frame(
  id = 1:5,
  type = c("A", "A", "B", "B", "C"),
  score = c(8, 9, 8, 10, 9),
  stringsAsFactors = FALSE)
example1
##   id type score
## 1  1    A     8
## 2  2    A     9
## 3  3    B     8
## 4  4    B    10
## 5  5    C     9
The data frame is ready and we will call dbWriteTable()
to write this data frame as a table to the database:
dbWriteTable(con, "example1", example1)
## [1] TRUE
In the preceding code, we could have used a different table name while still storing the same data. Finally, we will disconnect from the database using dbDisconnect()
so that con
is no longer available for data operations:
dbDisconnect(con)
## [1] TRUE
A SQLite database is a collection of tables. Therefore, we can store many tables in one database.
This time, we put the diamonds
dataset in ggplot2
and the flights
dataset in nycflights13
as two tables into one database. If you haven't installed these two packages, run the following code:
install.packages(c("ggplot2", "nycflights13"))
When the packages are available, we will call data()
to load the two data frames:
data("diamonds", package = "ggplot2")
data("flights", package = "nycflights13")
We will repeat the same operation as we did earlier, but dbWriteTable()
ends up with errors:
con <- dbConnect(SQLite(), "data/datasets.sqlite")
dbWriteTable(con, "diamonds", diamonds, row.names = FALSE)
## Error in (function (classes, fdef, mtable) : unable to find an
##   inherited method for function 'dbWriteTable' for signature
##   '"SQLiteConnection", "character", "tbl_df"'
dbWriteTable(con, "flights", flights, row.names = FALSE)
## Error in (function (classes, fdef, mtable) : unable to find an
##   inherited method for function 'dbWriteTable' for signature
##   '"SQLiteConnection", "character", "tbl_df"'
dbDisconnect(con)
## [1] TRUE
It can be useful to take a look at the class of these two variables:
class(diamonds)
## [1] "tbl_df"     "tbl"        "data.frame"
class(flights)
## [1] "tbl_df"     "tbl"        "data.frame"
Note that diamonds
and flights
are not simply of class data.frame
but something more complex. To write them into the database, we need to convert them to plain data.frame
objects using as.data.frame()
:
con <- dbConnect(SQLite(), "data/datasets.sqlite")
dbWriteTable(con, "diamonds", as.data.frame(diamonds), row.names = FALSE)
## [1] TRUE
dbWriteTable(con, "flights", as.data.frame(flights), row.names = FALSE)
## [1] TRUE
dbDisconnect(con)
## [1] TRUE
Now, the database contains two tables.
As mentioned at the beginning of this section, appending records to a table in the database is fairly easy. Here is a simple example where we produce several chunks of data and append them to a database table in turn:
con <- dbConnect(SQLite(), "data/example2.sqlite")
chunk_size <- 10
id <- 0
for (i in 1:6) {
  chunk <- data.frame(id = ((i - 1L) * chunk_size):(i * chunk_size - 1L),
    type = LETTERS[[i]],
    score = rbinom(chunk_size, 10, (10 - i) / 10),
    stringsAsFactors = FALSE)
  dbWriteTable(con, "products", chunk, append = i > 1, row.names = FALSE)
}
dbDisconnect(con)
## [1] TRUE
Note that each chunk is a data frame with some fixed data and some random numbers. Each time, we append these records to a table named products
. The difference between this example and the previous ones is that when we call dbWriteTable()
, we use append = FALSE
for the first chunk to create that table in the database and use append = TRUE
for each subsequent chunk to append to the existing table.
Once we have a SQLite database, we can access not only the data we store in the tables, but also some metadata, such as the names of all tables and the columns of a table.
To demonstrate, we will connect to the SQLite database we created previously:
con <- dbConnect(SQLite(), "data/datasets.sqlite")
We can use dbExistsTable()
to detect whether a table exists in the database:
dbExistsTable(con, "diamonds")
## [1] TRUE
dbExistsTable(con, "mtcars")
## [1] FALSE
Since we only wrote diamonds
and flights
in datasets.sqlite
previously,
dbExistsTable()
returns the correct values. In contrast to checking whether a single table exists, we can use dbListTables()
to list all the existing tables in the database:
dbListTables(con)
## [1] "diamonds" "flights"
For a certain table, we can also list the names of all columns (or fields) with dbListFields()
:
dbListFields(con, "diamonds")
##  [1] "carat"   "cut"     "color"   "clarity" "depth"
##  [6] "table"   "price"   "x"       "y"       "z"
Contrary to dbWriteTable()
, dbReadTable()
reads the whole table into a data frame:
db_diamonds <- dbReadTable(con, "diamonds")
dbDisconnect(con)
## [1] TRUE
We can make a comparison between the data frame (db_diamonds
) we read from the database and the original version (diamonds
):
head(db_diamonds, 3)
##   carat     cut color clarity depth table price    x    y
## 1  0.23   Ideal     E     SI2  61.5    55   326 3.95 3.98
## 2  0.21 Premium     E     SI1  59.8    61   326 3.89 3.84
## 3  0.23    Good     E     VS1  56.9    65   327 4.05 4.07
##      z
## 1 2.43
## 2 2.31
## 3 2.31
head(diamonds, 3)
##   carat     cut color clarity depth table price    x    y
## 1  0.23   Ideal     E     SI2  61.5    55   326 3.95 3.98
## 2  0.21 Premium     E     SI1  59.8    61   326 3.89 3.84
## 3  0.23    Good     E     VS1  56.9    65   327 4.05 4.07
##      z
## 1 2.43
## 2 2.31
## 3 2.31
The data in both data frames looks exactly the same. However, if we use identical()
to compare them, they are not really identical:
identical(diamonds, db_diamonds) ## [1] FALSE
To spot the difference, we can call str()
to reveal the structure of both data frames. First, here is the structure of the data frame in the database:
str(db_diamonds)
## 'data.frame': 53940 obs. of 10 variables:
##  $ carat  : num 0.23 0.21 0.23 0.29 0.31 0.24 0.24...
##  $ cut    : chr "Ideal" "Premium" "Good" "Premium" ...
##  $ color  : chr "E" "E" "E" "I" ...
##  $ clarity: chr "SI2" "SI1" "VS1" "VS2" ...
##  $ depth  : num 61.5 59.8 56.9 62.4 63.3 62.8 62.3...
##  $ table  : num 55 61 65 58 58 57 57 55 61 61 ...
##  $ price  : int 326 326 327 334 335 336 336 337 337 ...
##  $ x      : num 3.95 3.89 4.05 4.2 4.34 3.94 3.95...
##  $ y      : num 3.98 3.84 4.07 4.23 4.35 3.96 3.98...
##  $ z      : num 2.43 2.31 2.31 2.63 2.75 2.48 2.47...
Then, here is the structure of the original version:
str(diamonds)
## Classes 'tbl_df', 'tbl' and 'data.frame': 53940 obs. of 10 variables:
##  $ carat  : num 0.23 0.21 0.23 0.29 0.31 0.24 0.24...
##  $ cut    : Ord.factor w/ 5 levels "Fair"<"Good"<..: 5 4 2 4 2 3 3 3 1 3 ...
##  $ color  : Ord.factor w/ 7 levels "D"<"E"<"F"<"G"<..: 2 2 2 6 7 7 6 5 2 5 ...
##  $ clarity: Ord.factor w/ 8 levels "I1"<"SI2"<"SI1"<..: 2 3 5 4 2 6 7 3 4 5 ...
##  $ depth  : num 61.5 59.8 56.9 62.4 63.3 62.8 62.3 61.9 65.1 59.4 ...
##  $ table  : num 55 61 65 58 58 57 57 55 61 61 ...
##  $ price  : int 326 326 327 334 335 336 336 337 337...
##  $ x      : num 3.95 3.89 4.05 4.2 4.34 3.94 3.95...
##  $ y      : num 3.98 3.84 4.07 4.23 4.35 3.96 3.98...
##  $ z      : num 2.43 2.31 2.31 2.63 2.75 2.48 2.47...
Now, the difference is obvious. In the original version, cut
, color
, and clarity
are ordered factor variables that are essentially integers with some metadata (ordered levels). By contrast, in the database version, these columns are stored as text. This difference arises because SQLite has no built-in support for ordered factors. Therefore, apart from common data types (numbers, text, logicals, and so on), R-specific types are converted to types supported by SQLite before the data frame is written.
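If we need the ordered factors back after reading the table from SQLite, we can restore them with factor(), since we know the original level orders. Here is a minimal sketch: the levels below are the cut levels of the diamonds dataset, and cut_text is a made-up stand-in for the text column returned by dbReadTable():

```r
# A made-up stand-in for the text column that comes back from the database
cut_text <- c("Ideal", "Premium", "Good")

# Restore the ordered factor using the known level order
cut_factor <- factor(cut_text,
  levels = c("Fair", "Good", "Very Good", "Premium", "Ideal"),
  ordered = TRUE)
cut_factor
## [1] Ideal   Premium Good
## Levels: Fair < Good < Very Good < Premium < Ideal
```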
In the previous section, you learned how to write data into a SQLite database. In this section, you will learn how to query such a database so that we can get data from it according to our needs. We'll use data/datasets.sqlite
(which we created previously) in the following examples.
First, we need to establish a connection to the database:
con <- dbConnect(SQLite(), "data/datasets.sqlite")
dbListTables(con)
## [1] "diamonds" "flights"
There are two tables in the database. Then, we can select all data from diamonds
using the select
statement. Here, we want to select all columns (or fields). So, we will call dbGetQuery()
with the database connection, con
, and a query string:
db_diamonds <- dbGetQuery(con, "select * from diamonds")
head(db_diamonds, 3)
##   carat     cut color clarity depth table price    x    y
## 1  0.23   Ideal     E     SI2  61.5    55   326 3.95 3.98
## 2  0.21 Premium     E     SI1  59.8    61   326 3.89 3.84
## 3  0.23    Good     E     VS1  56.9    65   327 4.05 4.07
##      z
## 1 2.43
## 2 2.31
## 3 2.31
Note that *
means all fields (or, equivalently, columns). If we only need a subset of fields, we can list them explicitly:
db_diamonds <- dbGetQuery(con,
  "select carat, cut, color, clarity, depth, price from diamonds")
head(db_diamonds, 3)
##   carat     cut color clarity depth price
## 1  0.23   Ideal     E     SI2  61.5   326
## 2  0.21 Premium     E     SI1  59.8   326
## 3  0.23    Good     E     VS1  56.9   327
If we want to select all distinct cases that appear in the data, we can use select distinct
. For example, the following code returns all distinct values of cut
in diamonds
:
dbGetQuery(con, "select distinct cut from diamonds")
##         cut
## 1     Ideal
## 2   Premium
## 3      Good
## 4 Very Good
## 5      Fair
Note that dbGetQuery()
always returns data.frame
, even when the result has only one column. To retrieve the values as an atomic vector, simply extract the first column from the data frame:
dbGetQuery(con, "select distinct clarity from diamonds")[[1]]
## [1] "SI2"  "SI1"  "VS1"  "VS2"  "VVS2" "VVS1" "I1"   "IF"
When we use select
to choose the columns to query, the column name is sometimes not exactly what we want. In this case, we can use A as B
to get column B
with the same data as A
:
db_diamonds <- dbGetQuery(con,
  "select carat, price, clarity as clarity_level from diamonds")
head(db_diamonds, 3)
##   carat price clarity_level
## 1  0.23   326           SI2
## 2  0.21   326           SI1
## 3  0.23   327           VS1
In other cases, the value we want is not stored directly in the database but needs to be computed. We can again use A as B
in which A
can be an arithmetic calculation between existing columns:
db_diamonds <- dbGetQuery(con,
  "select carat, price, x * y * z as size from diamonds")
head(db_diamonds, 3)
##   carat price     size
## 1  0.23   326 38.20203
## 2  0.21   326 34.50586
## 3  0.23   327 38.07688
What if we create a new column with existing columns and create another column with the new column, just like the following example?
db_diamonds <- dbGetQuery(con,
  "select carat, price, x * y * z as size,
    price / size as value_density from diamonds")
## Error in sqliteSendQuery(con, statement, bind.data): error in
##   statement: no such column: size
We simply can't do this. In A as B
, A
must be composed of existing columns. However, if we insist on doing so, we can use a nested query, that is, we select
columns from a temporary table produced by a nested select
:
db_diamonds <- dbGetQuery(con,
  "select *, price / size as value_density from
    (select carat, price, x * y * z as size from diamonds)")
head(db_diamonds, 3)
##   carat price     size value_density
## 1  0.23   326 38.20203      8.533578
## 2  0.21   326 34.50586      9.447672
## 3  0.23   327 38.07688      8.587887
In this case, size
is defined in the temporary table when price
/size
is being computed.
The next important component of a database query is a condition. We can use where
to specify the conditions that the results must satisfy. For example, we can select diamonds with Good
cut:
good_diamonds <- dbGetQuery(con,
  "select carat, cut, price from diamonds where cut = 'Good'")
head(good_diamonds, 3)
##   carat  cut price
## 1  0.23 Good   327
## 2  0.31 Good   335
## 3  0.30 Good   339
Note that records with a Good cut constitute only a small proportion of all records:
nrow(good_diamonds) / nrow(diamonds)
## [1] 0.09095291
If we have multiple conditions that must be met simultaneously, we can use and
to combine these conditions. For example, we will select all records with Good
cut and color E
:
good_e_diamonds <- dbGetQuery(con,
  "select carat, cut, color, price from diamonds
    where cut = 'Good' and color = 'E'")
head(good_e_diamonds, 3)
##   carat  cut color price
## 1  0.23 Good     E   327
## 2  0.23 Good     E   402
## 3  0.26 Good     E   554
nrow(good_e_diamonds) / nrow(diamonds)
## [1] 0.017297
Similar logical operations also include or
and not
.
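As a quick self-contained sketch of or (using a throwaway in-memory SQLite database with made-up data rather than the diamonds table), the following selects rows where either condition holds:

```r
library(RSQLite)

# A throwaway in-memory database with made-up data
con2 <- dbConnect(SQLite(), ":memory:")
dbWriteTable(con2, "gems",
  data.frame(cut = c("Fair", "Ideal", "Good"),
             price = c(300, 18500, 700),
             stringsAsFactors = FALSE))

# Rows with a Fair cut or a price above 18000
res <- dbGetQuery(con2,
  "select cut, price from gems
    where cut = 'Fair' or price > 18000")
res
##     cut price
## 1  Fair   300
## 2 Ideal 18500

dbDisconnect(con2)
```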
In addition to the simple logical operations, we can also use in
to filter records by examining whether the value of a field is contained in a given set. For example, we can select records with colors E
and F
:
color_ef_diamonds <- dbGetQuery(con,
  "select carat, cut, color, price from diamonds
    where color in ('E','F')")
nrow(color_ef_diamonds)
## [1] 19339
We can verify the result against the following table:
table(diamonds$color)
##
##     D     E     F     G     H     I     J
##  6775  9797  9542 11292  8304  5422  2808
To use in
, we need to specify a set. Similar to in
, we can also use between ... and ..., which allows us to specify a range:
some_price_diamonds <- dbGetQuery(con,
  "select carat, cut, color, price from diamonds
    where price between 5000 and 5500")
nrow(some_price_diamonds) / nrow(diamonds)
## [1] 0.03285132
In fact, the range does not have to be numeric. As long as the data type of the field is comparable, we can specify a range. For a string column, we can write between 'string1' and 'string2'
to filter records by lexical ordering.
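A small self-contained sketch of a lexical between (again using a throwaway in-memory table with made-up data):

```r
library(RSQLite)

con2 <- dbConnect(SQLite(), ":memory:")
dbWriteTable(con2, "gems",
  data.frame(color = c("D", "E", "F", "G"),
             price = c(500, 600, 700, 800),
             stringsAsFactors = FALSE))

# Colors in the lexical range 'E' to 'F'
res <- dbGetQuery(con2,
  "select color, price from gems where color between 'E' and 'F'")
res
##   color price
## 1     E   600
## 2     F   700

dbDisconnect(con2)
```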
Another useful operator for string columns is like
, which enables us to filter records with simple string patterns. For example, we can select all records with a cut
variable that ends with Good
. It can be either Good
or Very Good
. The notation is like '%Good'
where %
matches any string, including the empty string:
good_cut_diamonds <- dbGetQuery(con,
  "select carat, cut, color, price from diamonds
    where cut like '%Good'")
nrow(good_cut_diamonds) / nrow(diamonds)
## [1] 0.3149425
Another major functionality of database queries is sorting data by specified columns. We can do this with order by
. For example, we can get the carat
and price
of all records but in an ascending order of price
:
cheapest_diamonds <- dbGetQuery(con, "select carat, price from diamonds order by price")
As a result, we have a data frame of diamonds ordered from the cheapest to the most expensive:
head(cheapest_diamonds)
##   carat price
## 1  0.23   326
## 2  0.21   326
## 3  0.23   327
## 4  0.29   334
## 5  0.31   335
## 6  0.24   336
We can do the opposite by adding desc
to the sorting column so that we get a data frame that is ordered in the opposite way:
most_expensive_diamonds <- dbGetQuery(con,
  "select carat, price from diamonds order by price desc")
head(most_expensive_diamonds)
##   carat price
## 1  2.29 18823
## 2  2.00 18818
## 3  1.51 18806
## 4  2.07 18804
## 5  2.00 18803
## 6  2.29 18797
We can also sort the records by more than one column. For example, the following results are sorted by price in ascending order first. If two records have an equal price, the one with the greater carat comes first:
cheapest_diamonds <- dbGetQuery(con,
  "select carat, price from diamonds order by price, carat desc")
head(cheapest_diamonds)
##   carat price
## 1  0.23   326
## 2  0.21   326
## 3  0.23   327
## 4  0.29   334
## 5  0.31   335
## 6  0.24   336
Like select
, the column to sort by can be computed from existing columns:
dense_diamonds <- dbGetQuery(con,
  "select carat, price, x * y * z as size from diamonds
    order by carat / size desc")
head(dense_diamonds)
##   carat price      size
## 1  1.07  5909  47.24628
## 2  1.41  9752  74.41726
## 3  1.53  8971  85.25925
## 4  1.51  7188 133.10400
## 5  1.22  3156 108.24890
## 6  1.12  6115 100.97448
We can also query the sorted subset of all records using where
and order by
at the same time:
head(dbGetQuery(con,
  "select carat, price from diamonds
    where cut = 'Ideal' and clarity = 'IF' and color = 'J'
    order by price"))
##   carat price
## 1  0.30   489
## 2  0.30   489
## 3  0.32   521
## 4  0.32   533
## 5  0.32   533
## 6  0.35   569
If we only care about the first several results, we can use limit
to constrain the number of records to retrieve:
dbGetQuery(con,
  "select carat, price from diamonds
    order by carat desc limit 3")
##   carat price
## 1  5.01 18018
## 2  4.50 18531
## 3  4.13 17329
In addition to column selection, conditional filtering, and sorting, we can also aggregate the records in the database in groups. For example, we can count the number of records for each color:
dbGetQuery(con,
  "select color, count(*) as number from diamonds group by color")
##   color number
## 1     D   6775
## 2     E   9797
## 3     F   9542
## 4     G  11292
## 5     H   8304
## 6     I   5422
## 7     J   2808
The results can be verified by calling table()
with the original data:
table(diamonds$color)
##
##     D     E     F     G     H     I     J
##  6775  9797  9542 11292  8304  5422  2808
In addition to counting, we also have aggregating functions such as avg()
, max()
, min()
, and sum()
. For example, we can summarize the data by looking at the average price for each level of clarity:
dbGetQuery(con,
  "select clarity, avg(price) as avg_price from diamonds
    group by clarity order by avg_price desc")
##   clarity avg_price
## 1     SI2  5063.029
## 2     SI1  3996.001
## 3     VS2  3924.989
## 4      I1  3924.169
## 5     VS1  3839.455
## 6    VVS2  3283.737
## 7      IF  2864.839
## 8    VVS1  2523.115
We can also examine the maximum carat at each of the five lowest prices:
dbGetQuery(con,
  "select price, max(carat) as max_carat from diamonds
    group by price order by price limit 5")
##   price max_carat
## 1   326      0.23
## 2   327      0.23
## 3   334      0.29
## 4   335      0.31
## 5   336      0.24
We can also perform multiple calculations within a group. The following code calculates the price range and the average price for each clarity level:
dbGetQuery(con,
  "select clarity,
    min(price) as min_price,
    max(price) as max_price,
    avg(price) as avg_price
    from diamonds
    group by clarity
    order by avg_price desc")
##   clarity min_price max_price avg_price
## 1     SI2       326     18804  5063.029
## 2     SI1       326     18818  3996.001
## 3     VS2       334     18823  3924.989
## 4      I1       345     18531  3924.169
## 5     VS1       327     18795  3839.455
## 6    VVS2       336     18768  3283.737
## 7      IF       369     18806  2864.839
## 8    VVS1       336     18777  2523.115
The following example calculates an average price for each clarity level weighted by carat, that is, prices of diamonds with greater carat carry more weight:
dbGetQuery(con,
  "select clarity, sum(price * carat) / sum(carat) as wprice
    from diamonds
    group by clarity
    order by wprice desc")
##   clarity   wprice
## 1     SI2 7012.257
## 2     VS2 6173.858
## 3     VS1 6059.505
## 4     SI1 5919.187
## 5    VVS2 5470.156
## 6      I1 5233.937
## 7      IF 5124.584
## 8    VVS1 4389.112
Just like sorting with more than one column, we can also group the data by multiple columns. The following code computes the average price for each clarity and color pair, and shows the top five pairs with the highest average prices:
dbGetQuery(con,
  "select clarity, color, avg(price) as avg_price
    from diamonds
    group by clarity, color
    order by avg_price desc
    limit 5")
##   clarity color avg_price
## 1      IF     D  8307.370
## 2     SI2     I  7002.649
## 3     SI2     J  6520.958
## 4     SI2     H  6099.895
## 5     VS2     I  5690.506
The most characteristic operation in a relational database is the table join, that is, joining a number of tables together by some columns. For example, we will create a data frame of cut
, color
, and clarity
to select records whose values of these three fields exactly match one of the three rows in diamond_selector
:
diamond_selector <- data.frame(
  cut = c("Ideal", "Good", "Fair"),
  color = c("E", "I", "D"),
  clarity = c("VS1", "I1", "IF"),
  stringsAsFactors = FALSE
)
diamond_selector
##     cut color clarity
## 1 Ideal     E     VS1
## 2  Good     I      I1
## 3  Fair     D      IF
After creating the data frame, we write it to the database so that we can join diamonds
and diamond_selector
to filter the desirable records:
dbWriteTable(con, "diamond_selector", diamond_selector,
  row.names = FALSE, overwrite = TRUE)
## [1] TRUE
We can specify the columns to match in the join clause:
subset_diamonds <- dbGetQuery(con,
  "select cut, color, clarity, carat, price
    from diamonds
    join diamond_selector using (cut, color, clarity)")
head(subset_diamonds)
##     cut color clarity carat price
## 1 Ideal     E     VS1  0.60  2774
## 2 Ideal     E     VS1  0.26   556
## 3 Ideal     E     VS1  0.70  2818
## 4 Ideal     E     VS1  0.70  2837
## 5  Good     I      I1  1.01  2844
## 6 Ideal     E     VS1  0.26   556
In total, only a tiny portion of all records satisfies one of the three cases:
nrow(subset_diamonds) / nrow(diamonds)
## [1] 0.01121617
Finally, don't forget to disconnect from the database to ensure that all resources are properly released:
dbDisconnect(con)
## [1] TRUE
In the previous examples, we only demonstrated the basic use of SQL to query a relational database such as SQLite. In fact, SQL is much richer and more powerful than we have shown. For more details, visit http://www.w3schools.com/sql to learn more.
At the beginning of this section, we mentioned that one of the advantages of using a relational database is that it can store a large amount of data. Usually, we only take a subset out of the database and do some research. Sometimes, however, we need to go through an amount of data that exceeds the capacity of computer memory. Obviously, we cannot load all of the data into memory, but must process the data chunk by chunk.
Most reasonable relational databases support fetching a query result set chunk by chunk. In the following example, we will use dbSendQuery()
instead of dbGetQuery()
to get a result set. Then, we will repeat fetching chunks (a number of rows) from the result set until all results are fetched. In this way, we can process the data chunk by chunk without using a large amount of working memory:
con <- dbConnect(SQLite(), "data/datasets.sqlite")
res <- dbSendQuery(con,
  "select carat, cut, color, price from diamonds
    where cut = 'Ideal' and color = 'E'")
while (!dbHasCompleted(res)) {
  chunk <- dbFetch(res, 800)
  cat(nrow(chunk), "records fetched\n")
  # do something with chunk
}
## 800 records fetched
## 800 records fetched
## 800 records fetched
## 800 records fetched
## 703 records fetched
dbClearResult(res)
## [1] TRUE
dbDisconnect(con)
## [1] TRUE
In practice, the database may have billions of records. The query may result in tens of millions of records. If you use dbGetQuery()
to fetch the whole result set at once, your memory may not be sufficient. If the task can be finished by processing data chunks, it can be much cheaper to work chunk by chunk.
Popular relational databases have strong mechanisms to ensure consistency. When we insert or update data, we do so via transactions. If a transaction fails, we can undo it and roll back the database so that everything stays consistent.
The following example is a simple simulation of the data accumulation process that may fail in the middle of the process. Suppose we need to accumulate the data of some products and store it in data/products.sqlite
. Each time a chunk of data is produced, we need to append it to a table in the database. In each iteration, however, the process may fail with a probability of 20 percent:
set.seed(123)
con <- dbConnect(SQLite(), "data/products.sqlite")
chunk_size <- 10
for (i in 1:6) {
  cat("Processing chunk", i, "\n")
  if (runif(1) <= 0.2) stop("Data error")
  chunk <- data.frame(id = ((i - 1L) * chunk_size):(i * chunk_size - 1L),
    type = LETTERS[[i]],
    score = rbinom(chunk_size, 10, (10 - i) / 10),
    stringsAsFactors = FALSE)
  dbWriteTable(con, "products", chunk, append = i > 1, row.names = FALSE)
}
## Processing chunk 1
## Processing chunk 2
## Processing chunk 3
## Processing chunk 4
## Processing chunk 5
## Error in eval(expr, envir, enclos): Data error
The accumulation fails when processing chunk 5. Then, we will count the records in the table:
dbGetQuery(con, "select COUNT(*) from products")
##   COUNT(*)
## 1       40
dbDisconnect(con)
## [1] TRUE
We can see that the table has stored some of the records. In some cases, we want either all records to be properly stored or nothing to be written to the database at all; in either case, the database remains consistent. If only part of the data is stored, however, other problems may follow. To ensure that a series of database changes succeeds or fails as a whole, we can call dbBegin()
before we write any data, call dbCommit()
after all changes are made, and call dbRollback()
if anything goes wrong.
The following code is an enhanced version of the previous example. We use transactions to make sure either all chunks are written to the database or none. More specifically, we put the data-writing process in tryCatch
. Before the writing begins, we begin a transaction by calling dbBegin()
. Then, in tryCatch
, we will write data chunk by chunk to the database. If everything goes well, we will call dbCommit()
to commit the transaction so that all the changes become permanent. If anything goes wrong, the error will be captured by the error function, in which we produce a warning and roll back the changes with dbRollback()
:
set.seed(123)
file.remove("data/products.sqlite")
## [1] TRUE
con <- dbConnect(SQLite(), "data/products.sqlite")
chunk_size <- 10
dbBegin(con)
## [1] TRUE
res <- tryCatch({
  for (i in 1:6) {
    cat("Processing chunk", i, "\n")
    if (runif(1) <= 0.2) stop("Data error")
    chunk <- data.frame(id = ((i - 1L) * chunk_size):(i * chunk_size - 1L),
      type = LETTERS[[i]],
      score = rbinom(chunk_size, 10, (10 - i) / 10),
      stringsAsFactors = FALSE)
    dbWriteTable(con, "products", chunk, append = i > 1, row.names = FALSE)
  }
  dbCommit(con)
}, error = function(e) {
  warning("An error occurs: ", e, "\nRolling back", immediate. = TRUE)
  dbRollback(con)
})
## Processing chunk 1
## Processing chunk 2
## Processing chunk 3
## Processing chunk 4
## Processing chunk 5
## Warning in value[[3L]](cond): An error occurs: Error in
##   doTryCatch(return(expr), name, parentenv, handler): Data error
## Rolling back
We can see that the same error happens again. This time, however, the error is captured, the transaction is cancelled, and the database is rolled back. To verify, we will again count the number of records in the products
table:
dbGetQuery(con, "select COUNT(*) from products")
## Error in sqliteSendQuery(con, statement, bind.data): error in
##   statement: no such table: products
dbDisconnect(con)
## [1] TRUE
It may be surprising that the counting query results in an error. Why does it not return 0? If we take a closer look at the example, we should understand that the first time we call dbWriteTable()
, it creates a new table first and then inserts the data in the first chunk. In other words, the table creation is included in the transaction. So, when we roll back, the table creation is undone too. As a result, the preceding counting query produces an error because products
does not exist at all. If the table had existed before we began the transaction, the count would equal the number of records before the transaction, as if nothing had happened.
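We can verify this behavior with a small self-contained sketch (using an in-memory database rather than the products file): if the table is created before the transaction begins, rolling back removes the inserted rows but keeps the empty table.

```r
library(RSQLite)

con2 <- dbConnect(SQLite(), ":memory:")

# Create the table *before* the transaction begins
dbSendQuery(con2, "create table products (id integer, score real)")

dbBegin(con2)
dbWriteTable(con2, "products",
  data.frame(id = 1:3, score = c(9, 8, 10)),
  append = TRUE, row.names = FALSE)
dbRollback(con2)

# The inserted rows are undone, but the table itself survives
res <- dbGetQuery(con2, "select count(*) as n from products")
res
##   n
## 1 0

dbDisconnect(con2)
```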
Another example that requires strong consistency is an account transfer. When we transfer an amount of money from one account to another, we need to ensure that the system withdraws the money from one account and deposits the same amount into the other. The two changes must either both happen or both fail to maintain consistency. This can be easily done with transactions in a relational database.
Suppose we define a function that creates a SQLite database of a virtual bank. We will use dbSendQuery()
to send commands to create a table of accounts and a table of transactions:
create_bank <- function(dbfile) {
  if (file.exists(dbfile)) file.remove(dbfile)
  con <- dbConnect(SQLite(), dbfile)
  dbSendQuery(con,
    "create table accounts (name text primary key, balance real)")
  dbSendQuery(con,
    "create table transactions (time text, account_from text,
      account_to text, value real)")
  con
}
The accounts table has two columns: name
and balance
. The transactions table has four columns: time
, account_from
, account_to
, and value
. The first table stores all account information, and the second stores all historical transactions.
We will also define a function to create an account with a name and initial balance. The function uses insert into
to write a new record to the accounts table:
create_account <- function(con, name, balance) {
  dbSendQuery(con, sprintf(
    "insert into accounts (name, balance) values ('%s', %.2f)",
    name, balance))
  TRUE
}
Note that we use sprintf
to produce the preceding SQL statement. This is acceptable for local, personal use, but it is generally unsafe for web applications, because an attacker can craft a partial expression (SQL injection) to run arbitrary, potentially disastrous statements against the database.
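A safer alternative, available in newer versions of DBI/RSQLite, is a parameterized query: the values are bound to ? placeholders by the driver instead of being pasted into the SQL text. A minimal self-contained sketch (in-memory database; the account values are made up):

```r
library(RSQLite)

con2 <- dbConnect(SQLite(), ":memory:")
dbExecute(con2, "create table accounts (name text primary key, balance real)")

# The values are bound by the driver, so quotes in the input are harmless
dbExecute(con2,
  "insert into accounts (name, balance) values (?, ?)",
  params = list("O'Brien", 1000))

res <- dbGetQuery(con2,
  "select * from accounts where name = ?",
  params = list("O'Brien"))
res
##      name balance
## 1 O'Brien    1000

dbDisconnect(con2)
```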
Next, we will define a transfer function. The function checks whether the withdrawing account and receiving account both exist in the database. It ensures that the balance of the withdrawing account is sufficient for such an amount of transfer. If the transfer is valid, then it updates the balance of both accounts and adds a transaction record to the database:
transfer <- function(con, from, to, value) {
  get_account <- function(name) {
    account <- dbGetQuery(con, sprintf(
      "select * from accounts where name = '%s'", name))
    if (nrow(account) == 0) stop(sprintf("Account '%s' does not exist", name))
    account
  }
  account_from <- get_account(from)
  account_to <- get_account(to)
  if (account_from$balance < value) {
    stop(sprintf("Insufficient money to transfer from '%s'", from))
  } else {
    dbSendQuery(con, sprintf(
      "update accounts set balance = %.2f where name = '%s'",
      account_from$balance - value, from))
    dbSendQuery(con, sprintf(
      "update accounts set balance = %.2f where name = '%s'",
      account_to$balance + value, to))
    dbSendQuery(con, sprintf(
      "insert into transactions (time, account_from, account_to, value)
        values ('%s', '%s', '%s', %.2f)",
      format(Sys.time(), "%Y-%m-%d %H:%M:%S"), from, to, value))
  }
  TRUE
}
Although we perform a basic check against insufficient funds in the withdrawing account, we still cannot guarantee that the transfer is safe, because it may be interrupted partway through by other causes. Therefore, we will implement a safe version of transfer
in which we use a database transaction to ensure that any changes made by transfer
can be undone if anything goes wrong:
safe_transfer <- function(con, ...) {
  dbBegin(con)
  tryCatch({
    transfer(con, ...)
    dbCommit(con)
  }, error = function(e) {
    message("An error occurs in the transaction. Rollback...")
    dbRollback(con)
    stop(e)
  })
}
In fact, safe_transfer
is a wrapper around transfer
: it simply runs transfer
inside a tryCatch
sandbox. If an error occurs, we call dbRollback()
to restore the database to the consistent state it was in before the transaction began.
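The DBI package also provides a helper, dbWithTransaction(), that implements exactly this begin/commit/rollback pattern, so a variant of safe_transfer could be sketched as follows (safe_transfer2 is our name; it assumes the transfer() function defined above):

```r
library(RSQLite)

safe_transfer2 <- function(con, ...) {
  # dbWithTransaction() calls dbBegin(), evaluates the code, and then
  # dbCommit(); if the code signals an error, it calls dbRollback()
  # and re-raises the error
  dbWithTransaction(con, transfer(con, ...))
}
```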
Before putting these functions to the test, we need helper functions to view the balance of a given account as well as all successful transactions between two accounts:
get_balance <- function(con, name) {
  res <- dbGetQuery(con,
    sprintf("select balance from accounts where name = '%s'", name))
  res$balance
}
get_transactions <- function(con, from, to) {
  dbGetQuery(con,
    sprintf("select * from transactions
      where account_from = '%s' and account_to = '%s'", from, to))
}
Now, we can do some tests. First, we will create a virtual bank using create_bank()
, which returns a SQLite connection to the database file. Then, we will create two accounts with some initial balance:
con <- create_bank("data/bank.sqlite")
create_account(con, "David", 5000)
## [1] TRUE
create_account(con, "Jenny", 6500)
## [1] TRUE
get_balance(con, "David")
## [1] 5000
get_balance(con, "Jenny")
## [1] 6500
Then, we will use safe_transfer()
to transfer some money from David's account to Jenny's account:
safe_transfer(con, "David", "Jenny", 1500)
## [1] TRUE
get_balance(con, "David")
## [1] 3500
get_balance(con, "Jenny")
## [1] 8000
The transfer succeeds, and the balances of both accounts are changed in a consistent manner. Now, we will make another transfer. This time, the balance of David's account is not sufficient, so the transfer will end up with an error:
safe_transfer(con, "David", "Jenny", 6500)
## An error occurs in the transaction. Rollback...
## Error in transfer(con, ...): Insufficient money to transfer from 'David'
get_balance(con, "David")
## [1] 3500
get_balance(con, "Jenny")
## [1] 8000
The error is captured, and the function rolls back the database. The balances of both accounts do not change. Now, we will query all successful transactions:
get_transactions(con, "David", "Jenny")
##                  time account_from account_to value
## 1 2016-06-08 23:24:39        David      Jenny  1500
We can see the first transaction, but the failed transaction does not appear in the database. Finally, we should always remember to close the database connection:
dbDisconnect(con)
## [1] TRUE
When we deal with large data files, we often get stuck on reading and writing data. There are two extremes in practice. One extreme is a text-format data source so big that it is almost impossible to load into memory. The other is a large number of small data files that take some effort to integrate into one data frame.
For the first case, we can read the big source chunk by chunk and append each chunk to a table in a database. The following function appends rows to a database table from a big source, given an input file, an output database, a table name, and a chunk size. Since the input data may be too large to load into memory, the function reads one chunk at a time and writes it to the database, and thus only requires a small amount of working memory:
chunk_rw <- function(input, output, table, chunk_size = 10000) {
  first_row <- read.csv(input, nrows = 1, header = TRUE)
  header <- colnames(first_row)
  n <- 0
  con <- dbConnect(SQLite(), output)
  on.exit(dbDisconnect(con))
  while (TRUE) {
    df <- read.csv(input,
      skip = 1 + n * chunk_size, nrows = chunk_size,
      header = FALSE, col.names = header,
      stringsAsFactors = FALSE)
    if (nrow(df) == 0) break
    dbWriteTable(con, table, df, row.names = FALSE, append = n > 0)
    n <- n + 1
    cat(sprintf("%d records written\n", nrow(df)))
  }
}
The trick here is to correctly calculate the offset of each chunk in the input file.
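Note, however, that read.csv() with skip has to rescan the file from the top for every chunk, so the run time grows quadratically with the number of chunks. A sketch of an alternative (chunk_rw2 is our name for it) keeps a single file connection open, so each read.csv() call resumes where the previous one stopped:

```r
library(RSQLite)

chunk_rw2 <- function(input, output, table, chunk_size = 10000) {
  header <- colnames(read.csv(input, nrows = 1, header = TRUE))
  input_con <- file(input, open = "r")
  on.exit(close(input_con), add = TRUE)
  con <- dbConnect(SQLite(), output)
  on.exit(dbDisconnect(con), add = TRUE)
  readLines(input_con, n = 1)  # consume the header line
  n <- 0
  repeat {
    # reading from an open connection continues at the current position;
    # read.csv() errors when no lines are left, which ends the loop
    df <- tryCatch(
      read.csv(input_con, nrows = chunk_size, header = FALSE,
        col.names = header, stringsAsFactors = FALSE),
      error = function(e) data.frame())
    if (nrow(df) == 0) break
    dbWriteTable(con, table, df, row.names = FALSE, append = n > 0)
    n <- n + 1
  }
}
```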
To test the function, we will first write diamonds
(the dataset from the ggplot2 package) into a csv file and then use chunk_rw()
to write the csv file into a SQLite database chunk by chunk. With this method, the writing process requires much less working memory than loading the whole dataset into memory at once:
write.csv(diamonds, "data/diamonds.csv", quote = FALSE, row.names = FALSE)
chunk_rw("data/diamonds.csv", "data/diamonds.sqlite", "diamonds")
## 10000 records written
## 10000 records written
## 10000 records written
## 10000 records written
## 10000 records written
## 3940 records written
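To check that nothing was lost, we can count the rows in the new table; the six chunks add up to 53,940 records, which is the number of rows in diamonds. A quick sketch:

```r
library(RSQLite)

con <- dbConnect(SQLite(), "data/diamonds.sqlite")
# count the rows that arrived in the table
dbGetQuery(con, "select count(*) as n from diamonds")
##       n
## 1 53940
dbDisconnect(con)
```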
The other extreme of loading data is reading from many small data files. In this case, we can put the data distributed across these files into one database so that we can easily query it. The following function puts the data of all csv files in a folder into one database table:
batch_rw <- function(dir, output, table, overwrite = TRUE) {
  files <- list.files(dir, "\\.csv$", full.names = TRUE)
  con <- dbConnect(SQLite(), output)
  on.exit(dbDisconnect(con))
  exist <- dbExistsTable(con, table)
  if (exist) {
    if (overwrite) dbRemoveTable(con, table)
    else stop(sprintf("Table '%s' already exists", table))
  }
  exist <- FALSE
  for (file in files) {
    cat(file, "... ")
    df <- read.csv(file, header = TRUE, stringsAsFactors = FALSE)
    dbWriteTable(con, table, df, row.names = FALSE, append = exist)
    exist <- TRUE
    cat("done\n")
  }
}
To demonstrate, we have a number of small csv files in data/groups
, and we use batch_rw()
to put all the data into a database:
batch_rw("data/groups", "data/groups.sqlite", "groups")
## data/groups/group1.csv ... done
## data/groups/group2.csv ... done
## data/groups/group3.csv ... done
Now, all the data in the files is stored in the database. We can query it or read the whole table back to see what it looks like:
con <- dbConnect(SQLite(), "data/groups.sqlite")
dbReadTable(con, "groups")
##   group    id grade
## 1     1   I-1     A
## 2     1   I-2     B
## 3     1   I-3     A
## 4     2  II-1     C
## 5     2  II-2     C
## 6     3 III-1     B
## 7     3 III-2     B
## 8     3 III-3     A
## 9     3 III-4     C
dbDisconnect(con)
## [1] TRUE
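The real benefit of the database, however, is that we do not have to read the whole table back; we can let SQL do the work. For example, this sketch counts the members in each grade:

```r
library(RSQLite)

con <- dbConnect(SQLite(), "data/groups.sqlite")
# aggregate in the database instead of in R
dbGetQuery(con,
  "select grade, count(*) as members
   from groups
   group by grade
   order by grade")
##   grade members
## 1     A       3
## 2     B       3
## 3     C       3
dbDisconnect(con)
```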
In this section, you learned the basics of working with a SQLite database. However, most popular relational databases share largely the same functionality and query language. With almost the same knowledge, you can work with MySQL via RMySQL, PostgreSQL via RPostgreSQL, Microsoft SQL Server via RSQLServer, and ODBC-compatible databases (such as Microsoft Access and Excel) via RODBC. They share almost the same set of operating functions, so if you are familiar with one, you shouldn't have a problem working with the others.
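Because all of these packages implement the common DBI interface, switching backends usually means changing only the dbConnect() call; the query code stays the same. A sketch (the host, user, password, and database name below are placeholders):

```r
library(DBI)

# SQLite: the database is a local file
con <- dbConnect(RSQLite::SQLite(), "data/bank.sqlite")

# MySQL: the database lives on a server; only the connection changes
# (host, user, password, and dbname here are placeholders)
# con <- dbConnect(RMySQL::MySQL(), host = "localhost",
#   user = "ruser", password = "secret", dbname = "bank")

# identical DBI code works against either backend
accounts <- dbGetQuery(con, "select * from accounts")
dbDisconnect(con)
```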