Chapter 4. Loading Data into BigQuery
In the previous chapter, we wrote the following query:
SELECT
  state_name
FROM `bigquery-public-data`.utility_us.us_states_area
WHERE
  ST_Contains(
    state_geom,
    ST_GeogPoint(-122.33, 47.61))
We also learned that the city at the location (-122.33, 47.61) is in the state of Washington. Where did the data for the state_name and state_geom come from?
Note the FROM clause in the query. The owners of the bigquery-public-data project had already loaded the state boundary information into a table called us_states_area in a dataset called utility_us. Because the team shared the utility_us dataset with all authenticated users of BigQuery (more restrictive permissions are available), we were able to query the us_states_area table in that dataset.
But how did they get the data into BigQuery in the first place? In this chapter, we look at various ways to load data into BigQuery, starting with the basics.
The Basics
Data values such as the boundaries of US states change rarely,1 and the changes are small enough that most applications can afford to ignore them. In data warehousing lingo, we call this a slowly changing dimension. As of this writing, the last change of US state boundaries occurred on January 1, 2017, and affected 19 home owners and one gas station.2
State boundary data is, therefore, the type of data that is often loaded just once. Analysts query the single table and ignore the fact that the data could change over time. For example, a retail firm might care only about which state a home is in currently to ensure that the correct tax rate is applied to purchases from that home. So when a change does happen, such as through a treaty between states or due to a change in the path of a river channel, the owners of the dataset might decide to replace the table with more up-to-date data. The fact that queries could potentially return slightly different results after an update compared to what was returned before the update is ignored.
Ignoring the impact of time on the correctness of the data might not always be possible. If the state boundary data is to be used by a land title firm that needs to track ownership of land parcels, or if an audit firm needs to validate the state tax paid on shipments made in different years, it is important that there be a way to query the state boundaries as they existed in years past. So even though the first part of this chapter covers how to do a one-time load, carefully consider whether you would be better off planning on periodically updating the data and allowing users of the data to know about the version of the data that they are querying.
Loading from a Local Source
The US government issues a “scorecard” for colleges to help consumers compare the cost and perceived value of higher education. Let’s load this data into BigQuery as an illustration. The raw data is available on catalog.data.gov. For convenience, we also have it available as 04_load/college_scorecard.csv.gz in the GitHub repository for this book. The comma-separated values (CSV) file was downloaded from data.gov and compressed using the open source software utility gzip.
Tip
Why did we compress the file? The raw, uncompressed file is about 136 MB, whereas the gzipped file is only 18 MB. Because we are about to send the file over the wire to BigQuery, it makes sense to optimize the bandwidth being transferred. The BigQuery load command can handle gzipped files, but it cannot load parts of a gzipped file in parallel. Loading would be much faster if we were to hand BigQuery a splittable file, either an uncompressed CSV file that is already on Cloud Storage (so that the network transfer overhead is minimized) or data in a format such as Avro for which each block is internally compressed but the file as a whole can be split across workers.
A splittable file can be loaded by different workers starting at different parts of the file, but this requires that the workers be able to “seek” to a predictable point in the middle of the file without having to read it from the beginning. Compressing the entire file using gzip doesn’t allow this, but a block-by-block compression such as Avro does. Therefore, using a compressed, splittable format such as Avro is an unmitigated good. However, if you have CSV or JSON files that are splittable only when uncompressed, you should measure whether the faster network transfer is counterbalanced by the increased load time.
From Cloud Shell, you can page through the gzipped file using zless:
zless college_scorecard.csv.gz
Note
Here are detailed steps:
- Open Cloud Shell in your browser by visiting https://console.cloud.google.com/cloudshell.
- In the terminal window, type: git clone https://github.com/GoogleCloudPlatform/bigquery-oreilly-book
- Navigate to the folder containing the college scorecard file: cd bigquery-oreilly-book/04_load
- Type the command zless college_scorecard.csv.gz, and then use the space bar to page through the data. Type the letter q to quit.
The file contains a header line with the names of the columns. Each of the lines following the header contains one row of data.
To load the data into BigQuery, first create a dataset called ch04 to hold the data:
bq --location=US mk ch04
The bq command-line tool provides a convenient point of entry to interact with the BigQuery service on Google Cloud Platform (GCP), although everything you do with bq you can also do using the REST API, and you can accomplish most things using the GCP Cloud Console. Here, we are asking it to make (mk) a dataset named ch04.
Datasets in BigQuery function like top-level folders that are used to organize and control access to tables, views, and machine learning models. The dataset is created in the current project,3 and it is to this project that storage costs for tables in this dataset will be billed (queries are charged to the project of the querier).
We also specify that the dataset should be created in the US location (this is the default, so we could have omitted that). Location choices include multiregional locations (such as US, EU) and specific regions (e.g., us-east4, europe-west2, and australia-southeast1).4 Be careful when choosing a region for loading data: as of this writing, queries cannot join tables held in different regions. In this book, we will use the US multiregion location so that our queries can join against tables in the public datasets that are located in the United States.
Then, from the directory containing your clone of the GitHub repository, load the data in the file as a table in BigQuery:
bq --location=US \
   load \
   --source_format=CSV --autodetect \
   ch04.college_scorecard \
   ./college_scorecard.csv.gz
In this case, we are asking bq to load the data, informing the tool that the source format is CSV and that we would like the tool to autodetect the schema (i.e., the names and data types of individual columns). We then specify that the table to be created is called college_scorecard in the dataset ch04 and that the data is to be loaded from college_scorecard.csv.gz in the current directory.
When we did this, though, we ran into an issue:
Could not parse 'NULL' as int for field HBCU (position 26) starting at location 11945910
This caused the load job to fail with the following error:5
CSV table encountered too many errors, giving up. Rows: 591; errors: 1.
The problem is that, based on most of the data in the CSV file, BigQuery’s schema autodetection expects that the 26th column (whose name is HBCU) should be an integer, but the 591st row of the file has the text NULL in that field; this usually signifies that the college in question did not answer the survey question corresponding to this field.6
There are several ways in which we can fix this problem. For example, we could edit the data file itself if we knew what the value ought to be. Another fix could be to specify explicitly the schema for each column and change the column type of the HBCU column to be a string so that NULL is an acceptable value. Alternatively, we could ask BigQuery to ignore a few bad records by specifying, for example, --max_bad_records=20. Finally, we could instruct the BigQuery load program that this particular file uses the string NULL to mark nulls (the standard way in CSV is to use empty fields to represent nulls).
Let’s apply the last method, because it seems to be the most appropriate:7
bq --location=US \
   load --null_marker=NULL \
   --source_format=CSV --autodetect \
   ch04.college_scorecard \
   ./college_scorecard.csv.gz
You can find the full list of bq load options by typing bq load --help. By default, bq load will append to a table. Here, you want to replace the existing table, so you should add --replace:
bq --location=US \
   load --null_marker=NULL --replace \
   --source_format=CSV --autodetect \
   ch04.college_scorecard \
   ./college_scorecard.csv.gz
You can also specify --replace=false to append rows to an existing table.
It is worth noting that you can do one-time loads from the BigQuery web user interface (UI). Click your project, and you will be presented with a button to create a dataset (ch04, in our case); click the dataset, and you will be presented with a button to create a table. You can then follow the prompts to upload the file as a BigQuery table. As of this writing, however, use of the web UI to load data from a local file is limited to data whose size is less than 10 MB and 16,000 rows. Hence, it would not work for the college scorecard dataset unless we had staged it in Google Cloud Storage first.
Even if you did not (or cannot) use the web UI to load the data, it is a good idea to look at the created table using the web UI to ensure that details about the table as well as the autodetected schema are correct. It is also possible to edit some details about the table even after it has been created. For example, it is possible to specify that the table should automatically expire after a certain number of days, add columns, or relax a required field to become nullable.
Note
You can also set an expiration date using the ALTER TABLE SET OPTIONS statement. For example:
ALTER TABLE ch04.college_scorecard
SET OPTIONS (
  expiration_timestamp=
      TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 7 DAY),
  description="College Scorecard table that expires seven days from now"
)
For more details, see https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#alter_table_set_options_statement.
Regardless of how the table is loaded, anyone who is allowed to access the dataset in which the table is located can query it. The default is to make a newly created dataset visible only to people with project-level view permissions. You can, however, share the dataset9 with specific individuals (identified by their Google account), a domain (e.g., xyz.com), or a Google group. We discuss using Identity and Access Management (IAM) to share datasets in Chapter 10. For now, though, anyone with view access to the project holding the dataset can query it:
SELECT
  INSTNM
  , ADM_RATE_ALL
  , FIRST_GEN
  , MD_FAMINC
  , MD_EARN_WNE_P10
  , SAT_AVG
FROM
  ch04.college_scorecard
WHERE
  SAFE_CAST(SAT_AVG AS FLOAT64) > 1300
  AND SAFE_CAST(ADM_RATE_ALL AS FLOAT64) < 0.2
  AND SAFE_CAST(FIRST_GEN AS FLOAT64) > 0.1
ORDER BY
  CAST(MD_FAMINC AS FLOAT64) ASC
This query pulls out institution name (INSTNM), admission rate, and other information for colleges whose average SAT score is more than 1300 and whose admission rate is less than 20%, which is a plausible definition of “elite” colleges. It also filters by colleges that admit first-generation college goers at a rate greater than 10% and ranks them in ascending order of median family income, thus finding elite colleges that admit culturally or economically disadvantaged students. The query also pulls the median earnings of students 10 years after entry:
Row | INSTNM | ADM_RATE_ALL | FIRST_GEN | MD_FAMINC | MD_EARN_WNE_P10 | SAT_AVG |
---|---|---|---|---|---|---|
1 | University of California–Berkeley | 0.1692687830816 | 0.3458005249 | 31227 | 64700 | 1422 |
2 | Columbia University in the City of New York | 0.06825366802669 | 0.2504905167 | 31310.5 | 83300 | 1496 |
3 | University of California–Los Angeles | 0.17992627069775 | 0.3808913934 | 32613.5 | 60700 | 1334 |
4 | Harvard University | 0.05404574677902 | 0.25708061 | 33066 | 89700 | 1506 |
5 | Princeton University | 0.06521516568269 | 0.2773972603 | 37036 | 74700 | 1493 |
Look, however, at the query itself. Notice how several of the conditions in the WHERE clause need a cast:
SAFE_CAST(ADM_RATE_ALL AS FLOAT64)
Had we not included the cast, we would have received an error:
No matching signature for operator > for argument types: STRING, INT64.
Had we simply cast as a float, it would have failed on a row where the value was a string (PrivacySuppressed) that cannot be cast as a float:
Bad double value: PrivacySuppressed; while executing the filter ...
This is because the automatic schema detection did not identify the admission rate column as numeric. Instead, that column is being treated as a string because, in some of the rows, the value is suppressed for privacy reasons (e.g., if the number of applications is very small) and replaced by the text PrivacySuppressed. Indeed, even the median family income is a string (it happens to always be numeric for colleges that meet the criteria we outlined), and so we need to cast it before ordering.10
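The difference between a plain cast and a safe cast can be illustrated outside of SQL. The following is a rough Python analogy, not BigQuery's implementation: safe_cast_float is a hypothetical helper that returns None where SAFE_CAST would return NULL, and the three rows are made-up sample data. Python's float() also accepts a few inputs (such as "nan") that a SQL cast may treat differently, so treat this only as a sketch of the idea.

```python
from typing import Optional

def safe_cast_float(value: Optional[str]) -> Optional[float]:
    """Return value as a float, or None when it cannot be parsed.

    A plain float(value) would raise ValueError on 'PrivacySuppressed',
    much as CAST fails the whole query on the first bad row.
    """
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None

# Made-up rows in which every field is a string, as in the autodetected table.
rows = [
    {"INSTNM": "College A", "ADM_RATE_ALL": "0.15"},
    {"INSTNM": "College B", "ADM_RATE_ALL": "PrivacySuppressed"},
    {"INSTNM": "College C", "ADM_RATE_ALL": "0.45"},
]

def selective_colleges(rows, threshold=0.2):
    """Keep rows whose admission rate parses and is below the threshold."""
    selected = []
    for row in rows:
        rate = safe_cast_float(row["ADM_RATE_ALL"])
        # A None rate never satisfies the comparison, just as NULL < 0.2
        # does not evaluate to TRUE in a SQL WHERE clause.
        if rate is not None and rate < threshold:
            selected.append(row["INSTNM"])
    return selected

print(selective_colleges(rows))
```

The suppressed row is silently skipped rather than aborting the scan, which is the behavior the query relies on.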
Specifying a Schema
Inevitably in real-world datasets, we will need to do some cleanup and transformations before loading the data into BigQuery. Although later in this chapter we look at building more sophisticated data processing pipelines to do this, a simple way is to use Unix tools to replace privacy-suppressed data with NULLs:
zless ./college_scorecard.csv.gz | \
   sed 's/PrivacySuppressed/NULL/g' | \
   gzip > /tmp/college_scorecard.csv.gz
Here, we are using a stream editor (sed) to replace all occurrences of PrivacySuppressed with NULL, compressing the result, and writing it to a temporary folder. Now, instead of loading the original file, we can load the cleaner file.
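If Unix tools are not available, the same cleanup can be sketched in Python using only the standard library. This is an illustrative alternative to the sed pipeline, not part of the original workflow: replace_in_gzip is a hypothetical helper, and the demonstration runs on a tiny made-up file in a temporary directory rather than on the real scorecard data.

```python
import gzip
import os
import tempfile

def replace_in_gzip(src_path, dst_path, old="PrivacySuppressed", new="NULL"):
    """Stream a gzipped text file, replacing old with new on every line.

    Writes the result to dst_path (also gzipped) and returns the number
    of lines that were changed.
    """
    changed = 0
    # newline="" keeps the original line endings untouched in both directions.
    with gzip.open(src_path, "rt", encoding="utf-8", newline="") as src, \
         gzip.open(dst_path, "wt", encoding="utf-8", newline="") as dst:
        for line in src:
            fixed = line.replace(old, new)
            if fixed != line:
                changed += 1
            dst.write(fixed)
    return changed

# Demonstration on a small, made-up sample file.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "sample.csv.gz")
dst = os.path.join(workdir, "sample_clean.csv.gz")
with gzip.open(src, "wt", encoding="utf-8", newline="") as f:
    f.write("INSTNM,SAT_AVG\nCollege A,1422\nCollege B,PrivacySuppressed\n")

changed = replace_in_gzip(src, dst)
with gzip.open(dst, "rt", encoding="utf-8", newline="") as f:
    cleaned = f.read()
print(changed, "line(s) changed")
```

Because the file is processed one line at a time, memory use stays small even for inputs much larger than the sample.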
When presented with the cleaner file, BigQuery correctly identifies many more of the columns as integers or floats, but not SAT_AVG or ADM_RATE_ALL; those columns are still autodetected as strings. This is because the algorithm to autodetect the schema does not look at all the rows in the file; it looks at only a sample of them. Because a large number of rows have a null SAT_AVG (fewer than 20% of colleges report SAT scores), the algorithm was unable to infer the type of the field. The safe choice is to treat any column that the tool is not sure of as a string.
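To see why sampling can defeat type inference, consider the following toy model. It is emphatically not BigQuery's autodetection algorithm, whose details are not described here; infer_type is a hypothetical function, and the column values are made up. It only illustrates how a sample consisting entirely of nulls leaves nothing to infer from, so the fallback is a string.

```python
def infer_type(values, null_marker="NULL"):
    """Infer INTEGER, FLOAT, or STRING from sample values, ignoring nulls."""
    seen = [v for v in values if v not in ("", null_marker)]
    if not seen:
        # Nothing but nulls in the sample: fall back to the safe choice.
        return "STRING"
    for type_name, parser in (("INTEGER", int), ("FLOAT", float)):
        try:
            for v in seen:
                parser(v)
            return type_name
        except ValueError:
            continue  # at least one value did not parse; try the next type
    return "STRING"

# A made-up column in which most rows are null, like SAT_AVG.
column = ["NULL"] * 8 + ["1422", "1496"]

sampled = infer_type(column[:5])   # sample happens to contain only nulls
full = infer_type(column)          # looking at every row

print(sampled, full)
```

The same column is typed differently depending on which rows happen to be inspected, which is the risk the text describes.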
It is therefore best practice to not autodetect the schema of files that you receive in production—you will be at the mercy of whatever data happens to have been sampled. For production workloads, insist on the data type for a column by specifying it at the time of load.
You can use the autodetect feature to avoid starting to write a schema from scratch. You can display the schema of the table as it currently exists:
bq show --format prettyjson --schema ch04.college_scorecard
You can also save the schema to a file:
bq show --format prettyjson --schema ch04.college_scorecard > schema.json
Now, you can open the schema file in your favorite text editor (if you don’t have a preference, use the pen icon in Cloud Shell to open up the default editor) and change the type of the columns you care about. Specifically, change the four columns used in the WHERE and ORDER BY clauses (SAT_AVG, ADM_RATE_ALL, FIRST_GEN, and MD_FAMINC) to be FLOAT64:
{ "mode": "NULLABLE", "name": "FIRST_GEN", "type": "FLOAT64" },
In addition, change (for now) the T4APPROVALDATE column to be a string, because it is in a nonstandard date format:11
{ "mode": "NULLABLE", "name": "T4APPROVALDATE", "type": "STRING" },
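Hand-editing a schema with many columns is error prone, so the edits above could also be scripted. The following is a minimal sketch, assuming the schema file is a JSON list of objects with "name", "type", and "mode" keys as shown in the snippets above; override_types and rewrite_schema_file are hypothetical helpers, and the three-field schema is a made-up sample standing in for the real schema.json.

```python
import json

def override_types(schema, overrides):
    """Return a copy of a JSON schema with some column types replaced."""
    updated = []
    for field in schema:
        field = dict(field)  # copy so that the input is left unmodified
        if field["name"] in overrides:
            field["type"] = overrides[field["name"]]
        updated.append(field)
    return updated

def rewrite_schema_file(path, overrides):
    """Apply override_types to a schema file in place."""
    with open(path) as f:
        schema = json.load(f)
    with open(path, "w") as f:
        json.dump(override_types(schema, overrides), f, indent=2)

# The type changes described in the text.
overrides = {
    "SAT_AVG": "FLOAT64",
    "ADM_RATE_ALL": "FLOAT64",
    "FIRST_GEN": "FLOAT64",
    "MD_FAMINC": "FLOAT64",
    "T4APPROVALDATE": "STRING",
}

# Made-up sample standing in for the contents of schema.json.
sample_schema = [
    {"mode": "NULLABLE", "name": "INSTNM", "type": "STRING"},
    {"mode": "NULLABLE", "name": "FIRST_GEN", "type": "STRING"},
    {"mode": "NULLABLE", "name": "T4APPROVALDATE", "type": "DATE"},
]
new_schema = override_types(sample_schema, overrides)
print(json.dumps(new_schema, indent=2))
```

Calling rewrite_schema_file("schema.json", overrides) would apply the same changes to the file saved by bq show.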
With the schema updated, we can load the data with this schema rather than with the autodetect:
bq --location=US \
   load --null_marker=NULL --replace \
   --source_format=CSV \
   --schema=schema.json --skip_leading_rows=1 \
   ch04.college_scorecard \
   /tmp/college_scorecard.csv.gz
Because we are supplying a schema, we need to instruct BigQuery to ignore the first row of the CSV file (which contains the header information).
After the table has been loaded, we can repeat the query of the previous section:
SELECT INSTNM , ADM_RATE_ALL , FIRST_GEN , MD_FAMINC , MD_EARN_WNE_P10 , SAT_AVG FROM ch04.college_scorecard WHERE SAT_AVG > 1300 AND ADM_RATE_ALL < 0.2 AND FIRST_GEN > 0.1 ORDER BY MD_FAMINC ASC
Notice that, because SAT_AVG, ADM_RATE_ALL, and the others are no longer strings, our query is much cleaner because we no longer need to cast them to floating-point numbers. The reason they are no longer strings is that we made a decision on how to deal with the privacy-suppressed data (treat them as being unavailable) during the Extract, Transform, and Load (ETL) process.
Copying into a New Table
The table as loaded contains many columns that we do not need. It is possible to create a cleaner, more purposeful table from the original table by using the CREATE TABLE statement and populating the new table with only the columns of interest:
CREATE OR REPLACE TABLE ch04.college_scorecard_etl AS
SELECT
  INSTNM
  , ADM_RATE_ALL
  , FIRST_GEN
  , MD_FAMINC
  , SAT_AVG
  , MD_EARN_WNE_P10
FROM ch04.college_scorecard
With a robust ETL pipeline in which decisions are made early, downstream queries are cleaner and more concise. The trade-off is that the ETL process involves extra work (determining the data types and specifying the schema) and might involve irrevocable decisions (e.g., there is no way to get back whether a field is unavailable because it was not collected, because it was suppressed due to privacy reasons, or because it was deleted). Later in this chapter, we discuss how an ELT pipeline in SQL can help us delay making irrevocable decisions.
Data Management (DDL and DML)
Why cover data management in a chapter on loading data? Because loading data is typically only part of the task of managing data. If data is loaded by mistake, you might need to delete it. Sometimes you need to delete data because of regulations and compliance.
Warning
Even though we normally want you to try all the commands and queries in this book, don’t try the ones in this section, because you will lose your data!
The easiest way to delete a table (or view) as a whole is from the BigQuery UI. You can also carry out the delete from the bq command-line tool:
bq rm ch04.college_scorecard
bq rm -r -f ch04
The first line removes a single table, whereas the second one removes recursively (-r) and without prompting (-f, for force) the dataset ch04 and all of the tables it contains.
You can also delete a table (or view) by using SQL:
DROP TABLE IF EXISTS ch04.college_scorecard_gcs
It is also possible to specify that a table should expire at a certain time in the future. You can do this with the ALTER TABLE SET OPTIONS statement:
ALTER TABLE ch04.college_scorecard
SET OPTIONS (
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 7 DAY),
  description="College Scorecard expires seven days from now"
)
The DROP TABLE and ALTER TABLE statements, like the CREATE TABLE statement, are examples of Data Definition Language (DDL) statements.
It is possible to delete only specific rows from a table—for example:
DELETE FROM ch04.college_scorecard WHERE SAT_AVG IS NULL
Similarly, it is also possible to INSERT rows into an existing table instead of replacing the entire table. For example, it is possible to insert more values into the college_scorecard table using the following:
INSERT ch04.college_scorecard
  (INSTNM
  , ADM_RATE_ALL
  , FIRST_GEN
  , MD_FAMINC
  , SAT_AVG
  , MD_EARN_WNE_P10
  )
VALUES ('abc', 0.1, 0.3, 12345, 1234, 23456),
       ('def', 0.2, 0.2, 23451, 1232, 32456)
It is possible to use a subquery to extract values from one table and copy them into another:
INSERT ch04.college_scorecard
  (INSTNM
  , ADM_RATE_ALL
  , FIRST_GEN
  , MD_FAMINC
  , SAT_AVG
  , MD_EARN_WNE_P10
  )
SELECT *
FROM ch04.college_scorecard_etl
WHERE SAT_AVG IS NULL
The DELETE, INSERT, and MERGE statements are examples of Data Manipulation Language (DML) statements.
Tip
As of this writing, BigQuery does not support an SQL COPY statement. To copy tables, use bq cp to copy one table to another:
bq cp ch04.college_scorecard someds.college_scorecard_copy
You are not billed for running a query, but you will be billed for the storage of the new table. The bq cp command supports appending (specify -a or --append_table) and replacement (specify -noappend_table).
You can also use the idiomatic Standard SQL method of using either CREATE TABLE AS SELECT or INSERT ... SELECT, depending on whether the destination already exists. However, bq cp is faster (because it copies only the table metadata) and doesn’t incur query costs.
Loading Data Efficiently
Although BigQuery can load data from CSV files, CSV files are inefficient and not very expressive (for example, there is no way to represent arrays and structs in CSV). If you have a choice, you should choose to export your data in a different format. What format should you choose?
An efficient and expressive format is Avro. Avro uses self-describing binary files that are broken into blocks and can be compressed block by block. Because of this, it is possible to parallelize the loading of data from Avro files and the export of data into Avro files. Because the blocks are compressed, the file sizes will also be smaller than the data size might indicate. In terms of expressiveness, the Avro format is hierarchical and can represent nested and repeated fields, something that BigQuery supports but CSV files don’t have an easy way to store. Because Avro files are self-describing, you never need to specify a schema.
There are two drawbacks to Avro files. One is that they are not human readable. If readability and expressiveness are important to you, use newline-delimited JSON files12 to store your data. JSON supports the ability to store hierarchical data but requires that binary columns be base-64 encoded. However, JSON files are larger than even the equivalent CSV files because the name of each field is repeated on every line. The second drawback is that Avro files are stored row by row. This makes Avro files not as efficient for federated queries.
The Parquet file format was inspired by Google’s original Dremel ColumnIO format,13 and like Avro, Parquet is binary, block oriented, compact, and capable of representing hierarchical data. However, whereas Avro files are stored row by row, Parquet files are stored column by column. Columnar files are optimized for reading a subset of the columns; loading data requires reading all columns, and so columnar formats are somewhat less efficient at the loading of data. However, the columnar format makes Parquet a better choice than Avro for federated queries, a topic that we discuss shortly. Optimized Row Columnar (ORC) files are another open source columnar file format. ORC is similar to Parquet in performance and efficiency.
Therefore, if you have a choice of file formats, we recommend Avro if you plan to load the data into BigQuery and discard the files. We recommend Parquet if you will be retaining the files for federated queries. Use JSON for small files where human readability is important.
Impact of compression and staging via Google Cloud Storage
For formats such as CSV and JSON that do not have internal compression, you should consider whether you should compress the files using gzip. Compressed files are faster to transmit and take up less space, but they are slower to load into BigQuery. The slower your network, the more you should lean toward compressing the data.
If you are on a slow network or if you have many files or very large files, it is possible to set up a multithreaded upload of the data using gsutil cp. After the data is all on Google Cloud Storage, you can invoke bq load from the Cloud Storage location:
gsutil -m cp *.csv gs://BUCKET/some/location
bq load … gs://BUCKET/some/location/*.csv
To capture the various trade-offs involved with compression and with staging the college scorecard data on Cloud Storage before invoking bq load, we ran an experiment whose results are shown in Table 4-1. Your results will vary, of course, depending on your network and the actual data you are loading.14 Therefore, you should carry out a similar measurement for your loading job and choose the method that provides you with the best performance on the measures you care about.
Compressed file | Stage on GCS? | GCS size | Network time (if separate) | Time to load into BigQuery | Total time |
---|---|---|---|---|---|
Yes | No | None | N/A | 105 seconds | 105 seconds |
No | No | None | N/A | 255 seconds | 255 seconds |
Yes | Yes | 16 MB | 47 seconds | 42 seconds | 89 seconds |
No | Yes | 76 MB | 139 seconds | 28 seconds | 167 seconds |
Staging the file on Google Cloud Storage involves paying storage costs at least until the BigQuery load job finishes. However, storage costs are generally quite low and so, on this dataset and this network connection (see Table 4-1), the best option is to stage compressed data in Cloud Storage and load it from there. Even though it is faster to load uncompressed files into BigQuery, the network time to transfer the files dwarfs whatever benefits you’d get from a faster load.
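The comparison in Table 4-1 is simple arithmetic, and it is easy to repeat for your own measurements. The sketch below hardcodes the four rows of the table; the variable names are illustrative, and a network time of 0 is used where the table says N/A because the transfer is part of the load itself in those cases.

```python
# (compressed, staged_on_gcs, network_seconds, load_seconds) from Table 4-1.
measurements = [
    (True, False, 0, 105),
    (False, False, 0, 255),
    (True, True, 47, 42),
    (False, True, 139, 28),
]

def total_seconds(row):
    """Total wall-clock time: separate network transfer plus load time."""
    _, _, network, load = row
    return network + load

totals = [total_seconds(row) for row in measurements]
best = min(measurements, key=total_seconds)

for row, total in zip(measurements, totals):
    compressed, staged, _, _ = row
    print(f"compressed={compressed!s:5} staged={staged!s:5} total={total}s")
print("fastest option:", best[:2], total_seconds(best), "seconds")
```

Substituting your own timings into measurements gives the same comparison for your network and data.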
As of this writing, the loading of compressed CSV and JSON files is limited to files less than 4 GB in size because BigQuery has to uncompress the files on the fly on workers whose memory is finite. If you have larger datasets, split them across multiple CSV or JSON files. Splitting files yourself can allow for some degree of parallelism when doing the loads, but depending on how you size the files, this can lead to suboptimal file sizes in the table until BigQuery decides to optimize the storage.
Price and quota
BigQuery does not charge for loading data. Ingestion happens on a set of workers that is distinct from the cluster providing the slots used for querying. Hence, your queries (even on the same table into which you are ingesting data) are not slowed down by the fact that data is being ingested.
Data loads are atomic. Queries on a table will either reflect the presence of all the data that is loaded in through the bq load
operation or reflect none of it. You will not get query results on a partial slice of the data.
The drawback of loading data using a “free” cluster is that load times can become unpredictable and bottlenecked by preexisting jobs. As of this writing, load jobs are limited to 1,000 per table and 100,000 per project per day. In the case of CSV and JSON files, cells and rows are limited to 100 MB, whereas in Avro, blocks are limited to 16 MB. Files cannot exceed 5 TB in size. If you have a larger dataset, split it across multiple files, each smaller than 5 TB. However, a single load job can submit a maximum of 15 TB of data split across a maximum of 10 million files. The load job must finish executing in less than six hours or it will be cancelled.
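Before submitting a large load job, it can be useful to check a list of file sizes against these limits. The following sketch encodes only the file-size, job-size, and file-count limits quoted above, which hold as of the original writing and may have changed since. check_load_job is a hypothetical helper, and treating 1 TB as 10^12 bytes is an assumption made for illustration.

```python
TB = 10**12  # assumption: decimal terabytes; the text does not say which unit

MAX_FILE_BYTES = 5 * TB         # a single file cannot exceed 5 TB
MAX_JOB_BYTES = 15 * TB         # a single load job can submit at most 15 TB
MAX_FILES_PER_JOB = 10_000_000  # ... split across at most 10 million files

def check_load_job(file_sizes):
    """Return a list of human-readable problems; empty if within limits."""
    problems = []
    if len(file_sizes) > MAX_FILES_PER_JOB:
        problems.append(f"{len(file_sizes)} files exceeds the per-job file limit")
    for index, size in enumerate(file_sizes):
        if size > MAX_FILE_BYTES:
            problems.append(f"file {index} is larger than 5 TB")
    if sum(file_sizes) > MAX_JOB_BYTES:
        problems.append("job total is larger than 15 TB")
    return problems

print(check_load_job([4 * TB] * 3))   # 12 TB in three files
print(check_load_job([6 * TB]))       # one oversized file
print(check_load_job([5 * TB] * 4))   # 20 TB in total
```

A job that fails the check would need to be split into several load jobs, or its files split into smaller pieces.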
Federated Queries and External Data Sources
You can use BigQuery without first loading the data. It is possible to leave the data in place, specify the structure of the data, and use BigQuery as just the query engine. In contrast to the queries thus far, for which BigQuery queried its own native storage, in this section we discuss the use of “federated queries” to query “external data sources” and explain when you might want to use such queries.
Currently supported external data sources include Google Cloud Storage, Cloud Bigtable, Cloud SQL, and Google Drive. You will notice that all of these sources are external to BigQuery but are, nevertheless, within the Google Cloud perimeter. This is necessary because otherwise the network overhead and security considerations would make the queries either slow or infeasible.
How to Use Federated Queries
There are three steps to querying data in an external data source:
- Create a table definition using bq mkdef.
- Make a table using bq mk, passing in the external table definition.
- Query the table as normal.
As with querying data in native storage, you can do this either in the web UI or by using a programmatic interface. To use the web UI, follow the just-listed steps to create a table, but make sure to specify that you want an external table, not a native one, as demonstrated in Figure 4-1.
Using the command-line interface, create a table definition using bq mkdef. As with bq load, you have the option of using --autodetect:
bq mkdef --source_format=CSV \
   --autodetect \
   gs://bigquery-oreilly-book/college_scorecard.csv
This prints a table definition file to standard output. The normal course of action is to redirect this to a file and use that table definition to make a table using bq mk:
bq mkdef --source_format=CSV \
   --autodetect \
   gs://bigquery-oreilly-book/college_scorecard.csv \
   > /tmp/mytable.json
bq mk --external_table_definition=/tmp/mytable.json \
   ch04.college_scorecard
With these two steps, you can query the table college_scorecard as in the previous section, except that the queries will happen on the CSV file stored in Google Cloud Storage; the data is not ingested into BigQuery’s native storage.
Wildcards
Many big data frameworks such as Apache Spark, Apache Beam, and others shard their output across hundreds of files with names such as course_grades.csv-00095-of-00313. When loading such files, it would be convenient if we could avoid having to list each file individually.
Indeed, it is possible to use a wildcard in the path to bq mkdef (and bq load) so that you can match multiple files:
bq mkdef --source_format=CSV \
   --autodetect \
   gs://bigquery-oreilly-book/college_* \
   > /tmp/mytable.json
This creates a table that refers to all the files matched by the pattern.
Temporary table
It is also possible to condense the three steps (mkdef, mk, and query) by passing in the table definition parameters along with a query, thus ensuring that the table definition will be used only for the duration of the query:
LOC="--location US"
INPUT=gs://bigquery-oreilly-book/college_scorecard.csv
SCHEMA=$(gsutil cat $INPUT | head -1 | awk -F, '{ORS=","}{for (i=1; i <= NF; i++){ print $i":STRING"; }}' | sed 's/,$//g'| cut -b 4- )
bq $LOC query \
   --external_table_definition=cstable::${SCHEMA}@CSV=${INPUT} \
   'SELECT SUM(IF(SAT_AVG != "NULL", 1, 0))/COUNT(SAT_AVG) FROM cstable'
In the preceding query, the external table definition consists of the temporary table name (cstable), two colons, the schema string, the @ symbol, the format (CSV), an equals sign, and the Google Cloud Storage URL corresponding to the data file(s). If you already have a table definition file, you can specify it directly:
--external_table_definition=cstable::${DEF}
It is possible to specify a JSON schema file as well as to query JSON, Avro, and other supported formats directly from Cloud Storage, Cloud Bigtable, and other supported data sources.
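The inline definition string described earlier (table name, two colons, schema, @, format, equals sign, URI) can also be assembled in a general-purpose language, which may be easier to read than the awk and sed pipeline. The sketch below is an illustration only: external_table_definition is a hypothetical helper, the header line is a shortened made-up sample, and stripping a leading byte-order mark is an assumption about why the shell version discards the first three bytes. Like the awk version, it treats every column as a STRING and assumes no column name contains a comma.

```python
def external_table_definition(table, header_line, source_format, uri):
    """Build the value for --external_table_definition from a CSV header."""
    # Assumption: the file may start with a UTF-8 byte-order mark.
    header = header_line.lstrip("\ufeff").strip()
    schema = ",".join(f"{name}:STRING" for name in header.split(","))
    return f"{table}::{schema}@{source_format}={uri}"

# Shortened, made-up header; the real file has many more columns.
definition = external_table_definition(
    table="cstable",
    header_line="\ufeffUNITID,INSTNM,SAT_AVG\n",
    source_format="CSV",
    uri="gs://bigquery-oreilly-book/college_scorecard.csv",
)
print(definition)
```

The resulting string is what would be passed to bq query as the value of --external_table_definition.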
While undeniably convenient, federated queries leave much to be desired in terms of performance. Because CSV files are stored row-wise and the rows themselves are stored in some arbitrary order, much of the efficiency that we commonly associate with BigQuery is lost. It is also not possible for BigQuery to estimate how much data it is going to need to scan before running the query.
Loading and querying Parquet and ORC
As previously mentioned, Parquet and ORC are columnar data formats. Therefore, federated querying of these formats will provide better query performance than if the data were stored in row-based formats such as CSV or JSON (queries will still be slower than on BigQuery’s native Capacitor storage, however).
Because Parquet and ORC are self-describing (i.e., the schema is implicit in the files themselves), it is possible to create table definitions without specifying a schema:
bq mkdef --source_format=PARQUET gs://bucket/dir/files* > table_def.json
bq mk --external_table_definition=table_def.json <dataset>.<table>
As with querying external tables created from CSV files, querying this table works like querying any other table in BigQuery.
Even though Parquet and ORC files provide better query performance than row-based file formats, they are still subject to the limitations of external tables.
Loading and querying Hive partitions
Apache Hive allows for reading, writing, and managing an Apache Hadoop–based data warehouse using a familiar SQL-like query language. Cloud Dataproc, on Google Cloud, enables Hive software to work on distributed data stored in Hive partitions on Google Cloud Storage. A common public cloud migration pattern is for on-premises Hive workloads to be moved to Cloud Dataproc and for newer workloads to be written using BigQuery’s federated querying capability. This way, the current Hive workloads work as-is, whereas newer workloads can take advantage of the serverless, large-scale querying capability provided by BigQuery.
You can load Hive partitions on Google Cloud Storage by specifying a Hive partitioning mode to bq load:
bq load --source_format=ORC --autodetect \
  --hive_partitioning_mode=AUTO <dataset>.<table> <gcs_uri>
The Cloud Storage URI in the case of Hive tables needs to encode the table path prefix without including any partition keys in the wildcard. Thus, if the partition key for a Hive table is a field named datestamp, the Cloud Storage URI should be of the following form:
gs://some-bucket/some-dir/some-table/*
This is true even if the files themselves all begin with the following:
gs://some-bucket/some-dir/some-table/datestamp=
As of this writing, the AUTO partitioning mode can detect the following types: STRING, INTEGER, DATE, and TIMESTAMP. It is also possible to request that the partition keys be detected as strings (this can be helpful in exploratory work):
bq load --source_format=ORC --autodetect \
  --hive_partitioning_mode=STRINGS <dataset>.<table> <gcs_uri>
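The relationship between the table path prefix, the key=value path segments, and the detected types can be sketched in Python. This is only an illustration: the type guessing below is a naive stand-in and is not BigQuery's actual detection logic, it omits TIMESTAMP, and the region key is a hypothetical second partition key.

```python
import re
from datetime import date

TABLE_PREFIX = "gs://some-bucket/some-dir/some-table/"


def partition_keys(uri: str, prefix: str = TABLE_PREFIX) -> dict:
    """Return the key=value path segments that follow the table prefix."""
    if not uri.startswith(prefix):
        raise ValueError(f"{uri} is not under {prefix}")
    segments = uri[len(prefix):].split("/")[:-1]  # drop the file name
    return dict(seg.split("=", 1) for seg in segments if "=" in seg)


def guess_type(value: str) -> str:
    """Naive stand-in for AUTO mode; STRINGS mode would always yield STRING."""
    if re.fullmatch(r"-?\d+", value):
        return "INTEGER"
    try:
        date.fromisoformat(value)
        return "DATE"
    except ValueError:
        return "STRING"


uri = TABLE_PREFIX + "datestamp=2019-01-19/region=7/part-0000.orc"
keys = partition_keys(uri)
types = {name: guess_type(value) for name, value in keys.items()}
print(keys, types)
```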
As with CSV files from Google Cloud Storage, federated querying of Hive partitions requires the creation of a table definition file, and the options closely mirror those of bq load:
bq mkdef --source_format=ORC --autodetect \
  --hive_partitioning_mode=AUTO <gcs_uri> > table_def.json
After the table definition file is created, querying is the same whether the underlying external dataset consists of CSV files or Hive partitions.
In addition to ORC, as shown earlier, data in other formats is also supported. For example, to create a table definition of data stored in newline-delimited JSON, you can use this:
bq mkdef --source_format=NEWLINE_DELIMITED_JSON --autodetect \
  --hive_partitioning_mode=STRINGS <gcs_uri> <schema> > table_def.json
Note that in the preceding command, the partition keys themselves are autodetected, but their data types are not, because we explicitly specify that they ought to be treated as strings. The data types of the other columns are not autodetected either, because we pass in an explicit schema.
We started this section by saying that a common use case for querying Hive partitions is to support cloud migration efforts where significant Hive workloads already exist but allow future workloads to be implemented using BigQuery. Although Apache Hive allows full management (reading and writing) of the data, BigQuery’s external tables are read-only. Moreover, even though BigQuery can handle the data being modified (e.g., from Hive) while a federated query is running, it does not currently support concepts such as reading data at a specific point in time. Because external tables in BigQuery have these limitations, it is better over time to move the data to BigQuery’s native storage and rewrite the Hive workloads in BigQuery. When the data is in BigQuery’s native storage, features such as DML, streaming, clustering, table copies, and more all become possible.
When to Use Federated Queries and External Data Sources
Querying external sources is slower than querying data that is natively in BigQuery; thus, federated queries are typically not recommended in the long term for frequently accessed data. There are, however, situations for which federated queries can be advantageous:
- Carrying out exploratory work using federated queries to determine how best to transform the raw data before loading it into BigQuery. For example, evidence of actual analysis workloads could dictate the transformations present in production tables. You might also treat original, external data sources as staging, and use federated queries to transform the data and write it to production tables.
- Keeping data in Google Sheets if the spreadsheet will be edited interactively, and using federated queries exclusively if the results of those queries need to reflect the live data in that sheet.
- Keeping data in an external data source if ad hoc SQL querying of the data is relatively infrequent. For example, you might keep the data in Cloud Bigtable if the predominant use of that data is for low-latency, high-volume streaming ingest and if most queries on the data can be accomplished using key prefixes.
For large, relatively stable, well-understood datasets that will be updated periodically and queried often, BigQuery native storage is a better choice. In the rest of this section, we look at the implementation details of each of these situations, beginning with exploratory work using federated queries.
Exploratory work using federated queries
Autodetect is a convenience feature that works by sampling a few (on the order of hundreds) rows of the input files to determine the type of a column. It is not foolproof unless you are using self-describing file formats, such as Avro, Parquet, or ORC. To ensure that your ETL pipeline works properly, you should verify the value of every row to ensure that the data type for each column is correct. For example, it is possible that a column contains integers except for a handful of rows that have floats. If so, it is quite likely that autodetect will classify the column as an integer column, because the chance of selecting one of the rows containing a floating-point value is rather low. You won’t learn there is a problem until you issue a query that does a table scan of this column’s values.
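The failure mode is easy to reproduce in miniature. The following Python sketch infers a column type from a sample only; taking the first 500 rows as the sample is an assumption made to keep the example deterministic, and the detect_type helper is hypothetical rather than a description of how BigQuery chooses or inspects rows.

```python
def detect_type(sample):
    """Return INTEGER, FLOAT, or STRING based only on the sampled values."""
    def fits(cast):
        try:
            for value in sample:
                cast(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "INTEGER"
    if fits(float):
        return "FLOAT"
    return "STRING"


# 10,000 integer rows followed by two rows that hold floats.
column = [str(i) for i in range(10_000)] + ["3.5", "7.25"]

detected = detect_type(column[:500])   # type inferred from the sample only
full_scan = detect_type(column)        # type supported by every row
offending = [v for v in column if not v.lstrip("-").isdigit()]
print(detected, full_scan, offending)
```

Only a pass over every row reveals that the sampled answer is too narrow, which is why the text recommends verifying the whole column.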
The best practice is to use self-describing file formats, in which case you don’t need to worry about how BigQuery interprets the data. If you need to use CSV or JSON, we recommend that you explicitly specify a schema. Although it is possible to specify the schema in an accompanying JSON file, it is also possible to pass in the schema on the command line of bq mkdef by creating a string with this format:
FIELD1:DATATYPE1,FIELD2:DATATYPE2,...
If you are unsure of the quality of your data, you should specify everything as a STRING. Note that this is the default data type, so the formatting command becomes just this:
FIELD1,FIELD2,FIELD3,...
Why treat everything as a string? Even if you believe that some of the fields are integers and others are floats, it is best to validate this assumption. Define everything as a string and learn what transformations you need to carry out as you query the data and discover errors.
We can extract the column names by using the first line of the CSV file to create a schema string of the desired format:15
INPUT=gs://bigquery-oreilly-book/college_scorecard.csv
SCHEMA=$(gsutil cat $INPUT | head -1 | cut -b 4- )
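The same header-to-schema step can be sketched in Python. We assume here that the three bytes dropped by cut -b 4- are a UTF-8 byte-order mark, which the utf-8-sig codec removes only when it is present. The schema_from_header helper is hypothetical, and the simple split does not handle quoted column names that contain commas.

```python
def schema_from_header(raw_header: bytes, typed: bool = False) -> str:
    """Turn the first line of a CSV file into a bq-style schema string."""
    # utf-8-sig strips a leading byte-order mark if there is one (assumption:
    # this is why the shell version discards the first three bytes).
    header = raw_header.decode("utf-8-sig").strip()
    names = header.split(",")
    if typed:
        return ",".join(f"{name}:STRING" for name in names)
    return ",".join(names)  # STRING is the default type, so names suffice


raw = b"\xef\xbb\xbfUNITID,INSTNM,SAT_AVG\r\n"  # illustrative header bytes
schema = schema_from_header(raw)
print(schema)
```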
If we are going to specify the schema, we should ask that the first row be skipped and that the tool allow empty lines in the file. We can do this by piping the table definition through sed, a line editor:16
LOC="--location US"
OUTPUT=/tmp/college_scorecard_def.json
bq $LOC \
  mkdef \
  --source_format=CSV \
  --noautodetect \
  $INPUT \
  $SCHEMA \
  | sed 's/"skipLeadingRows": 0/"skipLeadingRows": 1/g' \
  | sed 's/"allowJaggedRows": false/"allowJaggedRows": true/g' \
  > $OUTPUT
We define that we are operating in the US location and that we want to save the output (the table definition) to the /tmp folder.
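The sed substitutions depend on the exact spacing of the JSON text. As an alternative sketch, the same two settings can be changed by parsing the definition in Python. The abbreviated definition below is hypothetical, and the nesting of both keys under csvOptions is our assumption about the file's layout; only the two key names come from the sed commands above.

```python
import json

# Hypothetical, abbreviated table definition for a CSV source.
table_def_text = """
{
  "csvOptions": {
    "allowJaggedRows": false,
    "skipLeadingRows": 0
  },
  "sourceFormat": "CSV",
  "sourceUris": ["gs://bigquery-oreilly-book/college_scorecard.csv"]
}
"""


def patch_csv_options(text: str) -> str:
    """Skip the header row and tolerate rows with missing trailing columns."""
    table_def = json.loads(text)
    options = table_def.setdefault("csvOptions", {})
    options["skipLeadingRows"] = 1
    options["allowJaggedRows"] = True
    return json.dumps(table_def, indent=2)


patched = json.loads(patch_csv_options(table_def_text))
print(patched["csvOptions"])
```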
At this point, we have a table that we can query. Note two things: this table is defined on an external data source, so we are able to start querying the data without the need to wait for the data to be ingested; and all of the columns are strings—we have not made any irreversible changes to the raw data.
Let’s begin our data exploration by trying to do a cast:
SELECT MAX(CAST(SAT_AVG AS FLOAT64)) AS MAX_SAT_AVG FROM `ch04.college_scorecard_gcs`
The query fails with the following error message:
Bad double value: NULL
This indicates that we need to handle the nonstandard way that missing data is encoded in the file. In most CSV files, missing data is encoded as an empty string, but in this one, it is encoded as the string NULL.
We could fix this problem by using SAFE_CAST, which returns NULL instead of raising an error when a value cannot be cast:
WITH etl_data AS (
  SELECT
    SAFE_CAST(SAT_AVG AS FLOAT64) AS SAT_AVG
  FROM
    `ch04.college_scorecard_gcs`
)
SELECT
  MAX(SAT_AVG) AS MAX_SAT_AVG
FROM
  etl_data
Notice that we have started a WITH clause containing all the ETL operations that need to be performed on the dataset. Indeed, as we go through exploring the dataset and culminate with the query of the previous section, we learn that we need a reusable function to clean up numeric data:
CREATE TEMP FUNCTION cleanup_numeric(x STRING) AS
(
  IF ( x != 'NULL' AND x != 'PrivacySuppressed',
       CAST(x as FLOAT64),
       NULL )
);

WITH etl_data AS (
  SELECT
    INSTNM
    , cleanup_numeric(ADM_RATE_ALL) AS ADM_RATE_ALL
    , cleanup_numeric(FIRST_GEN) AS FIRST_GEN
    , cleanup_numeric(MD_FAMINC) AS MD_FAMINC
    , cleanup_numeric(SAT_AVG) AS SAT_AVG
    , cleanup_numeric(MD_EARN_WNE_P10) AS MD_EARN_WNE_P10
  FROM
    `ch04.college_scorecard_gcs`
)
SELECT
  *
FROM
  etl_data
WHERE
  SAT_AVG > 1300
  AND ADM_RATE_ALL < 0.2
  AND FIRST_GEN > 0.1
ORDER BY
  MD_FAMINC ASC
LIMIT 10
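The two cleanup strategies behave differently on values we have not yet seen, and a small Python analogue makes the difference visible. These functions are illustrative stand-ins for SAFE_CAST and for the cleanup_numeric SQL function, not BigQuery code.

```python
SENTINELS = {"NULL", "PrivacySuppressed"}  # missing-data markers found so far


def safe_cast_float(x):
    """Analogue of SAFE_CAST(x AS FLOAT64): any bad value silently becomes None."""
    try:
        return float(x)
    except (TypeError, ValueError):
        return None


def cleanup_numeric(x):
    """Analogue of the SQL function: only known sentinels become None."""
    if x is None or x in SENTINELS:
        return None
    return float(x)  # raises ValueError on a sentinel we have not listed yet


values = ["1300", "NULL", "PrivacySuppressed"]
cleaned = [cleanup_numeric(v) for v in values]
print(cleaned)
```

The stricter function fails loudly on an unlisted marker, which is how exploration surfaces the next value that needs handling; the safe cast would hide it.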
At this point, we can export the cleaned-up data (note the SELECT *) into a new table (note the CREATE TABLE) for just the columns of interest by running the following query:
CREATE TEMP FUNCTION cleanup_numeric(x STRING) AS
(
  IF ( x != 'NULL' AND x != 'PrivacySuppressed',
       CAST(x as FLOAT64),
       NULL )
);

CREATE TABLE ch04.college_scorecard_etl
OPTIONS(description="Cleaned up college scorecard data") AS

WITH etl_data AS (
  SELECT
    INSTNM
    , cleanup_numeric(ADM_RATE_ALL) AS ADM_RATE_ALL
    , cleanup_numeric(FIRST_GEN) AS FIRST_GEN
    , cleanup_numeric(MD_FAMINC) AS MD_FAMINC
    , cleanup_numeric(SAT_AVG) AS SAT_AVG
    , cleanup_numeric(MD_EARN_WNE_P10) AS MD_EARN_WNE_P10
  FROM
    `ch04.college_scorecard_gcs`
)
SELECT * FROM etl_data
It is also possible to script this out by removing the CREATE TABLE statement from the preceding query, invoking bq query and passing in a --destination_table.
ELT in SQL for experimentation
In many organizations, there are many more data analysts than there are engineers. Thus, the needs of the data analysis teams usually greatly outpace what the data engineers can deliver. In such cases, it can be helpful if data analysts themselves can create an experimental dataset in BigQuery and get started with analysis tasks.
The organization can then use the evidence of actual analytics workloads to prioritize what data engineers focus on. For example, as a data engineer, you might not yet know what fields you need to extract out of a log file. So you might set up an external data source as an experiment and allow data analysts to query the raw data on Google Cloud Storage directly.
If the raw log files are in JSON format, with each of the rows having a different structure because the logs come from different applications, the analysts could define the entire log message as a single BigQuery string column and use JSON_EXTRACT and string manipulation functions to pull out the necessary data. At the end of a month, you could analyze the BigQuery query logs to see which fields the analysts actually accessed and how they accessed them, and then build a pipeline to routinely load those fields into BigQuery.
For example, you can export BigQuery audit logs from Stackdriver in JSON format with the entire log message in a nested column named protopayload_auditlog.metadataJson. Here is a query to count log messages with the root element tableDataRead and use the count to rank datasets in terms of the number of times each dataset is accessed:
SELECT
  REGEXP_EXTRACT(protopayload_auditlog.resourceName,
                 '^projects/[^/]+/datasets/([^/]+)/tables') AS datasetRef,
  COUNTIF(JSON_EXTRACT(protopayload_auditlog.metadataJson,
                       "$.tableDataRead") IS NOT NULL) AS dataReadEvents,
FROM `ch04.cloudaudit_googleapis_com_data_access_2019*`
WHERE
  JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.tableDataRead") IS NOT NULL
GROUP BY datasetRef
ORDER BY dataReadEvents DESC
LIMIT 5
The function JSON_EXTRACT takes the column name (protopayload_auditlog.metadataJson) as the first parameter and a JSONPath17 as the second parameter.
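To see what the query computes, here is a rough Python analogue run over a few hypothetical log rows. The sample rows and the json_extract_root helper are ours; the helper handles only a top-level key and returns a parsed object, whereas JSON_EXTRACT returns a JSON-formatted string.

```python
import json
import re
from collections import Counter

DATASET_RE = re.compile(r"^projects/[^/]+/datasets/([^/]+)/tables")

# Hypothetical audit-log rows, reduced to the two fields the query uses.
rows = [
    {"resourceName": "projects/p/datasets/ch04/tables/college_scorecard",
     "metadataJson": '{"tableDataRead": {"fields": ["SAT_AVG"]}}'},
    {"resourceName": "projects/p/datasets/ch04/tables/college_scorecard",
     "metadataJson": '{"tableDataChange": {"insertedRowsCount": "10"}}'},
    {"resourceName": "projects/p/datasets/advdata/tables/students",
     "metadataJson": '{"tableDataRead": {}}'},
    {"resourceName": "projects/p/datasets/ch04/tables/logs",
     "metadataJson": '{"tableDataRead": {"fields": ["sales"]}}'},
]


def json_extract_root(doc: str, key: str):
    """Mimic JSON_EXTRACT(doc, '$.key') for a top-level key; None if absent."""
    return json.loads(doc).get(key)


data_read_events = Counter()
for row in rows:
    if json_extract_root(row["metadataJson"], "tableDataRead") is not None:
        match = DATASET_RE.search(row["resourceName"])
        data_read_events[match.group(1) if match else None] += 1

top = data_read_events.most_common(5)
print(top)
```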
If the original data is in a relational database management system (RDBMS), it is possible to export the data periodically as a tab-separated values (TSV) file to Google Cloud Storage. For example, if you are using MySQL with a database named somedb, the relevant command would be as follows:
mysql somedb < select_data.sql | \
  gsutil cp - gs://BUCKET/data_$(date -u "+%F-%T").tsv
The select_data.sql file would contain a query to pull just the most recent records (here, those from the previous 10 days):
select * from my_table where transaction_date >= DATE_SUB(CURDATE(), INTERVAL 10 DAY)
Given these periodically exported files, it is straightforward for an analyst to get started querying the data using federated queries. After the value of the dataset is proven, the data can be loaded routinely and/or in real time through a data pipeline.
The reason that this is not always suitable for operationalization is that it doesn’t handle the case of mutations to the database. If data that is more than 10 days old is updated, the tab-separated dumps will not be synchronized. Realistically, dumps to TSV files work only for small datasets (on the order of a few gigabytes) where the original database fields themselves do not need to be transformed or corrected before they are used for analytics queries.
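The synchronization gap can be demonstrated with a toy model in Python. Everything here is hypothetical (the rows, the dates, and the export_recent and load helpers); the sketch simply applies the 10-day window from the query above to two successive dumps.

```python
from datetime import date, timedelta


def export_recent(rows, today, days=10):
    """Copy out the rows whose transaction_date falls within the window."""
    cutoff = today - timedelta(days=days)
    return [dict(r) for r in rows if r["transaction_date"] >= cutoff]


def load(warehouse, dump):
    """Upsert dumped rows into the analytics copy, keyed by id."""
    for r in dump:
        warehouse[r["id"]] = r


source = {1: {"id": 1, "transaction_date": date(2019, 1, 5), "amount": 40.0}}
warehouse = {}
load(warehouse, export_recent(source.values(), today=date(2019, 1, 10)))

# Later, the old row is corrected in the database and a new row arrives.
source[1]["amount"] = 45.0
source[2] = {"id": 2, "transaction_date": date(2019, 1, 28), "amount": 15.0}
load(warehouse, export_recent(source.values(), today=date(2019, 1, 31)))

# Row 1 is outside the window of the second dump, so its correction is missed.
stale = [i for i in sorted(source) if warehouse.get(i) != source[i]]
print(stale)
```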
If you do want to operationalize synchronization from an operational database to BigQuery, there are a number of third-party companies that partner with Google, each with a menu of connectors and transformation options.18 These tools can do change data capture (CDC) to allow you to stream changes from a database to a BigQuery table.
External query in Cloud SQL
BigQuery supports external queries, not just federated queries. Whereas a federated query allows you to query an external data source using BigQuery, an external query allows you to run the query in the external database and seamlessly join the results against data in BigQuery. At the time of writing, MySQL and PostgreSQL databases in Cloud SQL (the managed relational database service in Google Cloud) are supported.
There is an initial one-time setup to create a connection resource in BigQuery and grant users permission to use this connection resource. Once this connection resource has been set up, it can be used from an EXTERNAL_QUERY as follows:
SELECT * FROM EXTERNAL_QUERY(connection_id, cloud_sql_query);
In this example, connection_id is the name of the database connection resource that you created in BigQuery using the web UI, a REST API, or the command-line tool.
The performance of the external query depends on the speed of the external database and, because it involves an intermediate temporary table, will usually be slower than queries that are purely in Cloud SQL or purely in BigQuery. Still, there is a tremendous benefit to being able to query data residing in an RDBMS in real time without having to move data around, thus avoiding unnecessary ETL, scheduling, and orchestration.
For example, suppose we wish to create a report of gift cards belonging to customers who have not made any recent purchases. The date of the latest order for each customer is available in Cloud SQL and updated in real time. The balance associated with every gift card our store has ever issued, however, is available in BigQuery. We can join the result of an external query of the orders data in Cloud SQL with the gift card balance data in BigQuery to create an up-to-date report without having to move any data around:
SELECT
  c.customer_id
  , c.gift_card_balance
  , rq.latest_order_date
FROM
  ch04.gift_cards AS c
LEFT OUTER JOIN EXTERNAL_QUERY(
  'connection_id',
  '''SELECT customer_id, MAX(order_date) AS latest_order_date
     FROM orders
     GROUP BY customer_id''') AS rq ON rq.customer_id = c.customer_id
WHERE c.gift_card_balance > 100
ORDER BY rq.latest_order_date ASC;
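The shape of this computation (run an aggregate in the operational database, then join its small result against warehouse data) can be imitated locally. In the Python sketch below, an in-memory SQLite database stands in for Cloud SQL and a list of dictionaries stands in for the BigQuery table. All rows are made up, and the external_query helper is a hypothetical stand-in, not the BigQuery function.

```python
import sqlite3

# Stand-in for the Cloud SQL database that holds live order data.
cloud_sql = sqlite3.connect(":memory:")
cloud_sql.execute("CREATE TABLE orders (customer_id INTEGER, order_date TEXT)")
cloud_sql.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2019-01-05"), (1, "2019-03-09"), (2, "2018-11-20")],
)


def external_query(conn, sql):
    """Run the query inside the external database and return its rows."""
    return conn.execute(sql).fetchall()


# Stand-in for ch04.gift_cards in BigQuery.
gift_cards = [
    {"customer_id": 1, "gift_card_balance": 150.0},
    {"customer_id": 2, "gift_card_balance": 250.0},
    {"customer_id": 3, "gift_card_balance": 120.0},
    {"customer_id": 4, "gift_card_balance": 20.0},
]

latest = dict(external_query(
    cloud_sql,
    "SELECT customer_id, MAX(order_date) FROM orders GROUP BY customer_id",
))

# LEFT OUTER JOIN: customers without orders keep a None order date.
report = [
    (c["customer_id"], c["gift_card_balance"], latest.get(c["customer_id"]))
    for c in gift_cards
    if c["gift_card_balance"] > 100
]
# Ascending order with missing dates first, as in BigQuery's ORDER BY ... ASC.
report.sort(key=lambda r: (r[2] is not None, r[2] or ""))
print(report)
```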
Interactive Exploration and Querying of Data in Google Sheets
Google Sheets is part of G Suite, a set of productivity and collaboration tools from Google Cloud. It provides the means of creating, viewing, editing, and publishing spreadsheets. A spreadsheet contains tabular values in individual cells; some of these values are data and some are the result of computations carried out on the values of other cells. Google Sheets brings spreadsheets online—multiple people can collaboratively edit a spreadsheet, and you can access it from a variety of devices.
Loading Google Sheets data into BigQuery
Google Sheets is an external source, so loading and querying a Google Sheets spreadsheet is a federated query; it works similarly to querying a CSV file from Google Cloud Storage. We create a table definition in BigQuery to point to the data in Google Sheets, and then we can query that table as if it were a native BigQuery table.
Let’s begin by creating a Google Sheets spreadsheet that we can query. Open a web browser, and then, in the URL navigation bar, type https://sheets.new. Visiting this URL opens a blank spreadsheet.
Type in the following data (or download the corresponding CSV file from GitHub and do a File > Import of the data into Google Sheets):
Student | Home state | SAT score |
---|---|---|
Aarti | KS | 1111 |
Billy | LA | 1222 |
Cao | MT | 1333 |
Dalia | NE | 1444 |
Next, navigate to the BigQuery section of the GCP Cloud Console, create a dataset (if necessary), and create a table, specifying that the source of the table is on Drive and its URL, and that it is a Google Sheet. Ask for the schema to be autodetected, as demonstrated in Figure 4-2.
After you do this, you can query the spreadsheet like any other BigQuery table:
SELECT * from advdata.students
Try changing the spreadsheet and verify that the returned results reflect the current state of the table (the results of federated queries on external datasets are not cached).
Even though querying a spreadsheet using SQL like this is possible, it is unlikely that you’d want to do this, because it’s usually more convenient to use the interactive filtering and sorting options built into Google Sheets. For example, you can click the Explore button and type in the natural language query “average SAT score of students in KS,” which returns the results shown in Figure 4-3.
There are three broad use cases for the tie between Google Sheets and BigQuery:
- Populating a spreadsheet with data from BigQuery
- Exploring BigQuery tables using Sheets
- Querying Sheets data using SQL
Let’s look at these three cases.
Populating a Google Sheets spreadsheet with data from BigQuery
The BigQuery data connector in Google Sheets allows you to query BigQuery tables19 and use the results to populate a spreadsheet. This can be extremely useful when sharing data with nontechnical users. In most businesses, nearly all office workers know how to read/interpret spreadsheets. They don’t need to have anything to do with BigQuery or SQL to be able to use Google Sheets and work with the data in the sheet.
From Google Sheets, click Data > Data Connectors > BigQuery, select your project, and write a query to populate the spreadsheet from the BigQuery table of college scorecard data:
SELECT * FROM ch04.college_scorecard_etl
Exploring BigQuery tables using Sheets
One of the reasons that you might want to populate a Google Sheets spreadsheet with data from a BigQuery table is that Sheets is a familiar interface for business users creating charts, formulas, and pivot tables. For example, from the college scorecard data in Sheets, it is quite straightforward to create a formula to rank colleges by the increase in median income experienced by their graduates:
- In a new column, enter the following formula:
  =ArrayFormula(IF(ISBLANK(D2:D), 0, F2:F/D2:D))
  Note that the spreadsheet has now been populated with the ratio of the value in the F-column to the value in the D-column, that is, the increase in income.
- From the Data menu, create a filter on the newly created column and turn off blanks and zeros.
- Sort the spreadsheet Z to A based on this column.
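For readers who think more readily in code than in spreadsheet formulas, the three steps above amount to the following Python sketch. It assumes that the sheet's columns follow the order of the SELECT list in ch04.college_scorecard_etl, so that column D is MD_FAMINC and column F is MD_EARN_WNE_P10. The colleges and figures are invented.

```python
# (INSTNM, MD_FAMINC, MD_EARN_WNE_P10) for a few made-up colleges.
rows = [
    ("College A", 20000.0, 50000.0),
    ("College B", None, 40000.0),     # blank family income
    ("College C", 80000.0, 60000.0),
]


def income_ratio(md_faminc, md_earn):
    """IF(ISBLANK(D), 0, F/D): blank family income yields 0."""
    if md_faminc is None:
        return 0.0
    return (md_earn or 0.0) / md_faminc


# Step 1: compute the new column. Step 2: filter out zeros.
ratios = [(name, income_ratio(d, f)) for name, d, f in rows]
ranked = [pair for pair in ratios if pair[1] != 0]
# Step 3: sort Z to A (descending) on the new column.
ranked.sort(key=lambda pair: pair[1], reverse=True)
print(ranked)
```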
Selecting the first few rows of the sheet, we can quickly create a chart to showcase the best colleges in terms of economic improvement of the student body, as illustrated in Figure 4-4.
In addition to interactively creating the charts you want, you can use the machine learning features of Google Sheets to further explore your data.
In Google Sheets, click the Explore button and notice the charts that are automatically created through machine learning.20 For example, the automatically generated insight depicted in Figure 4-5 captures a striking inequality.
Figure 4-6 shows a subsequent automatically created chart that puts the SAT_AVG in context.
We can even ask for specific charts using natural language. Typing “histogram of sat_avg where first_gen more than 0.5” in the “Ask a question” box returns the answer displayed in Figure 4-7.
Exploring BigQuery tables as a data sheet in Google Sheets
In the previous section, we loaded the entire BigQuery table into Google Sheets, but this was possible only because our college scorecard dataset was small enough. Loading the entire BigQuery table into Google Sheets is obviously not feasible for larger BigQuery tables.
Google Sheets does allow you to access, analyze, visualize, and share even large BigQuery datasets as a BigQuery Data Sheet. To try this out, start a new Google Sheets document and navigate via the menu by clicking Data > Data Connectors > BigQuery Data Sheet.
Choose your Cloud project (that should be billed), and navigate via the menu to the table you want to load into the Data Sheet by clicking bigquery-public-data > usa_names > usa_1910_current > Connect. This table contains nearly six million rows and is too large to load in its entirety. Instead, BigQuery acts as a cloud backend for the data shown in Sheets.
Unlike when loading the entire table into Sheets (as in the previous section), only the first 500 rows of a Data Sheet are loaded in the UI. These 500 rows are best thought of as a preview of the full dataset. Another difference is in editing: if the entire table is loaded, Google Sheets holds a copy of the data; thus, you can edit cells and save the changed spreadsheet. On the other hand, if BigQuery is acting as a cloud backend, cells are not editable—users can filter and pivot the BigQuery Data Sheet, but they cannot edit the data. When users do filtering and pivoting, these actions happen on the entire BigQuery table, not just the preview that is shown in Sheets.
As an example of the kind of analysis that is possible, let’s create a Pivot table by clicking the Pivot table button. In the Pivot table editor, choose state as the Rows, and select year as the Columns. For Values, choose number, and ask Sheets to summarize by COUNTUNIQUE and show as Default, as shown in Figure 4-8.
As Figure 4-8 illustrates, we get a table of the number of unique baby names in each state, broken down by year.
Joining Sheets data with a large dataset in BigQuery
Both BigQuery and Google Sheets are capable of storing and providing access to tabular data. However, BigQuery is primarily an analytics data warehouse, whereas Google Sheets is primarily an interactive document. As we saw in the earlier sections, the familiarity of Sheets and its exploration and charting capabilities make loading BigQuery data into Sheets very powerful.
However, there is a practical limitation on the size of BigQuery datasets that you can load into Sheets. For example, BigQuery holds information on Stack Overflow questions, answers, and users. Even with BigSheets, these petabyte-scale datasets are much too large to load directly into Google Sheets. However, it is still possible to write queries that join a small dataset in Sheets with such large datasets in BigQuery and proceed from there. Let’s look at an example.
From the previous section, we have a spreadsheet with college scorecard data. Let’s assume that we don’t already have the data in BigQuery. We could create a table in BigQuery using the spreadsheet as a source, calling the resulting table college_scorecard_gs, as depicted in Figure 4-9.
Now we can issue a query in BigQuery that joins this relatively small table (7,700 rows) with a massive table consisting of Stack Overflow data (10 million rows) to find which colleges are most commonly listed in Stack Overflow users’ profiles:
SELECT INSTNM, COUNT(display_name) AS numusers
FROM `bigquery-public-data`.stackoverflow.users, ch04.college_scorecard_gs
WHERE REGEXP_CONTAINS(about_me, INSTNM)
GROUP BY INSTNM
ORDER BY numusers DESC
LIMIT 5
This yields the following:21
Row | INSTNM | numusers |
---|---|---|
1 | Institute of Technology | 2364 |
2 | National University | 332 |
3 | Carnegie Mellon University | 169 |
4 | Stanford University | 139 |
5 | University of Maryland | 131 |
The first two entries are suspect,22 but it appears that Carnegie Mellon and Stanford are well represented on Stack Overflow.
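One likely reason for the suspect entries is that REGEXP_CONTAINS succeeds whenever the institution name appears anywhere in the profile text, so a short name also matches longer names that contain it. The Python sketch below reproduces the effect with invented profile text. Python's re module stands in for BigQuery's regular expression engine, which behaves the same way for plain names like these.

```python
import re
from collections import Counter

colleges = [
    "Institute of Technology",
    "Carnegie Mellon University",
    "National University",
]

# Invented about_me values; None models a user with an empty profile.
profiles = [
    "Studied CS at the Massachusetts Institute of Technology.",
    "Georgia Institute of Technology alum.",
    "PhD student at Carnegie Mellon University",
    "Graduate of the National University of Singapore",
    None,
]

numusers = Counter()
for about_me in profiles:
    if about_me is None:
        continue  # a NULL about_me never satisfies the WHERE clause
    for name in colleges:
        if re.search(name, about_me):  # substring-style match, like the join
            numusers[name] += 1

print(dict(numusers))
```

In this toy data, neither profile that matches "Institute of Technology" refers to an institution with exactly that name.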
The result of this query is again small enough to load directly into Google Sheets for interactive filtering and charting. Thus, the ability to query Sheets data from BigQuery using SQL is particularly useful for joining a small, human-editable dataset (in Google Sheets) with large enterprise datasets (in BigQuery).
SQL Queries on Data in Cloud Bigtable
Cloud Bigtable is a fully managed NoSQL database service that scales up to petabytes of data. Cloud Bigtable is meant to be used in situations for which some combination of low latency (on the order of milliseconds), high throughput (millions of operations per second), replication for high availability, and seamless scalability (from gigabytes to petabytes) is desired. Cloud Bigtable, therefore, finds heavy use in finance (trade reconciliation and analytics, payment fraud detection, etc.), Internet of Things (IoT) applications (for centralized storage and processing of real-time sensor data), and advertising (real-time bidding, placement, and behavioral analysis). Although Cloud Bigtable itself is available only on GCP, it supports the open source Apache HBase API, enabling easy migration of workloads in a hybrid cloud environment.
NoSQL Queries based on a row-key prefix
Cloud Bigtable provides high-performance queries that look up rows or sets of rows that match a specific row-key, a row-key prefix, or a range of prefixes. Even though Cloud Bigtable requires an instance, consisting of one or more logical clusters, to be provisioned and available in your project, it uses that cluster only for compute (and not for storage)—the data itself is stored on Colossus, and the nodes themselves need only to know about the location of row-ranges on Colossus. Because the data is not stored on the Cloud Bigtable nodes, it is possible to easily scale the Cloud Bigtable cluster up and down without expensive data migration.
In financial analysis, a common pattern is to store time-series data in Cloud Bigtable as it arrives in real time and support low-latency queries on that data based on the row-key (e.g., all buy orders, if any, for GOOG stock in the past 10 minutes). This allows dashboards that require recent data to provide automatic alerts and actions based on recent activity. Cloud Bigtable also supports being able to quickly obtain a range of data (e.g., all the buy orders for GOOG stock in any given day), a necessity for financial analytics and reporting. Prediction algorithms themselves need to be trained on historical data (e.g., the time-series of ask prices for GOOG over the past five years), and this is possible because machine learning frameworks like TensorFlow can read and write directly from and to Cloud Bigtable. These three workloads (real-time alerting, reporting, and machine learning training) can occur on the same data, with the cluster potentially being scaled up and down with workload spikes due to the separation of compute and storage.
All three workloads in the previous paragraph involve obtaining ask prices for Google stock. Cloud Bigtable will provide efficient retrieval of records if the row-key with which the time-series data is stored is of the form GOOG#buy#20190119-090356.0322234 (that is, the security name and the timestamp). Then the queries of ask prices, whether over the previous 10 minutes or over the past five years, all involve requesting records that fall within a range of prefixes.
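Why this key layout makes such queries cheap can be illustrated with a sorted list in Python. This is a toy model, not the Cloud Bigtable client API: the keys are invented, and the prefix_scan and range_scan helpers are hypothetical. Cloud Bigtable orders rows by the bytes of the key, which matches Python string ordering for ASCII keys like these.

```python
from bisect import bisect_left


def row_key(symbol: str, side: str, timestamp: str) -> str:
    """Build a key of the form SYMBOL#side#YYYYMMDD-HHMMSS.fraction."""
    return f"{symbol}#{side}#{timestamp}"


# Rows are kept sorted by key, so related records sit next to each other.
keys = sorted([
    row_key("GOOG", "buy", "20190118-155959.1000000"),
    row_key("GOOG", "buy", "20190119-090356.0322234"),
    row_key("GOOG", "buy", "20190119-090412.5000000"),
    row_key("GOOG", "sell", "20190119-090400.0000000"),
    row_key("MSFT", "buy", "20190119-090357.0000000"),
])


def range_scan(sorted_keys, start, end):
    """Return keys in [start, end) using two binary searches."""
    return sorted_keys[bisect_left(sorted_keys, start):bisect_left(sorted_keys, end)]


def prefix_scan(sorted_keys, prefix):
    """Return every key that begins with the prefix."""
    return range_scan(sorted_keys, prefix, prefix + "\uffff")


one_day = prefix_scan(keys, "GOOG#buy#20190119")
few_minutes = range_scan(keys, "GOOG#buy#20190119-0900", "GOOG#buy#20190119-0904")
print(one_day, few_minutes)
```

Both lookups touch only a contiguous slice of the sorted keys, whether the slice covers minutes or years.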
What if, though, we desire to perform ad hoc analytics over all of the Cloud Bigtable data, and our query is not of a form that will result in retrieving only a subset of records—what if, in other words, our query does not filter based on the row-key prefix? Then the NoSQL paradigm of Cloud Bigtable falls down, and it is better to resort to the ad hoc SQL querying capabilities offered by BigQuery instead, with the understanding that BigQuery results will be subject to higher latency.
Ad hoc SQL queries on Cloud Bigtable data
Just as BigQuery can directly query files in certain formats (CSV, Avro, etc.) in Google Cloud Storage by treating it as an external data source, BigQuery can directly query data in Cloud Bigtable. Just as with data in Cloud Storage, data in Cloud Bigtable can be queried using either a permanent table or a temporary table. A permanent table can be shared by sharing the dataset that it is part of; a temporary table is valid only for the duration of a query and so cannot be shared.
A table in Cloud Bigtable is mapped to a table in BigQuery. In this section, we use a time-series of point-of-sale data to illustrate. To follow along, run the script setup_data.sh in the GitHub repository for this book to create a Cloud Bigtable instance populated with some example data. Because the setup script creates a Cloud Bigtable instance with a cluster, remember to delete the instance when you are done.
We begin by using the BigQuery UI to create an external table in BigQuery to point to the data in Cloud Bigtable, as shown in Figure 4-10. The location is a string of the form https://googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE_NAME]. The PROJECT_ID, INSTANCE_ID, and TABLE_NAME refer to the project, instance, and table in Cloud Bigtable.23
Data in Cloud Bigtable consists of records, each of which has a row-key and data tied to the row-key that is organized into column families, which are key/value pairs, where the key is the name of the column family and the value is a set of related columns.
Cloud Bigtable does not require every record to have every column family and every column allowed in a column family; in fact, the presence or absence of a specific column can itself be considered data. Therefore, BigQuery allows you to create a table that is tied to data in Cloud Bigtable without explicitly specifying any column names. If you do that, BigQuery exposes the values in a column family as an array of columns and each column as an array of values written at different timestamps.
In many cases, the column names are known beforehand, and if that is the case, it is better to supply the known columns in the table definition. In our case, we know the schema of each record in the logs-table of Cloud Bigtable:
- A row-key, which is the store ID followed by the timestamp of each transaction
- A column family named “sales” to capture sales transactions at the register
- Within the sales column family, we capture:
  - The item ID (a string)
  - The price at which the item was sold (a floating-point number)
  - The number of items bought in this transaction (an integer)
Notice from Figure 4-10 that we have specified all of this information in the Column Families section of the table definition.
Cloud Bigtable treats all data simply as byte strings, so the types in the schema (string, float, integer) are meant more for BigQuery, so that we can avoid the need to cast the values each time in our queries. Avoiding the cast is also the reason why we ask for the row-key to be treated as a string. When the BigQuery table is created, each of the columns in Cloud Bigtable is mapped to a column in BigQuery of the appropriate type:
Field name | Type | Mode |
---|---|---|
sales.price | RECORD | NULLABLE |
sales.price.cell | RECORD | NULLABLE |
sales.price.cell.timestamp | TIMESTAMP | NULLABLE |
sales.price.cell.value | FLOAT | NULLABLE |
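The same table definition can also be written down as JSON, for example to keep it under version control or to pass it to bq mk through the --external_table_definition flag. The sketch below builds such a definition in Python using the field names of the BigQuery REST API's external data configuration. The project ID is a placeholder, and setting onlyReadLatest is our assumption about how the table was configured, not something the figure confirms.

```python
import json

# External table definition for the Cloud Bigtable source described above.
# Field names follow the BigQuery REST API's ExternalDataConfiguration
# resource; the project ID in the URI is a placeholder.
table_def = {
    "sourceFormat": "BIGTABLE",
    "sourceUris": [
        "https://googleapis.com/bigtable/projects/my-project"
        "/instances/bqbook-instance/tables/logs-table"
    ],
    "bigtableOptions": {
        # Treat the row-key as a string rather than as raw bytes.
        "readRowkeyAsString": True,
        "columnFamilies": [
            {
                "familyId": "sales",
                # Assumption: expose only the most recent cell per column.
                "onlyReadLatest": True,
                "columns": [
                    {"qualifierString": "itemid", "type": "STRING"},
                    {"qualifierString": "price", "type": "FLOAT"},
                    {"qualifierString": "qty", "type": "INTEGER"},
                ],
            }
        ],
    },
}

print(json.dumps(table_def, indent=2))
```

Declaring the types here is what lets queries use the values directly, without a cast from bytes in every query.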
With the BigQuery table in place, it is now possible to issue a good, old-fashioned SQL query to aggregate the total number of items with itemid 12345 that have been sold:
SELECT SUM(sales.qty.cell.value) AS num_sold
FROM ch04.logs
WHERE sales.itemid.cell.value = '12345'
Improving performance
When we issue a federated query on data held in Google Cloud Storage, the work is carried out by BigQuery workers. On the other hand, when we issue a federated query on data held in Cloud Bigtable, the work is carried out on the Cloud Bigtable cluster. The performance of the second query is, therefore, limited by the capacity of the Cloud Bigtable cluster and the load on it at the time that the query is being submitted.
As with any analytics query, the overall query speed also depends on the number of rows that need to be read and the size of the data being read. BigQuery does try to limit the amount of data that needs to be read by reading only the column families referenced in the query, and Cloud Bigtable will split the data across nodes to take advantage of the distribution of row-key prefixes across the full dataset.
Note
If you have data that has a high update frequency or you need low-latency point lookups, Cloud Bigtable will provide the best performance for queries that can filter on a range of row-key prefixes. It can be tempting to think of BigQuery as providing an end run around Cloud Bigtable performance by supporting ad hoc point lookups of Cloud Bigtable data that aren’t limited by row-keys. However, this pattern often gives disappointing performance, and you should benchmark it on your workload before deciding on a production architecture.
BigQuery stores data in a column-oriented order, which is optimized for table scans, whereas Cloud Bigtable stores data in a row-major order, which is optimized for small reads and writes. Queries of external data stored in Cloud Bigtable do not provide the benefits of BigQuery’s internal column-based storage and will be performant only if they read a subset of rows, not if they do a full table scan. Hence, you should be careful to ensure that your BigQuery federated queries filter on the Bigtable row-key; otherwise, they will need to read the entire Cloud Bigtable table every time.
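To see why the row-key filter matters, consider a toy model of a row-major store in which rows are kept sorted by key. This is only an illustration in plain Python, not how Cloud Bigtable is implemented, and the key layout (store ID, a # separator, then a date) is an assumption made for the example.

```python
from bisect import bisect_left

# Toy stand-in for a row-major store: rows kept sorted by row-key.
# The key layout (store ID, '#', date) is an assumption for illustration.
rows = sorted(
    (f"store{store:03d}#2019-01-{day:02d}", store * day)
    for store in range(1, 101)
    for day in range(1, 29)
)
keys = [key for key, _ in rows]


def prefix_scan(prefix):
    """Return rows whose key starts with prefix, touching only that key range."""
    start = bisect_left(keys, prefix)
    end = bisect_left(keys, prefix + "\uffff")  # just past the last matching key
    return rows[start:end]


def full_scan(predicate):
    """Return rows matching an arbitrary predicate; must inspect every row."""
    return [row for row in rows if predicate(row)]


by_key = prefix_scan("store042#")
by_value = full_scan(lambda row: row[1] == 42)
print(len(by_key), "rows read by prefix;", len(rows), "rows inspected by full scan")
```

The prefix lookup reads only the contiguous slice of keys for one store, whereas the filter on a value has to visit every row. A federated query that does not filter on the row-key is in the second situation.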
The knob you do have under your control is the number of nodes in your Cloud Bigtable cluster. If you are going to routinely issue SQL queries against your Cloud Bigtable data, monitor the Cloud Bigtable CPU usage and increase the number of Cloud Bigtable nodes if necessary.
As with federated queries over Google Cloud Storage, consider whether it is advantageous to set up an ELT pipeline when performing analytics over data in Cloud Bigtable; that is, consider extracting data from Cloud Bigtable using a federated query and loading it into a BigQuery table for further analysis and transformations. This approach, illustrated in Figure 4-11, allows you to carry out your analytics workload in an environment where you are not at the mercy of the operational load on Cloud Bigtable. Analytics on an internal BigQuery table can be carried out on thousands of machines rather than a much smaller cluster. The analytics queries will, therefore, finish more quickly in BigQuery (assuming that these analytics cannot be achieved using row-key prefixes) than if you use federated queries on an external table. The drawback is, of course, that the extracted data is duplicated in both Cloud Bigtable and BigQuery. Still, storage tends to be inexpensive, and the advantages of scale and speed might be enough compensation.
It is possible to schedule such data ingest into internal BigQuery tables to happen periodically. We look at that in the next section.
Transfers and Exports
So far, we have looked at loading data on a one-off basis and avoiding the movement of data by using federated queries. In this section, we look at turn-key services to transfer data into BigQuery from a variety of sources on a periodic basis.
Data Transfer Service
The BigQuery Data Transfer Service allows you to schedule recurring data loads from a variety of data sources into BigQuery. As with most BigQuery capabilities, you can access the BigQuery Data Transfer Service using the web UI or the command-line tool, or through a REST API. For repeatability, we show you the command-line tool.
After you configure a data transfer, BigQuery will automatically load data on the schedule you specify. However, in case there is a problem with the original data, you can also initiate data backfills to recover from any outages or gaps. This is called refreshing, and you can initiate it from the web UI.
The Data Transfer Service supports loading data from a number of Software as a Service (SaaS) applications, such as Google Ads, Google Play, Amazon Redshift, and YouTube, as well as from Google Cloud Storage. We look at how to set up routine ingest of files that show up in Cloud Storage, noting along the way any differences with data transfer of a SaaS dataset, using YouTube channel reports as a running example.
Data locality
As we discussed earlier in the chapter, BigQuery datasets are created in a specific region (such as asia-northeast1, which is Tokyo) or in a multiregional location (e.g., EU).25 When you set up a data transfer into a dataset, the Data Transfer Service processes and stages data in the same location as the target BigQuery dataset.
If your Cloud Storage bucket is in the same region as your BigQuery dataset, the data transfer does not incur charges. Transferring data between regions (e.g., from a Cloud Storage bucket in one region to a BigQuery dataset in a different region) will incur network charges, whether the transfer happens via loads, exports, or data transfers.
The BigQuery Data Transfer Service needs to be enabled (you can do this from the BigQuery web UI), and you need to have been granted the bigquery.admin role in order to create transfers and write data to the destination dataset.
Setting up destination table
The Data Transfer Service does not have the ability to create a new table, autodetect schema, and so on. Instead, you need to provide a template table that has the desired schema. If you are writing all the data to a column-partitioned table, specify the partitioning column as a TIMESTAMP or DATE column when you create the destination table schema. We cover partitions in detail in Chapter 7.
Here, we illustrate the process on the college scorecard dataset. We have it stored in the US multiregion, so you should create a dataset in the US multiregion if you want to try out the following steps.
In BigQuery, run the following query:
CREATE OR REPLACE TABLE ch04.college_scorecard_dts AS
SELECT * FROM ch04.college_scorecard_gcs
LIMIT 0
This is an example of a DDL statement. It will save the result of the SELECT query (which will have no rows and not incur any charges) as a table named college_scorecard_dts in the ch04 dataset.
Create a transfer job
On the command line, issue the following command to set up a transfer job:
bq mk --transfer_config --data_source=google_cloud_storage \
  --target_dataset=ch04 --display_name ch04_college_scorecard \
  --params='{"data_path_template":"gs://bigquery-oreilly-book/college_*.csv",
  "destination_table_name_template":"college_scorecard_dts",
  "file_format":"CSV",
  "max_bad_records":"10",
  "skip_leading_rows":"1",
  "allow_jagged_rows":"true"}'
This command specifies that the data source is to be Google Cloud Storage (if you’re transferring from YouTube Channel, for example, the data source would be youtube_channel) and that the target dataset is ch04. The display name is used as a human-readable name on various user interfaces to refer to the transfer job.
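Hand-typing JSON inside shell quotes is error-prone, so it can help to generate the --params value. The following sketch rebuilds the same bq mk invocation from a Python dictionary; it only prints the command rather than running it, and the variable names are ours.

```python
import json
import shlex

# Same parameters as in the bq mk command above, expressed as a dict.
params = {
    "data_path_template": "gs://bigquery-oreilly-book/college_*.csv",
    "destination_table_name_template": "college_scorecard_dts",
    "file_format": "CSV",
    "max_bad_records": "10",
    "skip_leading_rows": "1",
    "allow_jagged_rows": "true",
}

argv = [
    "bq", "mk", "--transfer_config",
    "--data_source=google_cloud_storage",
    "--target_dataset=ch04",
    "--display_name", "ch04_college_scorecard",
    "--params=" + json.dumps(params),
]

# Print a copy-pasteable, shell-quoted command line. To run it from Python
# instead, pass argv to subprocess.run(argv, check=True).
command = " ".join(shlex.quote(arg) for arg in argv)
print(command)
```

Because json.dumps produces the JSON and shlex.quote handles the shell quoting, a stray quote or comma in a parameter value cannot silently break the command.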
In the case of YouTube, the destination tables are automatically partitioned on the time of import and named appropriately. However, in the case of Cloud Storage, you will need to explicitly specify this in the destination table name. For example, specifying mytable_{run_time|"%Y%m%d"} as the destination table name template indicates that the table name should start with mytable and have the job runtime appended using the datetime formatting parameters specified.26 A convenient shortcut is mytable_{run_date}. This simply uses the date in the format YYYYMMDD. It is also possible to supply a time offset. For example, to name the table based on the timestamp 45 minutes after the runtime, we could specify the following:
{run_time+45m|"%Y%m%d"}_mytable_{run_time+45m|"%H%M%S"}
For a job that runs at midnight on September 15, 2018, this yields the table name 20180915_mytable_004500.
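To make the template syntax concrete, here is a minimal sketch that expands run_date and run_time placeholders locally. It is our own emulation, not the Data Transfer Service's implementation, and it handles only whole-number offsets in hours, minutes, or seconds. The function name render_table_name is hypothetical.

```python
import re
from datetime import datetime, timedelta

# Matches {run_date} or {run_time[+/-offset]|"format"}.
_PARAM = re.compile(
    r'\{run_date\}|\{run_time(?:([+-])(\d+)([hms]))?\|"([^"]*)"\}'
)
_UNITS = {"h": "hours", "m": "minutes", "s": "seconds"}


def render_table_name(template: str, run_time: datetime) -> str:
    """Expand run_date/run_time placeholders in a table name template."""

    def expand(match):
        sign, amount, unit, fmt = match.groups()
        if fmt is None:  # the {run_date} shortcut
            return run_time.strftime("%Y%m%d")
        when = run_time
        if amount:
            delta = timedelta(**{_UNITS[unit]: int(amount)})
            when = when + delta if sign == "+" else when - delta
        return when.strftime(fmt)

    return _PARAM.sub(expand, template)


midnight = datetime(2018, 9, 15, 0, 0, 0)
for template in (
    'mytable_{run_time|"%Y%m%d"}',
    "mytable_{run_date}",
    '{run_time+45m|"%Y%m%d"}_mytable_{run_time+45m|"%H%M%S"}',
):
    print(template, "->", render_table_name(template, midnight))
```

Note that each placeholder carries its own offset, so a template that offsets the date part but not the time part would mix two different timestamps in one table name.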
The parameters themselves are specific to the data source. In the case of transferring files from Google Cloud Storage, we should specify the following:
- The input data path, with an optional wildcard.
- The destination table name template.
- The file format. The transfer service from Cloud Storage supports all of the data formats that the federated querying capability supports (CSV, JSON, Avro, Parquet, etc.). In the case that the file format is CSV, we can specify CSV-specific options, such as the number of header lines to skip.
The parameters for the YouTube Channel data transfer include the page_id (in YouTube) and table_suffix (in BigQuery).
When you run the bq mk command, as just shown, you will get a URL as part of an OAuth2 workflow; provide the necessary token by signing in via the browser, and the transfer job will be created.
You can also initiate a Data Transfer Service from the web UI. Initiate a transfer and choose the data source, as illustrated in Figure 4-12.
Note that we have not specified a schedule; by default, the job will run every 24 hours, starting “now.” It is possible to edit the schedule of the transfer job from the BigQuery web UI, as demonstrated in Figure 4-13.
The price of data transfers varies by the source. As of this writing, data transfers from YouTube Channel cost $5 per channel per month, whereas data transfers from Cloud Storage incur no charge. However, because the Data Transfer Service uses load jobs to load Cloud Storage data into BigQuery, these transfers are subject to the BigQuery limits on load jobs.
Scheduled queries
BigQuery supports the scheduling of queries to run on a recurring basis and saving the results in BigQuery tables. In particular, you can use a federated query to extract data from an external data source, transform it, and load it into BigQuery. Because such scheduled queries can include DDL and DML statements, it is possible to build sophisticated workflows purely in SQL.
You can open the dialog box to set up a scheduled query by clicking the Schedule Query button in the BigQuery UI, as shown in Figure 4-14.27
Scheduled queries are built on top of the Data Transfer Service, so many of the features are similar. Thus you can specify the destination table using the same parameter settings (e.g., run_date and run_time) as for the Data Transfer Service (see the previous section).
Cross-region dataset copy
BigQuery supports the scheduling of cross-region dataset copies via the Data Transfer Service. In the Data Transfer Service web UI, choose Cross Region Copy as the Source. You will also need to specify the source dataset, that is, the dataset from which tables are to be copied into the destination dataset, as depicted in Figure 4-15.
Because the source and destination datasets are both BigQuery datasets, the initiator needs to have permission to initiate data transfers, list tables in the source dataset, view the source dataset, and edit the destination dataset.
A cross-region copy can also be initiated from bq mk by specifying cross_region_copy as the data source.
Exporting Cloud Logging Logs
Log data from GCP virtual machines (VMs) and services28 can be stored, monitored, and analyzed using Cloud Logging logs. Cloud Logging thus serves as a unified view of all the activity in your GCP account. It is helpful, therefore, to export Cloud Logging and Firebase logs to BigQuery. You can do this by using the command-line interface, a REST API, or the web UI, which is shown in Figure 4-16.
To export all the logs from the BigQuery service, click the Create Export button at the top of the Cloud Logging Logs Viewer and then fill in the following information:
- Select BigQuery and All Logs to view the logs from BigQuery. Do you see your recent activity?
- Provide a sink name, perhaps bq_logs.
- Specify the sink service: BigQuery, because we want to export to BigQuery.
- Specify the sink destination: ch04, the dataset to which we want to export.
Let’s look at the logs generated by running a query. Go to the BigQuery UI and try running a query:
SELECT
  gender, AVG(tripduration / 60) AS avg_trip_duration
FROM
  `bigquery-public-data`.new_york_citibike.citibike_trips
GROUP BY
  gender
HAVING avg_trip_duration > 14
ORDER BY
  avg_trip_duration
In the BigQuery UI, if you now do (change the date appropriately)
SELECT protopayload_auditlog.status.message
FROM ch04.cloudaudit_googleapis_com_data_access_20190128
you will find a list of BigQuery log messages, including a message about reading the results of the preceding query. Depending on your date filter, you should also see the logs corresponding to earlier operations that you carried out.
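Because the exported table name ends in the date, the query has to change from day to day. One way to avoid editing it by hand is to generate it; the helper below is a hypothetical sketch that fills in the date suffix for any given day.

```python
from datetime import date


def audit_log_query(day: date, dataset: str = "ch04") -> str:
    """Build the query above for the data-access audit table of a given day."""
    table = f"cloudaudit_googleapis_com_data_access_{day:%Y%m%d}"
    return (
        "SELECT protopayload_auditlog.status.message\n"
        f"FROM {dataset}.{table}"
    )


print(audit_log_query(date(2019, 1, 28)))
```

Passing date.today() instead of a fixed date produces the query for the current day's export table.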
Note a few things about the export capability:
- The schema and even the table name were set by Cloud Logging. We simply specified the destination dataset.
- The data was updated in near real time. This is an example of a streaming buffer: a BigQuery table updated in real time by Cloud Logging (although the typical latency of BigQuery queries implies that the data you see is a few seconds old).
Using Cloud Dataflow to Read/Write from BigQuery
As we’ve discussed, BigQuery supports federated querying from sources such as Google Sheets. Its Data Transfer Service supports sources such as Google Ads and YouTube. Products such as Cloud Logging (formerly Stackdriver Logging) and Firestore provide the ability to export their data to BigQuery.
What if you are using a product such as MySQL that does not provide an export capability and is not supported by the Data Transfer Service? One option is to use Cloud Dataflow. Cloud Dataflow is a fully managed service on GCP that simplifies the execution of data pipelines built using the open source Apache Beam API. It handles operational details such as performance, scaling, availability, security, and compliance, so that users can focus on programming instead of managing server clusters. You can use Dataflow to transform and enrich data in both streaming (real-time) mode and batch (historical) mode, with the same reusable code across both kinds of pipeline.
Using a Dataflow template to load directly from MySQL
Although you could write your own Cloud Dataflow pipelines (we do that in “Writing a Dataflow job”), Dataflow template pipelines are available on GitHub for many common needs. Looking at the list of available templates, it appears that the Jdbc to BigQuery template might fit our requirements and allow us to transfer data from our MySQL database to BigQuery.
Open the GCP Cloud Console and navigate to the Cloud Dataflow section. Next, select “Create job from template,” choose “Jdbc to BigQuery,” and then fill out the resulting form with information about the source database table in MySQL and the destination table in BigQuery, as illustrated in Figure 4-17.
When you click the “Run job” button, a Dataflow job is launched. It will execute the JDBC query you specified and write the resulting rows to BigQuery.
Writing a Dataflow job
If you have a format for which there is no federated querying, no Data Transfer Service, no export capability, and no prebuilt Dataflow template, you can write your own Dataflow pipeline to load the data into BigQuery.
Even though both federated querying and a Data Transfer Service exist for CSV files on Google Cloud Storage, we will use CSV files to demonstrate what this looks like. The code is written to the Apache Beam API and can be written in Python, Java, or Go. Here, we use Python.
The crux of the code is to extract the input data, transform it by extracting and cleaning up the desired fields, and load it into BigQuery:
import apache_beam as beam

INPATTERNS = 'gs://bigquery-oreilly-book/college_*.csv'
RUNNER = 'DataflowRunner'
# opts, bqtable, and bqdataset are defined elsewhere in the program
with beam.Pipeline(RUNNER, options=opts) as p:
    (p
     | 'read' >> beam.io.ReadFromText(INPATTERNS, skip_header_lines=1)
     | 'parse_csv' >> beam.FlatMap(parse_csv)
     | 'pull_fields' >> beam.FlatMap(pull_fields)
     | 'write_bq' >> beam.io.gcp.bigquery.WriteToBigQuery(
         bqtable, bqdataset, schema=get_output_schema())
    )
In this code, we create a Beam pipeline, specifying that it will be executed by Cloud Dataflow. Other options for the RUNNER include DirectRunner (executed on the local machine) and SparkRunner (executed by Apache Spark on a Hadoop cluster, such as Cloud Dataproc on GCP).
The first step of the pipeline is to read all of the files that match the specified input patterns. These files can be on local disk or on Google Cloud Storage. The data from the text files is streamed line by line to the next step of the pipeline, where the parse_csv method is applied to each line:
def parse_csv(line):
    try:
        # split on commas and pair each value with its column name
        values = line.split(',')
        rowdict = {}
        for colname, value in zip(COLNAMES, values):
            rowdict[colname] = value
        yield rowdict
    except Exception:
        logging.warning('Ignoring line ...')
The parse_csv method splits the line based on commas and converts the values into a dictionary, where the key is the name of the column and the value is the value of the cell.
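Splitting on commas is fine as long as no field contains a comma. If a field may be double-quoted and contain a comma, a variant based on Python's csv module is safer. The following is a sketch of that alternative, not the code used in this chapter; the short COLNAMES list and the sample line are made up for the illustration.

```python
import csv
import logging

# Hypothetical subset of column names; the real COLNAMES list has one entry
# per column of the input file.
COLNAMES = ["INSTNM", "CITY", "ADM_RATE_ALL"]


def parse_csv_quoted(line):
    """Like parse_csv, but honors double-quoted fields that contain commas."""
    try:
        for values in csv.reader([line]):
            yield dict(zip(COLNAMES, values))
    except csv.Error:
        logging.warning("Ignoring unparseable line")


# Made-up sample line with a comma inside a quoted field.
sample = '"Example College, Main Campus",Springfield,0.58'
rows = list(parse_csv_quoted(sample))
naive = sample.split(",")
print(rows)
print(len(naive), "fields from a plain split")
```

The plain split breaks the quoted name into two pieces and shifts every later value into the wrong column, whereas csv.reader keeps the name intact.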
This dictionary is next sent to the method pull_fields, which will extract the data of interest (the INSTNM column and a few numeric fields) and transform it:
def pull_fields(rowdict):
    result = {}
    # required string fields
    for col in 'INSTNM'.split(','):
        if col in rowdict:
            result[col] = rowdict[col]
        else:
            logging.info('Ignoring line missing %s', col)
            return
    # float fields
    for col in \
        'ADM_RATE_ALL,FIRST_GEN,MD_FAMINC,SAT_AVG,MD_EARN_WNE_P10'.split(','):
        try:
            result[col] = float(rowdict[col])
        except Exception:
            # missing or non-numeric values become NULL in BigQuery
            result[col] = None
    yield result
These dictionaries with the extracted fields are streamed into BigQuery row by row. The BigQuery sink (beam.io.gcp.bigquery.WriteToBigQuery) requires the name of the table, the name of the dataset, and an output schema of the following form:
INSTNM:string,ADM_RATE_ALL:FLOAT64,FIRST_GEN:FLOAT64,...
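A schema string of this form can be derived from the same column lists that pull_fields uses, which keeps the two from drifting apart. The sketch below shows one possible get_output_schema; the actual function in the repository is not shown here, so treat this as an assumption, and the names STRING_FIELDS and FLOAT_FIELDS are ours.

```python
STRING_FIELDS = "INSTNM".split(",")
FLOAT_FIELDS = "ADM_RATE_ALL,FIRST_GEN,MD_FAMINC,SAT_AVG,MD_EARN_WNE_P10".split(",")


def get_output_schema():
    """Return a 'name:type,...' schema string for the fields pull_fields emits."""
    parts = [f"{col}:string" for col in STRING_FIELDS]
    parts += [f"{col}:FLOAT64" for col in FLOAT_FIELDS]
    return ",".join(parts)


print(get_output_schema())
```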
The BigQuery table is created if needed, and rows are appended. Other options exist as well, for example, to truncate the table (i.e., to replace it).
Running the Python program29 will launch a Dataflow job that will read the CSV file, parse it line by line, pull necessary fields, and write the transformed data to BigQuery.
Even though we demonstrated the Dataflow program on a batch pipeline (i.e., the input is not unbounded), essentially you can use the same pipeline to parse, transform, and write out records received in a streaming mode (e.g., from Cloud Pub/Sub), as will be the case in many logging and IoT applications. The Dataflow approach thus provides a way to transform data on the fly and load it into BigQuery.
Note that Dataflow uses streaming inserts to load the data into BigQuery, whether you are operating in batch mode or in streaming mode. Streaming inserts offer the advantage that the data shows up in a timely manner, into a streaming buffer, and can be queried even as the data is being written. The disadvantage is that, unlike BigQuery load jobs, streaming inserts are not free. Recall that loading data into BigQuery might be free, but because of performance reasons, there are limits on how many load jobs you can do. Streaming inserts provide a way to avoid the limits and quotas placed on load jobs without sacrificing query performance.
Using the Streaming API directly
We presented Apache Beam on Cloud Dataflow as a way to extract, transform, and load data in BigQuery in streaming mode, but it is not the only data processing framework that is capable of writing to BigQuery. If your team is more familiar with Apache Spark, writing the ETL pipeline in Spark and executing it on a Hadoop cluster (such as Cloud Dataproc on GCP) is a viable alternative to Dataflow. This is because client libraries exist for a variety of different languages, and BigQuery supports a streaming API.
We cover the client library and streaming in greater detail in Chapter 5, but here is a snippet that illustrates how to load data using the Streaming API in Python after you have a client:
# create an array of tuples and insert as data becomes available
rows_to_insert = [
    (u'U. Puerto Rico', 0.18, 0.46, 23000, 1134, 32000),
    (u'Guam U.', 0.43, 0.21, 28000, 1234, 33000)
]
errors = client.insert_rows(table, rows_to_insert)  # API request
As new data becomes available, the insert_rows() method on the BigQuery client is invoked. This method in turn invokes the REST API’s tabledata.insertAll method. The data is held in a streaming buffer by BigQuery and is available immediately for querying, although it can take up to 90 minutes for the data to become available for exporting.
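When rows arrive faster than one at a time, it is common to group them into batches before each call and to collect the errors that come back. The sketch below shows that pattern with the insert call passed in as a function, so that it runs without credentials. The batch size of 500 is an arbitrary illustrative default, the rows are made-up data, and fake_insert is a stand-in for a call such as client.insert_rows(table, chunk), which returns an empty list when every row is accepted.

```python
from itertools import islice


def batches(rows, size):
    """Yield successive lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def stream_rows(insert_fn, rows, batch_size=500):
    """Send rows in batches and collect whatever errors insert_fn reports."""
    failed = []
    for chunk in batches(rows, batch_size):
        failed.extend(insert_fn(chunk))
    return failed


# Stand-in for the real client call; records batch sizes and reports no errors.
sent = []


def fake_insert(chunk):
    sent.append(len(chunk))
    return []


# Made-up rows in the same shape as the tuples above.
rows_to_insert = [(f"College {i}", 0.5, 0.5, 30000, 1100, 35000) for i in range(1200)]
errors = stream_rows(fake_insert, rows_to_insert)
print(sent, errors)
```

In real use, a nonempty errors list should be logged or retried rather than ignored, because rejected rows are not written to the table.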
Moving On-Premises Data
In Chapter 1, we discussed that one of the key factors that makes BigQuery tick is the separation of compute and storage across a petabit-per-second bandwidth network. BigQuery works best on datasets that are within the datacenter and behind the Google Cloud firewall—if BigQuery had to read its data from across the public internet or a slower network connection, it would not be as performant. Therefore, for BigQuery to work well, it is essential that the data be in the cloud.
BigQuery is a highly scalable analytics platform and is the recommended place to store structured data except that meant for real-time, transactional use. So if BigQuery is the place to store all structured data that will be used for data analytics, how do you move your on-premises data into BigQuery?
Data Migration Methods
If you have a good network with fast interconnect speeds to Google Cloud, you could use bq load to load the data into BigQuery. As discussed in this chapter, it is preferable that the data being loaded is already present on Google Cloud Storage. You can use the command-line tool gsutil to copy the data from on-premises to Cloud Storage.
When copying many files, especially large files, to Google Cloud Storage, use the -m option to enable multithreading. Multithreading will allow the gsutil tool to copy files in parallel:
gsutil -m cp /some/dir/myfiles*.csv gs://bucket/some/dir
Because it is likely that data continues to be collected, moving data is often not a one-time process but an ongoing one. One approach to handling this is to launch a Cloud Function to automatically invoke bq load whenever a file shows up on Cloud Storage.30 As the frequency of file arrival increases (and as those files grow smaller), you are better off using Cloud Pub/Sub31 rather than Cloud Storage to store the incoming data as messages that will be processed by a Cloud Dataflow pipeline and streamed directly into BigQuery.
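A Cloud Function for the first approach might look like the following sketch. Rather than shelling out to bq load, it starts an equivalent load job through the Python client library. The destination table name is hypothetical, the function assumes a Cloud Storage "finalize" trigger that supplies the bucket and object name in the event, and it assumes that the destination table already exists with a suitable schema.

```python
def gcs_uri_from_event(event):
    """Return the gs:// URI for a Cloud Storage event, or None if not a CSV."""
    name = event["name"]
    if not name.endswith(".csv"):
        return None
    return f"gs://{event['bucket']}/{name}"


def load_on_arrival(event, context):
    """Background Cloud Function entry point for a file-finalized trigger."""
    uri = gcs_uri_from_event(event)
    if uri is None:
        return
    # Imported lazily so the helper above can be tested without the library.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    # Hypothetical destination; assumes the table already exists with a schema.
    load_job = client.load_table_from_uri(
        uri, "ch04.college_scorecard", job_config=job_config
    )
    load_job.result()  # wait for the load job to finish


print(gcs_uri_from_event({"bucket": "some-bucket", "name": "dir/file.csv"}))
```

Keeping the URI logic in its own small function means it can be checked locally, while the part that talks to BigQuery runs only when the function is deployed.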
These three approaches (gsutil, Cloud Functions, and Cloud Dataflow) are shown in the first three rows of Table 4-2 and work when the network connection is quite good.
What you want to migrate | Recommended migration method |
---|---|
Relatively small files | gsutil -m cp, followed by bq load |
Loading occasional (e.g., once per day) files into BigQuery when they are available | gsutil cp; a Cloud Function invokes bq load |
Loading streaming messages into BigQuery | Post data to Cloud Pub/Sub and then use Cloud Dataflow to stream into BigQuery. Typically, you have to implement the pipeline in Python, Java, Go, etc. Alternatively, use the Streaming API from the client library. This will be covered in more detail in Chapter 5. |
Hive partitions | Migrate Hive workload to Cloud Dataproc; query Hive partitions as an external table |
Petabytes of data or poor network | Transfer Appliance, followed by bq load |
Region to region or from other clouds | Cloud Storage Transfer Service |
Load from a MySQL dump | Open source Dataflow templates that can be configured and run |
Transfer from Google Cloud Storage, Google Ads, Google Play, etc. to BigQuery | BigQuery Data Transfer Service. Set this up in BigQuery. All of the Data Transfer Service functions work similarly. |
Migrate from other data warehouses such as Amazon Redshift, Teradata, etc. | BigQuery Data Transfer Service migrates data and schema. Depending on the source data warehouse, it could use migration agents that reside in the source data warehouse and may support both one-time and incremental transfers. |
Cloud Logging (formerly Stackdriver Logging), Firestore, etc. | These tools provide the capability to export to BigQuery. Set this up in the other tool (Cloud Logging, Firestore, etc.). |
Although data migration using gsutil to stage the data on Cloud Storage and then invoking bq load might be easy to do if you have only a few small datasets, it is more difficult if you have many datasets or if your datasets are large. As data size increases, the incidence of errors also increases. Therefore, migrating large datasets requires paying attention to details: for example, check-summing data at capture and ingest, working with firewalls so that they don’t block transfers or drop packets, avoiding exfiltration of sensitive data, and ensuring that your data is encrypted and protected against loss during and after migration.
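As one example of such attention to detail, a checksum computed on the local file before upload can be compared with the hash that Cloud Storage reports for the uploaded object (for instance, the md5 hash shown by gsutil ls -L, which is base64-encoded). The sketch below computes that value locally. The function name is ours, and the temporary file only stands in for a real data file.

```python
import base64
import hashlib
import os
import tempfile


def md5_base64(path, chunk_size=1 << 20):
    """Return the base64-encoded MD5 digest of a file, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return base64.b64encode(digest.digest()).decode("ascii")


# Stand-in for a real data file so that the sketch is runnable as is.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"INSTNM,SAT_AVG\n")
local_hash = md5_base64(tmp.name)
os.remove(tmp.name)
print(local_hash)
```

Reading in chunks keeps memory use constant even for very large files. A mismatch between the local and remote hashes signals that the file should be copied again.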
Another issue with the gsutil method is that your business is quite likely to be unable to dedicate bandwidth to data transfers: dedicated bandwidth is often too expensive, and large transfers over the shared corporate network will disrupt the routine operations that depend on it.
For cases in which it is not possible to copy data to Google Cloud because of data size or network limitations, consider using the Transfer Appliance. This is a rackable, high-capacity storage server that is shipped to you; you fill it up and then ship it back to Google Cloud or one of its authorized partners. The Transfer Appliance is best used for lots of data (hundreds of terabytes to petabytes) for which your network situation won’t meet transfer demands.
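A quick back-of-the-envelope calculation shows when the network stops being practical. The sketch below estimates transfer time from data size and link speed; the sizes and the 1 Gbps link are illustrative inputs, and the utilization parameter is our own addition to account for a link that is shared with other traffic.

```python
def transfer_days(num_bytes, bits_per_second, utilization=1.0):
    """Days needed to move num_bytes over a link at the given utilization."""
    seconds = (num_bytes * 8) / (bits_per_second * utilization)
    return seconds / 86_400


PB = 10**15    # one petabyte, in bytes
GBPS = 10**9   # one gigabit per second

for label, size in [("100 TB", PB // 10), ("1 PB", PB)]:
    print(f"{label} at 1 Gbps: {transfer_days(size, GBPS):.1f} days")
```

Even with the link fully dedicated to the transfer, a petabyte takes on the order of three months at 1 Gbps, which is why shipping a storage appliance can be the faster option.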
If your data is held not on-premises but in another public cloud (such as in an Amazon Web Services Simple Storage Service bucket), you can use the Cloud Storage Transfer Service to migrate the data. Common use cases include running an application on Amazon Web Services but analyzing its log data in BigQuery. The Cloud Storage Transfer Service is also a great way to transfer large amounts of data between regions at Google.
The BigQuery Data Transfer Service automates loading data into BigQuery from Google properties like YouTube, Google Ads, and more. Other tools such as Cloud Logging (formerly Stackdriver Logging) and Firestore provide the capability to export to BigQuery.
The BigQuery Data Transfer Service also supports automatic migration of data and schema from other data warehouse products like Amazon Redshift and Teradata. In the case of Teradata, an on-premises migration agent connects to the local data warehouse and copies the data to BigQuery. Both one-time and incremental transfers are supported. Partition-by-partition extraction is also supported. At the time of writing, only data and schema are migrated, not ETL pipelines and stored procedures, although there are partner tools that can carry out automatic SQL translation and data warehouse virtualization. See the documentation for details of what is supported for each source data warehouse.
Although you can carry out data migration yourself, it is unlikely to be something that your IT department has much experience with given that migration is often just a one-time task. It might be advantageous to use a GCP authorized partner32 to carry out the data migration.
Summary
The bq command-line tool provides a single point of entry to interact with the BigQuery service on GCP. After your data is on Google Cloud Storage, you can do a one-time load of the data using the bq load utility. It supports schema autodetection, but it can also use a specific schema that you supply. Depending on whether your load job is CPU-bound or I/O-bound, it might be advantageous to either compress the data or leave it uncompressed.
It is possible to leave the data in place, specify the structure of the data, and use BigQuery as just the query engine. These are called external datasets, and queries over external datasets are called federated queries. Use federated queries for exploratory work, or where the primary use of the data is in the external format (e.g., low-latency queries in Cloud Bigtable or interactive work in Sheets). EXTERNAL_QUERY provides the ability to do real-time joins against MySQL and Postgres databases without any data movement. For large, relatively stable, well-understood datasets that will be updated periodically and queried often, BigQuery native storage is a better choice. Federated queries are also useful in an Extract, Load, and Transform (ELT) workflow for which the data is not yet well understood.
It is possible to set up a scheduled transfer of data from a variety of platforms into BigQuery. Other tools also support mechanisms to export their data into BigQuery. For routine loading of data, consider using Cloud Functions; for ongoing, streaming loads, use Cloud Dataflow. It is also possible to schedule queries (including federated queries) to run periodically and have these queries load data into tables.
1 Six to eight changes every decade—see https://oreil.ly/Merow.
2 See https://abc7ny.com/news/border-of-north-and-south-carolina-shifted-on-january-1st/1678605/ and https://www.nytimes.com/2014/08/24/opinion/sunday/how-the-carolinas-fixed-their-blurred-lines.html.
3 This is set through a drop-down box in the GCP Cloud Console, or when you last did a gcloud init. Typically, a project corresponds to a workload or to a small team.
4 For an updated list, see https://cloud.google.com/bigquery/docs/locations.
5 The autodetect algorithm continues to handle more and more corner cases, and so this might not happen for you. In general, though, schema autodetection will never be perfect. Regardless of the details of what aspect of the schema is not correctly captured, our larger point is this: use the autodetected schema as a starting point and build on top of it, as we do in this section.
6 It is possible for an integer column to be nullable, but the file is encoding NULL values in a nonstandard way. BigQuery is interpreting the text NULL as a string, which is why the load fails.
7 The NULL string in the file represents a lack of data for that field, and this is what a NULL value in our BigQuery table should mean as well.
8 As we’ve noted in earlier chapters, we believe all mentions of price to be correct as of the writing of this book, but please do refer to the relevant policy and pricing sheets (https://cloud.google.com/bigquery/pricing), as these are subject to change.
9 As of this writing, this capability does not exist in the “new” UI; you must access it through the bq command-line tool.
10 Strings are sorted lexically. If stored as a string, “100” would be less than “20” for the same reason that “abc” comes before “de” when the two strings are sorted. When sorted numerically, 20 is less than 100, as you would expect.
11 The file contains D/M/YYYY, whereas the standard format for a date is YYYY-MM-DD (which matches ISO 8601). Although autodetect can look at multiple rows and infer whether 12/11/1965 is the 12th of November or the 11th of December, we don’t want the schema-based BigQuery load making any such assumptions. The transformation pipeline that we build later in this chapter converts the dates into the standard format. For now, let’s just treat it as a string.
12 Newline-delimited JSON often goes by the name of jsonl, or “JSON lines format.”
13 See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html. In Chapter 6, we discuss Capacitor, BigQuery’s backend storage format, which is the successor to ColumnIO.
14 Try it out by running the load_*.sh scripts in the 04_load directory of the GitHub repository for this book.
15 This particular file includes a “byte order mark” (\ufeff) as its first character, so we remove the first three bytes using cut: cut -b 4-.
16 The complete script is called load_external_gcs.sh and is located in the GitHub repository for this book.
17 For the grammar of a JSONPath, see https://restfulapi.net/json-jsonpath/.
18 These partners include Alooma, Informatica, and Talend. For a full, and current, list of BigQuery partners, visit https://cloud.google.com/bigquery/partners/.
19 As of this writing, there are size restrictions on the BigQuery table.
20 Due to continuing changes and improvements in the products, the graphs you see might be different.
21 This query will be slow because we are doing a regular expression match and doing so 77 billion times.
22 Most likely, the rows include data from multiple colleges, such as National University of Singapore, National University of Ireland, Massachusetts Institute of Technology, Georgia Institute of Technology, and so on.
23 If you followed along by running the setup_data.sh file in the GitHub repository, the project_id will be your unique project ID, the instance_id will be bqbook-instance, and the table_name will be logs-table.
24 As of this writing, this capability is available only in the “old” UI at https://bigquery.cloud.google.com/ and not in the “new” UI that is part of the GCP Cloud Console (https://console.cloud.google.com/bigquery).
25 See https://cloud.google.com/bigquery/docs/locations for BigQuery dataset locations and https://cloud.google.com/storage/docs/bucket-locations for Cloud Storage locations.
26 For a list of available formatting options, see the BigQuery docs for formatting datetime columns.
27 As of this writing, this is available only in the “classic UI.”
28 Also from VMs and services running in Amazon Web Services.
29 See 04_load/dataflow.ipynb in the book’s GitHub repository.
30 We cover how to do so programmatically in Chapter 5.
31 A message bus service—see https://cloud.google.com/pubsub/.
32 See https://cloud.google.com/bigquery/providers/. As this book was being written, GCP announced its intent to acquire Alooma, a provider of cloud migration services—see https://cloud.google.com/blog/topics/inside-google-cloud/google-announces-intent-to-acquire-alooma-to-simplify-cloud-migration.