Chapter 4. Streaming Data: Publication and Ingest with Pub/Sub and Dataflow

In Chapter 3, we developed a dashboard to explain a contingency table–based model of suggesting whether to cancel a meeting. However, the dashboard that we built lacked immediacy because it was not tied to users’ context. Because users need to be able to view a dashboard and see the information that is relevant to them at that point, we need to build a real-time dashboard with location cues.

How would we add context to our dashboard? We’d have to show maps of delays in real time. To do that, we’ll need locations of the airports, and we’ll need real-time data. Airport locations can be obtained from the US Bureau of Transportation Statistics (BTS; the same US government agency from which we obtained our historical flight data). Real-time flight data, however, is a commercial product. If we were to build a business out of predicting flight arrivals, we’d purchase that data feed. For the purposes of this book, however, let’s just simulate it.

Simulating the creation of a real-time feed from historical data has the advantage of allowing us to see both sides of a streaming pipeline (production as well as consumption). In the following section, we look at how we could stream data into the database if we were to receive it in real time.

All of the code snippets in this chapter are available in the folder 04_streaming of the book’s GitHub repository. See the README.md file in that directory for instructions on how to do the steps described in this chapter.

Designing the Event Feed

Let’s assume that we wish to create an event feed, not with all 100 fields in the raw BTS dataset, but with only the few fields that we selected in Chapter 3 as being relevant to the flight delay prediction problem (see Figure 4-1).

Figure 4-1. In Chapter 3, we created a view in BigQuery with the fields relevant to the flight delay prediction problem. In this chapter, we will simulate a real-time stream of this information.

To simulate a real-time stream of the flight information shown in Figure 4-1, we can begin by using the historical data in the flights view in BigQuery but will need to transform it further. What kinds of transformations are needed?

Transformations Needed

Note that FL_DATE is a DATE while DEP_TIME is a STRING. This is because FL_DATE is of the form 2015-07-03 for July 3, 2015, whereas DEP_TIME is of the form 1406 for 2:06 p.m. local time. This is unfortunate. I’m not worried about the separation of date and time into two columns—we can remedy that. What’s unfortunate is that there is no time zone offset associated with the departure time. Thus, in this dataset, a departure time of 1406 in different rows can refer to different times depending on the time zone of the origin airport.

The time zone offsets (there are two, one for the origin airport and another for the destination) are not present in the data. Because the offset depends on the airport location, we need to find a dataset that contains the time zone offset of each airport and then mash this data with that dataset.1 To simplify downstream analysis, we will then put all the times in the data in a common time zone—Coordinated Universal Time (UTC) is the traditional choice of common time zone for datasets. We cannot, however, get rid of the local time—we will need the local time in order to carry out analysis, such as the typical delay associated with morning flights versus evening flights. So, although we will convert the local times to UTC, we will also store the time zone offset (e.g., −3,600 seconds) to retrieve the local time if necessary.

Therefore, we are going to carry out two transformations to the original dataset. First, we will convert all the time fields in the raw dataset to UTC. Second, in addition to the fields present in the raw data, we will add three fields to the dataset for the origin airport and the same three fields for the destination airport: the latitude, longitude, and time zone offset. These fields will be named:

DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET

The third transformation that we will need to carry out is that, for every row in the historical dataset, we will need to publish multiple events. Waiting until the aircraft has arrived to send out a single event containing all the row data would be too late, and sending that complete row at departure time would violate causality constraints—the event would contain fields (such as the arrival delay) that cannot be known at that point. Instead, we will need to send out events corresponding to each state the flight is in. Let’s choose to send out five events for each flight: when the flight is first scheduled, when the flight departs the gate, when the flight lifts off, when the flight lands, and when the flight arrives. These five events cannot have all the same data associated with them because the knowability of the columns changes during the flight. For example, when sending out an event at the departure time, we will not know the arrival time. For simplicity, we can use the same message structure for all five events, but we will need to ensure that unknowable data is marked by a null and not populated with the actual data value.

Architecture

Table 4-1 lists when those events can be sent out and the fields that will be included in each event.

Table 4-1. Fields that will be included in each of the five events that will be published. Compare the order of the fields with those in the schema in Figure 4-1.
Event Sent at (UTC) Fields included in event message
Scheduled CRS_DEP_TIME minus 7 days FL_DATE, UNIQUE_CARRIER, ORIGIN_AIRPORT_SEQ_ID, ORIGIN, DEST_AIRPORT_SEQ_ID, DEST, CRS_DEP_TIME [nulls], CRS_ARR_TIME [nulls], DISTANCE
Departed DEP_TIME All fields available in scheduled message, plus:
  • DEP_TIME, DEP_DELAY, CANCELLED
  • CANCELLATION_CODE
  • DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
Wheelsoff WHEELS_OFF All fields available in departed message, plus: TAXI_OUT and WHEELS_OFF
Wheelson WHEELS_ON All fields available in wheelsoff message, plus:
  • WHEELS_ON
  • DIVERTED
  • ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET
Arrived ARR_TIME All fields available in wheelson message, plus: ARR_TIME and ARR_DELAY

We will carry out the transformations needed and then store the transformed data in a database so that it is ready for the event simulation code to use. Figure 4-2 shows the steps we are about to carry out in our extract-transform-load (ETL) pipeline and the subsequent steps to simulate an event stream from these events, and then create a real-time dashboard from the simulated event stream.

Figure 4-2. Steps in our ETL (extract-transform-load) pipeline to (a) transform the raw data into events, (b) simulate the event stream, and (c) process the event stream to populate a real-time dashboard.

Getting Airport Information

In order to do the time correction, we need to obtain the latitude and longitude of each airport. The BTS has a dataset that contains this information, which we can use to do the lookup. For convenience, I’ve downloaded the data and made it publicly available at gs://data-science-on-gcp/edition2/raw/airports.csv.

Let’s examine the data to determine how to get the latitude and longitude of the airports. In Chapter 2, when I needed to explore the flights data to create the first delays model, I loaded the data into BigQuery.

Do we have to import all the data that is shared with us into our BigQuery dataset in order to do exploration? Of course not. We can query BigQuery datasets in other projects without having to make our own copies of the data. In the FROM clause of the BigQuery query, all that we have to do is to specify the name of the project that the dataset lives in:

SELECT 
  airline,
  AVG(departure_delay) AS avg_dep_delay
 FROM `bigquery-samples.airline_ontime_data.flights`
 GROUP BY airline
 ORDER BY avg_dep_delay DESC

What if someone shares a comma-separated values (CSV) file with us, though? Do we have to load the data into BigQuery in order to see what’s in the file? No.

BigQuery allows us to query data in Cloud Storage through its federated query capability—the ability of BigQuery to query data that is not stored within the data warehouse itself but instead resides in external sources such as Google Sheets (a spreadsheet on Google Drive) or files on Cloud Storage. Thus, we could leave the files as CSV on Cloud Storage, define a table structure on them, and query the CSV files directly. Recall that we suggested using Cloud Storage if your primary analysis pattern involves working with your data at the level of flat files—this is a way of occasionally applying SQL queries to such datasets.

The first step is to get the schema of these files. Let’s look at the first line:

gsutil cat gs://data-science-on-gcp/edition2/raw/airports.csv | head -1

We get:

"AIRPORT_SEQ_ID","AIRPORT_ID","AIRPORT","DISPLAY_AIRPORT_NAME",
"DISPLAY_AIRPORT_CITY_NAME_FULL","AIRPORT_WAC_SEQ_ID2","AIRPORT_WAC",
"AIRPORT_COUNTRY_NAME","AIRPORT_COUNTRY_CODE_ISO","AIRPORT_STATE_NAME",
"AIRPORT_STATE_CODE","AIRPORT_STATE_FIPS","CITY_MARKET_SEQ_ID","CITY_MARKET_ID",
"DISPLAY_CITY_MARKET_NAME_FULL","CITY_MARKET_WAC_SEQ_ID2","CITY_MARKET_WAC",
"LAT_DEGREES","LAT_HEMISPHERE","LAT_MINUTES","LAT_SECONDS","LATITUDE",
"LON_DEGREES","LON_HEMISPHERE","LON_MINUTES","LON_SECONDS","LONGITUDE",
"UTC_LOCAL_TIME_VARIATION","AIRPORT_START_DATE","AIRPORT_THRU_DATE",
"AIRPORT_IS_CLOSED","AIRPORT_IS_LATEST"

Use this header to write a BigQuery schema string of the format (specify STRING for any column you are not sure about, since you can always CAST it to the appropriate format when querying the data):

AIRPORT_SEQ_ID:INTEGER,AIRPORT_ID:STRING,AIRPORT:STRING, ...

Alternatively, if you have a similar dataset lying around, start from its schema and edit it:

bq show --format=prettyjson dsongcp.sometable > starter.json

Once we have the schema of the GCS files, we can make a table definition for the federated source:2

bq mk --external_table_definition=\
./airport_schema.json@CSV=gs://data-science-on-gcp/edition2/raw/airports.csv \
dsongcp.airports_gcs

If you visit the BigQuery web console now, you should see a new table listed in the dsongcp dataset (reload the page if necessary). This is a federated data source in that its storage remains the CSV file on Cloud Storage. Yet you can query it just like any other BigQuery table:

SELECT 
AIRPORT_SEQ_ID, AIRPORT_ID, AIRPORT, DISPLAY_AIRPORT_NAME,
LAT_DEGREES, LAT_HEMISPHERE, LAT_MINUTES, LAT_SECONDS, LATITUDE
FROM dsongcp.airports_gcs
WHERE DISPLAY_AIRPORT_NAME LIKE '%Seattle%'

In the preceding query, I am trying to determine which airport column and which latitude column in the file I need to use. The result indicates that AIRPORT and LATITUDE are the columns of interest, but that there are several rows corresponding to the airport SEA:

Row AIRPORT_​SEQ_​ID AIRPORT_​ID AIRPORT DISPLAY_​AIRPORT_​NAME LAT_​DEGREES LAT_​HEMISPHERE LAT_​MINUTES LAT_​SECONDS LATITUDE
1 1247701 12477 JFB Seattle 1st National Bank Helipad 47 N 36 25 47.60694444
2 1474701 14747 SEA Seattle International 47 N 26 50 47.44722222
3 1474702 14747 SEA Seattle/Tacoma International 47 N 26 57 47.44916667
4 1474703 14747 SEA Seattle/Tacoma International 47 N 27 0 47.45

Fortunately, there is a column that indicates which row is the latest information, so what I need to do is:

SELECT 
  AIRPORT, LATITUDE, LONGITUDE
FROM dsongcp.airports_gcs
WHERE AIRPORT_IS_LATEST = 1 AND AIRPORT = 'DFW'

Don’t get carried away by federated queries, though. The most appropriate uses of federated sources involve frequently changing, relatively small datasets that need to be joined with large datasets in BigQuery native tables. Because the columnar storage in BigQuery is so fundamental to its performance, we will load most data into BigQuery’s native format.

Sharing Data

Now that we have airports.csv in Cloud Storage and the airports table in BigQuery, it is quite likely that our colleagues will want to use this data too. Let’s share it with them—one of the benefits of bringing your data to the cloud (and more specifically into a data warehouse) is that it allows the mashing of datasets across organizational boundaries. So, unless you have a clear reason not to do so, such as a security precaution, try to make your data widely accessible.

Costs of querying are borne by the person submitting the query to the BigQuery engine, so you don’t need to worry that you are incurring additional costs for your division by doing this. It is possible to make a GCS bucket “requester-pays” to get the same sort of billing separation for data in Cloud Storage.
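
For example, here is a minimal sketch of turning on Requester Pays with the Cloud Storage Python client (the bucket name is a placeholder; this snippet is illustrative rather than part of the chapter’s pipeline code):

# Sketch: enable Requester Pays on a bucket using google-cloud-storage.
from google.cloud import storage

BUCKET = 'your-bucket-name'    # placeholder

client = storage.Client()
bucket = client.get_bucket(BUCKET)
bucket.requester_pays = True   # whoever issues requests pays for them
bucket.patch()                 # persist the change to the bucket's metadata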

Sharing a Cloud Storage dataset

To share some data in Cloud Storage, use gsutil:

gsutil -m acl ch -r -u abc@xyz.com:R gs://$BUCKET/data

In the preceding command, the -m indicates multithreaded mode, the -r provides access recursively starting with the top-level directory specified, and the -u indicates that this is a user being granted read (:R) access.

We could provide read access to the entire organization or a Google Group using -g:

gsutil -m acl ch -r -g xyz.com:R gs://$BUCKET/data

Sharing a BigQuery dataset

BigQuery sharing can happen at the granularity of a column, a table, or a dataset. None of our BigQuery tables holds personally identifiable or confidential information, so there is no compelling access-control reason to restrict the sharing of flight information at a column or table level. Therefore, we can share the dsongcp dataset that was created in Chapter 2 and make everyone in the organization working on this project a bigquery.user so that they can carry out queries on this dataset. You can do this from the dataset menu in the BigQuery web console.
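
If you would rather script this than click through the console, something along the following lines with the BigQuery Python client should work (a sketch; the group address is a placeholder, and this grants dataset-level read access rather than the project-level bigquery.user role):

# Sketch: grant READER access on the dsongcp dataset to a group.
from google.cloud import bigquery

client = bigquery.Client()
dataset = client.get_dataset('dsongcp')

entries = list(dataset.access_entries)
entries.append(bigquery.AccessEntry(role='READER',
                                    entity_type='groupByEmail',
                                    entity_id='flights-analysts@xyz.com'))
dataset.access_entries = entries
client.update_dataset(dataset, ['access_entries'])   # push the updated access list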

In some cases, you might find that your dataset or table contains certain columns that have personally identifying or confidential information. You might need to restrict access to those columns while leaving the remainder of the table accessible to a wider audience.3 Whenever you need to provide access to a subset of a table in BigQuery (whether it is specific columns or specific rows), you can use views. Put the table itself in a dataset that is accessible to a very small set of users. Then, create a view on this table that will pull out the relevant columns and rows and save this view in a separate dataset that has wider accessibility. Your users will query only this view, and because the personally identifying or confidential information is not even present in the view, the chances of inadvertent leakage are lowered.

Another way to restrict access at the level of a BigQuery table is to use Cloud IAM. To control access at the level of a column, you’d use policy tags and Data Catalog.

Dataplex and Analytics Hub

Once you get into the habit of sharing data widely, governance can become problematic. It is better if you can administer data across Cloud Storage and BigQuery in a consistent manner, track lineage, and so on. That’s what Dataplex is for.

It can be rather cumbersome to share tables and datasets one at a time with one user or one group at a time. To implement sharing at scale and get statistics on how people are using the data you have shared, use Analytics Hub.

Time Correction

Correcting times reported in local time to UTC is not a simple endeavor. There are several steps:

  • Local time depends on, well, the location. The flight data that we have records only the name of the airport (e.g., ALB for Albany). We, therefore, need to obtain the latitude and longitude given an airport code. The BTS has a dataset that contains this information, which we can use to do the lookup.

  • Given a latitude/longitude pair, we need to look up the time zone from a map of global time zones. For example, given the latitude and longitude of the airport in Albany, we would need to get back America/New_York. There are several web services that do this, but the Python package timezonefinder is a more efficient option because it works completely offline. The drawback is that this package does not handle oceanic areas and some historical time zone changes,4 but that’s a trade-off that we can make for now.

  • The time zone offset (from Greenwich Mean Time [GMT/UTC]) at a location changes during the year due to daylight savings corrections. In New York, for example, the offset is four hours behind UTC in summer and five hours behind in winter. Given the time zone (America/New_York), therefore, we also need the local departure date and time (say Jan. 13, 2015, 2:08 p.m.) in order to find the corresponding time zone offset. The Python package pytz provides this capability using the IANA (Olson) time zone database that it bundles.

The problem of ambiguous times still remains—every instant between 01:00 and 02:00 local time occurs twice on the day that the clock switches from daylight savings time (summer time) to standard time (winter time). So, if our dataset has a flight arriving at 01:30, we need to make a choice of what time that represents. In a real-world situation, you would look at the typical duration of the flight and choose the one that is more likely. For the purposes of this book, I’ll always assume the winter time (i.e., is_dst is False) on the dubious grounds that it is the standard time zone for that location.
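
To see the ambiguity concretely, here is a small, standalone illustration with pytz (the date is the 2015 US fall-back date; the snippet is not part of the pipeline):

import datetime
import pytz

eastern = pytz.timezone('America/New_York')
ambiguous = datetime.datetime(2015, 11, 1, 1, 30)   # clocks fell back at 02:00 that day
print(eastern.localize(ambiguous, is_dst=True))     # 2015-11-01 01:30:00-04:00 (EDT)
print(eastern.localize(ambiguous, is_dst=False))    # 2015-11-01 01:30:00-05:00 (EST)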

The complexity of these steps should, I hope, convince you to follow best practices when storing time.

Apache Beam/Cloud Dataflow

The canonical way to build data pipelines on Google Cloud Platform is to use Cloud Dataflow. Cloud Dataflow is an externalization of technologies called Flume and MillWheel that have been in widespread use at Google for several years. It employs a programming model that handles both batch and streaming data in a uniform manner, thus providing the ability to use the same codebase both for batch and continuous stream processing. The code itself is written in Apache Beam, either in Java, Python, or Go,5 and it is portable in the sense that it can be executed on multiple execution environments, including Apache Flink and Apache Spark. On GCP, Cloud Dataflow provides a fully managed (serverless) service that is capable of executing Beam pipelines. Resources are allocated on demand, and they autoscale so as to achieve both minimal latency and high resource utilization.

Beam programming involves building a pipeline (a series of data transformations) that is submitted to a runner. The runner will build a graph and then stream data through it. Each input dataset comes from a source and each output dataset is sent to a sink. Figure 4-3 illustrates the Beam pipeline that we are about to build.

Compare the steps in Figure 4-2 with the block diagram of the ETL (extract-transform-load) pipeline in Figure 4-3. Let’s build the data pipeline piece by piece.

Figure 4-3. The Dataflow pipeline that we are about to build.

Parsing Airports Data

You can download information about the location of airports from the BTS website. I selected all of the fields, downloaded the CSV file to my local hard drive, extracted it, and compressed it with gzip. The gzipped airports file is available in the GitHub repository for this book.

In order to use Apache Beam from Cloud Shell, we need to install it into our Python environment. Go ahead and install the time zone packages also at this time:6

virtualenv ~/beam_env
source ~/beam_env/bin/activate
python3 -m pip install --upgrade \
             timezonefinder pytz \
             'apache-beam[gcp]'

The Read transform in the Beam pipeline that follows reads in the airports file line by line:7

import csv
import apache_beam as beam

with beam.Pipeline('DirectRunner') as pipeline:
   airports = (pipeline
      | beam.io.ReadFromText('airports.csv.gz')
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
   )

For example, suppose that one of the input lines read out of the text file source is the following:

1000401,10004,"04A","Lik Mining Camp","Lik, AK",101,1,"United
States","US","Alaska","AK","02",3000401,30004,"Lik,
AK",101,1,68,"N",5,0,68.08333333,163,"W",10,0,-163.16666667,"",2007-07-01,,0,1,

The first Map takes this line and passes it to a CSV reader that parses it (taking into account fields like Lik, AK that have commas in them) and pulls out the fields as a list of strings. These fields are then passed to the next transform. The second Map takes the fields as input and outputs a tuple of the form (the extracted fields are shown in bold in the previous example):

(1000401, (68.08333333,-163.16666667))

The first number is the unique airport code (we use this, rather than the airport’s three-letter code, because airport locations can change over time), and the next two numbers are the latitude/longitude pair for the airport’s location. The variable airports, which is the result of these three transformations, is not a simple in-memory list of these tuples. Instead, it is an immutable, potentially distributed collection, termed a PCollection, that does not need to fit into memory.

We can write the contents of the PCollection to a text file to verify that the pipeline is behaving correctly:

(airports
   | beam.Map(lambda airport_data: '{},{}'.format(
         airport_data[0], ','.join(airport_data[1])))
   | beam.io.WriteToText('extracted_airports')
)

Try this out: the code, in 04_streaming/transform/df01.py, is just a Python program that you can run from the command line. First, install the Apache Beam package if you haven’t yet done so and then run the program df01.py while you are in the directory containing the GitHub repository of this book:

cd 04_streaming/transform
./install_packages.sh
python3 ./df01.py

This runs the code in df01.py locally. Later, we will change the pipeline line to:

with beam.Pipeline('DataflowRunner') as pipeline:

and get to run the pipeline on the Google Cloud Platform using the Cloud Dataflow service. With that change, simply running the Python program launches the data pipeline on multiple workers in the cloud. As with many distributed systems, the output of Cloud Dataflow is potentially sharded to one or more files. You will get a file whose name begins with “extracted_airports” (mine was extracted_airports-00000-of-00001), a few of whose lines might look something like this:

1000101,58.10944444,-152.90666667
1000301,65.54805556,-161.07166667

The columns are AIRPORT_SEQ_ID, LATITUDE, and LONGITUDE—the order of the rows you get depends on which of the parallel workers finished first, so it could be different.

Adding Time Zone Information

Let’s now change the code to determine the time zone corresponding to a latitude/longitude pair. In our pipeline, rather than simply emitting the latitude/longitude pair, we emit a list of three items: latitude, longitude, and time zone:

airports = (pipeline
      | beam.io.ReadFromText('airports.csv.gz')
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
   )

The lambda keyword in Python sets up an anonymous function. In the case of the first use of lambda in the preceding snippet, that method takes one parameter (line) and returns the stuff following the colon. We can determine the time zone by using the timezonefinder package:8

def addtimezone(lat, lon):
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      lat = float(lat)
      lon = float(lon)
      return (lat, lon, tf.timezone_at(lng=lon, lat=lat))

The location of the import statement in the preceding example might look strange (most Python imports tend to be at the top of the file), but this import-within-the-function pattern is recommended by Cloud Dataflow so that,9 when we submit it to the cloud, pickling of the main session doesn’t end up pickling imported packages also.10

For now, though, we are going to run this (df02.py) locally. This will take a while because the time zone computation involves a large number of polygon intersection checks and because we are running locally, not (yet!) distributed in the cloud. So, let’s speed it up by adding a filter to reduce the number of airport locations we have to look up:

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line 
                              and line[-2:] == '1,')

The BTS flight delay data is only for US domestic flights, so we don’t need the time zones of airports outside the United States. The reason for the second check is that airport locations change over time, but we are interested only in the current location of the airport. For example, here are the airport locations for ORD (or Chicago):

1393001,...,"ORD",...,41.97805556,...,-87.90611111,...,1950-01-01,2011-06-30,0,0,
1393002,...,"ORD",...,41.98166667,...,-87.90666667,...,2011-07-01,2013-09-30,0,0,
1393003,...,"ORD",...,41.97944444,...,-87.90750000,...,2013-10-01,2015-09-30,0,0,
1393004,...,"ORD",...,41.97722222,...,-87.90805556,...,2015-10-01,,0,1,

The first row captures the location of Chicago’s airport between 1950 and June 30, 2011.11 The second row is valid from July 1, 2011, to September 30, 2013. The last row, however, is the current location and this is marked by the last column (the AIRPORT_IS_LATEST field) being 1.

That’s not the only line we are interested in, however! Flights before 2015-10-01 will report the ID of the second to last row. We could add a check for this, but this looks rather dicey for a slight bit of optimization. So, I’ll remove that last check, so that we have only:

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line)

Once I do this and run df02.py, the extracted information for the airports looks like this:

1672301,62.03611111,-151.45222222,America/Anchorage
1672401,43.87722222,-73.41305556,America/New_York
1672501,40.75722222,-119.21277778,America/Los_Angeles

The last column in the extracted information has the time zone, which was determined from the latitude and longitude of each airport.

Converting Times to UTC

Now that we have the time zone for each airport, we are ready to tackle converting the times in the flights data to UTC. At the time that we are developing the program, we’d prefer not to process all the months we have in BigQuery—waiting for the query each time we run the program will be annoying. Instead, we will create a small sample of the flights data in BigQuery against which to develop our code:12

SELECT *
FROM dsongcp.flights
WHERE RAND() < 0.001

This returns about 6,000 rows. We can use the BigQuery web UI to save these results as a JavaScript Object Notation (JSON) file. However, I prefer to script things out:13

bq query --destination_table dsongcp.flights_sample \
   --replace --nouse_legacy_sql \
   'SELECT * FROM dsongcp.flights WHERE RAND() < 0.001'

bq extract --destination_format=NEWLINE_DELIMITED_JSON \
   dsongcp.flights_sample  \
   gs://${BUCKET}/flights/ch4/flights_sample.json

gsutil cp gs://${BUCKET}/flights/ch4/flights_sample.json .

This creates a local file named flights_sample.json, a row of which looks similar to this:

{"FL_DATE":"2015-04-28","UNIQUE_CARRIER":"EV","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"1600","DEP_TIME":"1555","DEP_DELAY":-5,"TAXI_OUT":7,
"WHEELS_OFF":"1602","WHEELS_ON":"1747","TAXI_IN":4,"CRS_ARR_TIME":"1809",
"ARR_TIME":"1751","ARR_DELAY":-18,"CANCELLED":false,"DIVERTED":false,
"DISTANCE":"692.00"}

Reading the flights data starts out similar to reading the airports data:14

flights = (pipeline
 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
)

This is the same code as when we read the airports.csv.gz file, except that I am also giving a name (flights:read) to this transform step and using a JSON parser instead of a CSV parser. Note the syntax here:

 | 'name-of-step' >> transform_function()

The next step, though, is different because it involves two PCollections. We need to join the flights data with the airports data to find the time zone corresponding to each flight. To do that, we make the airports PCollection a “side input.” Side inputs in Beam are like views into the original PCollection, and are either lists or dicts (dictionaries). In this case, we will create a dict that maps airport ID to information about the airports:

flights = (pipeline
 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
 | 'flights:tzcorr' >> beam.FlatMap(tz_correct,
                                    beam.pvalue.AsDict(airports))
)

Because the side input has to be materialized as a Python list or dict, side inputs must be small enough to fit into memory. If you need to join two large PCollections that will not fit into memory, use a CoGroupByKey.
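
For reference, a CoGroupByKey-based join would look roughly like the following sketch—we don’t need it here because the airports lookup fits comfortably in memory, and the field names are simply the ones used elsewhere in this chapter:

# Sketch: joining two large PCollections without a side input.
keyed_flights = flights | beam.Map(lambda f: (f['ORIGIN_AIRPORT_SEQ_ID'], f))
keyed_airports = airports   # already (airport_id, (lat, lon, timezone)) tuples

joined = (
   {'flight': keyed_flights, 'airport': keyed_airports}
   | beam.CoGroupByKey()   # yields (key, {'flight': [...], 'airport': [...]})
   | beam.FlatMap(lambda kv: [(flight, airport)
                              for flight in kv[1]['flight']
                              for airport in kv[1]['airport']])
)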

The FlatMap() method calls out to a method tz_correct(), which takes the parsed content of a line from flights_sample.json (containing a single flight’s information) and a Python dictionary (containing all the airports’ time zone information):

def tz_correct(fields, airport_timezones):
   try:
      # convert all times to UTC
      # ORIGIN_AIRPORT_SEQ_ID is the name of JSON attribute
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      # airport_id is the key to airport_timezones dict
      # and the value is a tuple (lat, lon, timezone)
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + 
                        " because airport is not known")

Why FlatMap() instead of Map to call tz_correct()? A Map is a 1-to-1 relation between input and output, whereas a FlatMap() can return 0–N outputs per input. The way it does this is with a Python generator function (i.e., the yield keyword—think of the yield as a return that returns one item at a time until there is no more data to return). Using FlatMap here allows us to ignore any flight information corresponding to unknown airports—even though this doesn’t happen in the historical data we are processing, a little bit of defensive programming doesn’t hurt.
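
To make the distinction concrete, here is a tiny, self-contained example (not part of our pipeline):

import apache_beam as beam

with beam.Pipeline('DirectRunner') as pipeline:
   nums = pipeline | beam.Create([1, 2, 3, 4])
   # Map: exactly one output element per input element
   doubled = nums | 'double' >> beam.Map(lambda x: x * 2)                        # 2, 4, 6, 8
   # FlatMap: zero or more output elements per input element
   evens = nums | 'evens' >> beam.FlatMap(lambda x: [x] if x % 2 == 0 else [])   # 2, 4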

The tz_correct() code gets the departure airport ID from the flight’s data and then looks up the time zone for that airport ID from the airport’s data. After it has the time zone, it calls out to the method as_utc() to convert each of the datetimes reported in that airport’s time zone to UTC:

def as_utc(date, hhmm, tzone):
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'),
                                  is_dst=False)
         loc_dt += datetime.timedelta(hours=int(hhmm[:2]),
                                      minutes=int(hhmm[2:]))
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
      else:
         return '' # empty string corresponds to canceled flights
   except ValueError as e:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise e

As before, you can run this locally. To do that, run df03.py. A line that originally (in the raw data) looked like:

{"FL_DATE":"2015-11-05","UNIQUE_CARRIER":"DL","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"0600","DEP_TIME":"0556","DEP_DELAY":-4,"TAXI_OUT":12,
"WHEELS_OFF":"0608","WHEELS_ON":"0749","TAXI_IN":10,"CRS_ARR_TIME":"0818",
"ARR_TIME":"0759","ARR_DELAY":-19,"CANCELLED":false,
"DIVERTED":false,"DISTANCE":"692.00"}

now becomes:

{"FL_DATE": "2015-11-05", "UNIQUE_CARRIER": "DL", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-11-05 11:00:00", "DEP_TIME": "2015-11-05 10:56:00", 
"DEP_DELAY": -4, "TAXI_OUT": 12, "WHEELS_OFF": "2015-11-05 11:08:00", 
"WHEELS_ON": "2015-11-05 12:49:00", "TAXI_IN": 10, 
"CRS_ARR_TIME": "2015-11-05 13:18:00", "ARR_TIME": "2015-11-05 12:59:00", 
"ARR_DELAY": -19, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00"}

All the times have been converted to UTC. For example, the 0759 time of arrival in Atlanta has been converted to UTC to become 12:59:00.

Correcting Dates

Look carefully at the following line involving a flight from Honolulu (HNL) to Dallas–Fort Worth (DFW). Do you notice anything odd?

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-06 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-06 12:54:00", "ARR_TIME": "2015-03-06 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00"}

Examine the departure time in Honolulu and the arrival time in Dallas—the flight is arriving the day before it departed! That’s because the flight date (2015-03-06) is the date of departure in local time. Add in a time difference between airports, and it is quite possible that it is not the date of arrival. We’ll look for these situations and add 24 hours if necessary. This is, of course, quite a hack (have I already mentioned that times ought to be stored in UTC?!):

def add_24h_if_before(arrtime, deptime):
   import datetime
   if len(arrtime) > 0 and len(deptime) > 0 and arrtime < deptime:
      adt = datetime.datetime.strptime(arrtime, '%Y-%m-%d %H:%M:%S')
      adt += datetime.timedelta(hours=24)
      return adt.strftime('%Y-%m-%d %H:%M:%S')
   else:
      return arrtime

The 24-hour hack is called just before the yield in tz_correct.15 Now that we have new data about the airports, it is probably wise to add it to our dataset. Also, as remarked earlier, we want to keep track of the time zone offset from UTC because some types of analysis might require knowledge of the local time. Thus, the new tz_correct code becomes the following:

def tz_correct(line, airport_timezones):
   fields = json.loads(line)
   try:
      # convert all times to UTC
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f], deptz = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f], arrtz = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = add_24h_if_before(fields[f], fields["DEP_TIME"])

      fields["DEP_AIRPORT_TZOFFSET"] = deptz
      fields["ARR_AIRPORT_TZOFFSET"] = arrtz
      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + " because airport is not known")

When I run df04.py, which has these changes applied to it, the flight from Honolulu to Dallas becomes:

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-07 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-07 12:54:00", "ARR_TIME": "2015-03-07 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00", 
"DEP_AIRPORT_TZOFFSET": -36000.0, "ARR_AIRPORT_TZOFFSET": -21600.0}

As you can see, the dates have now been corrected (see the bolded parts).

Creating Events

After we have our time-corrected data, we can move on to creating events to publish into Pub/Sub. For now, we’ll limit ourselves to just the departed and arrived messages—we can rerun the pipeline to create the additional events if and when our modeling efforts begin to use other events:

def get_next_event(fields):
    if len(fields["DEP_TIME"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "departed"
       event["EVENT_TIME"] = fields["DEP_TIME"]
       for f in ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", 
                 "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
          event.pop(f, None)  # not knowable at departure time
       yield event
    if len(fields["ARR_TIME"]) > 0:
       event = dict(fields)
       event["EVENT_TYPE"] = "arrived"
       event["EVENT_TIME"] = fields["ARR_TIME"]
       yield event

Essentially, we pick up the departure time and create a departed event at that time after making sure to remove the fields (such as arrival delay) we cannot know at the departure time. Similarly, we use the arrival time to create an arrived event, as shown in Figure 4-4.

In the pipeline, the event creation code is called on the flights PCollection after the conversion to UTC has happened:

flights = (pipeline
  |'flights:read' >> beam.io.ReadFromText('flights_sample.json')
  |'flights:tzcorr' >> beam.FlatMap(tz_correct,
                           beam.pvalue.AsDict(airports))
)
events = flights | beam.FlatMap(get_next_event)

Figure 4-4. Events, when they are published, and some of the fields present in those events.

If we now run the pipeline,16 we will see two events for each flight:

{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "CRS_ARR_TIME": "2015-04-28 22:09:00", "CANCELLED": false, 
"DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, 
"ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", 
"EVENT_TIME": "2015-04-28 19:55:00"}
{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "TAXI_OUT": 7, "WHEELS_OFF": "2015-04-28 20:02:00", 
"WHEELS_ON": "2015-04-28 21:47:00", "TAXI_IN": 4, 
"CRS_ARR_TIME": "2015-04-28 22:09:00", "ARR_TIME": "2015-04-28 21:51:00", 
"ARR_DELAY": -18, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00",
"DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, 
"EVENT_TYPE": "arrived", "EVENT_TIME": "2015-04-28 21:51:00"}

The first event is a departed event and is to be published at the departure time, while the second event is an arrived event and is to be published at the arrival time. The departed event has a number of missing fields corresponding to data that is not known at that time.

Once we have this code working, let’s add a third event that will be sent when the plane takes off:

    if len(fields["WHEELS_OFF"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "wheelsoff"
       event["EVENT_TIME"] = fields["WHEELS_OFF"]
       for f in ["WHEELS_ON", "TAXI_IN", 
                 "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
           event.pop(f, None)  # not knowable at wheels-off time
       yield event

At this point, we haven’t created the scheduled and wheelson events yet.

Reading and Writing to the Cloud

So far, we have been reading and writing local files. However, once we start to run our code in production, in a serverless environment, the concept of a local drive no longer makes sense. We have to read and write from Cloud Storage. Also, because this is structured data, it is preferable to read and write to BigQuery—recall that we loaded our full dataset into BigQuery in Chapter 2. Now, we’d like to put the transformed (time-corrected) data there as well.

Fortunately, all this involves is changing the source or the sink. The rest of the pipeline stays the same. For example, in the previous section (see 04_streaming/transform/df05.py), we read the airports.csv.gz as:

| 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')

Now, in order to read the equivalent file from Cloud Storage, we change the corresponding code in 04_streaming/transform/df06.py to be:

airports_filename = 'gs://{}/flights/airports/airports.csv.gz'.format(
                          bucket)
...
| 'airports:read' >> beam.io.ReadFromText(airports_filename)

Of course, we’ll have to make sure to upload the file to Cloud Storage and make it readable by whoever is going to run this code. Having the data file be available in our GitHub repository was not going to scale anyway—Cloud Storage (or BigQuery) is the right place for data.

In df05.py, I had to read a local file that contained the JSON export of a small part of the dataset and use a JSON parser to obtain a dict:

 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))

In df06.py, the corresponding code becomes simpler because the BigQuery reader returns a dict where the column names of the result set are the keys:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

Of course when we run it for real, we’ll change the query to remove the sampling (rand() < 0.001) so that we can process the entire dataset.

Similarly, where before we wrote to a local file using:

  | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
  | 'flights:out' >> beam.io.textio.WriteToText('all_flights')

we’ll change the code to write to Cloud Storage using:

 flights_output = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)
... 
 | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
 | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_output)

We can write the same data to a BigQuery table also:

flights_schema = \
     'FL_DATE:date,UNIQUE_CARRIER:string,...CANCELLED:boolean'
...
 | 'flights:bqout' >> beam.io.WriteToBigQuery(
     'dsongcp.flights_tzcorr', schema=flights_schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

Note that we need to provide a schema when writing to BigQuery, and specify what to do if the table already exists (we ask for the table to be truncated and contents replaced) and if the table doesn’t already exist (we ask for the table to be created).

We can try running this code, but the pipeline will require a few extra parameters. So where we used to have:

  with beam.Pipeline('DirectRunner') as pipeline:

we now need:

  argv = [
      '--project={0}'.format(project),
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--runner=DirectRunner'
   ]
   with beam.Pipeline(argv=argv) as pipeline:

The reason is that when we read from BigQuery, we are providing a query:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

So, we need to provide the project that needs to be billed. In addition, and this is an implementation detail, some temporary data needs to be staged and cached in Cloud Storage, and we need to provide the pipeline a place to store this temporary data—we will never be sure which operations will require staging or caching, so it’s a good idea to always specify a scratch location in Cloud Storage for this purpose.

We can run df06.py and then check that new tables are created in BigQuery. So far, we have been running the code locally, either on a laptop or in Cloud Shell.

Next, let’s look at how to run this in Cloud Dataflow, which is the GCP managed service for running Apache Beam pipelines.

Running the Pipeline in the Cloud

That last run took a few minutes on the local virtual machine (VM), and we were processing only a few thousand rows! Let’s change the code (see df07.py) to process all the rows in the BigQuery view:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights', 
      use_standard_sql=True)

Now that we have much more data, we need to distribute the work, and to do that, we will change the runner from DirectRunner (which runs locally) to DataflowRunner (which lobs the job off to the cloud and scales it out):

   argv = [
      '--project={0}'.format(project),
      '--job_name=ch04timecorr',
      '--save_main_session',
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--setup_file=./setup.py',
      '--max_num_workers=8',
      '--region={}'.format(region),
      '--runner=DataflowRunner'
   ]

   pipeline = beam.Pipeline(argv=argv)

Notice that there are a few extra parameters now:

  • The job name provides the name by which this job will be listed in the GCP console. This is so that we can troubleshoot the job if necessary.

  • We ask the Dataflow submission code to save our main session. This is needed whenever we have global variables in our Python program.

  • The file setup.py should list the Python packages that we needed to install (timezonefinder and pytz) as we went along—Cloud Dataflow will need to install these packages on the Compute Engine instances that it launches behind the scenes (a minimal setup.py sketch appears after this list):

    REQUIRED_PACKAGES = [
        'timezonefinder',
        'pytz'
    ]
  • By default, Dataflow autoscales the number of workers based on throughput—the more lines we have in our input data files, the more workers we need. This is called Horizontal Autoscaling. To turn off autoscaling, we can specify --autoscaling_algorithm=NONE, and to constrain it somewhat, we can specify the maximum number of workers.

  • We specify the region in which the Dataflow pipeline needs to run.

  • The runner is no longer DirectRunner (which runs locally). It is now DataflowRunner.
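
In case you haven’t written one before, a minimal setup.py along these lines is enough (a sketch; the package name and version are arbitrary):

# Minimal setup.py so that Dataflow workers install our extra dependencies.
import setuptools

REQUIRED_PACKAGES = [
    'timezonefinder',
    'pytz'
]

setuptools.setup(
    name='tzcorrect',            # placeholder name
    version='0.0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages()
)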

Running the Python program submits the job to the cloud. Cloud Dataflow autoscales each step of the pipeline based on throughput, and streams the events data into BigQuery (see Figure 4-3). You can monitor the running job on the Cloud Platform Console in the Cloud Dataflow section.

Even as the events data is being written out, we can query it by browsing to the BigQuery console and typing the following:

SELECT
  ORIGIN,
  DEP_TIME,
  DEST,
  ARR_TIME,
  ARR_DELAY,
  EVENT_TIME,
  EVENT_TYPE
FROM
  dsongcp.flights_simevents
WHERE
  (DEP_DELAY > 15 and ORIGIN = 'SEA') or
  (ARR_DELAY > 15 and DEST = 'SEA')
ORDER BY EVENT_TIME ASC
LIMIT
  5

This returns:

Row ORIGIN DEP_TIME DEST ARR_TIME ARR_DELAY EVENT_TIME EVENT_TYPE
1 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:21:00 UTC departed
2 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:38:00 UTC wheelsoff
3 SEA 2015-01-01 08:21:00 UTC IAD 2015-01-01 12:48:00 UTC 22.0 2015-01-01 12:48:00 UTC arrived
4 KOA 2015-01-01 10:11:00 UTC SEA 2015-01-01 15:45:00 UTC 40.0 2015-01-01 15:45:00 UTC arrived
5 SEA 2015-01-01 16:43:00 UTC PSP null null 2015-01-01 16:43:00 UTC departed

As expected, we see three events for the SEA-IAD flight, one at departure, the next at wheelsoff, and the third at arrival. The arrival delay is known only at arrival.

BigQuery is a columnar database, so a query that selects all fields:

SELECT
  *
FROM
  dsongcp.flights_simevents
ORDER BY EVENT_TIME ASC

will be inefficient. However, we do need all of the event data in order to send out event notifications. Therefore, we traded off storage for speed by adding an extra column called EVENT_DATA to our BigQuery table and populated it in our Dataflow pipeline as follows:

def create_event_row(fields):
   featdict = dict(fields)  # copy
   featdict['EVENT_DATA'] = json.dumps(fields)
   return featdict
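
In the pipeline, this function is applied to each event just before the BigQuery sink, roughly along these lines (a sketch; the schema string is elided, as before):

events_schema = ','.join([flights_schema,
                          'EVENT_TYPE:string,EVENT_TIME:timestamp,EVENT_DATA:string'])
...
(events
   | 'events:totablerow' >> beam.Map(lambda fields: create_event_row(fields))
   | 'events:bqout' >> beam.io.WriteToBigQuery(
         'dsongcp.flights_simevents', schema=events_schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)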

Then, our query to pull the events could simply be as follows:

SELECT
  EVENT_TYPE,
  EVENT_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('2015-05-01 00:00:00 UTC')
  AND EVENT_TIME < TIMESTAMP('2015-05-03 00:00:00 UTC')
ORDER BY
  EVENT_TIME ASC
LIMIT
  2

The result looks like this:

Row EVENT_TYPE EVENT_TIME EVENT_DATA
1 wheelsoff 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1330303", "DEST": "MIA", "CRS_DEP_TIME": "2015-04-30T23:29:00", "DEP_TIME": "2015-04-30T23:35:00", "DEP_DELAY": 6.0, "TAXI_OUT": 25.0, "WHEELS_OFF": "2015-05-01T00:00:00", "CRS_ARR_TIME": "2015-05-01T02:53:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "wheelsoff", "EVENT_TIME": "2015-05-01T00:00:00"}
2 departed 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1320402", "DEST": "MCO", "CRS_DEP_TIME": "2015-04-30T23:55:00", "DEP_TIME": "2015-05-01T00:00:00", "DEP_DELAY": 5.0, "CRS_ARR_TIME": "2015-05-01T02:45:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", "EVENT_TIME": "2015-05-01T00:00:00"}

This table will serve as the source of our events; it is from such a query that we will simulate streaming flight data.

Publishing an Event Stream to Cloud Pub/Sub

Now that we have the source events from the raw flight data, we are ready to simulate the stream. Streaming data in Google Cloud Platform is typically published to Cloud Pub/Sub, a serverless real-time messaging service. Cloud Pub/Sub provides reliable delivery and can scale to more than a million messages per second. Unless you are using Cloud Pub/Sub Lite (which is a single-zone service that is built for low-cost operation), Pub/Sub stores copies of messages in multiple zones to provide “at least once” guaranteed delivery to subscribers, and there can be many simultaneous subscribers.

Our simulator will read from the events table in BigQuery (populated in the previous section) and publish messages to Cloud Pub/Sub. Essentially, we will walk through the flight event records, getting the notification time from each, and simulate publishing those events as they happen.

Speed-Up Factor

However, we’ll also use a mapping between the event notification time (arrival or departure time based on event) and the current system time. Why? Because it is inefficient to always simulate the flight events at real-time speeds. Instead, we might want to run through a day of flight data in an hour (as long as the code that processes these events can handle the increased data rate). At other times, we may be running our event-processing code in a debugging environment that is slower and so we might want to slow down the simulation. I will refer to this ratio between the actual time and simulation time as the speed-up factor—the speed-up factor will be greater than 1 if we want the simulation to be faster than real time and less than 1 if we want it to be slower than real time.

Based on the speed-up factor, we’ll have to do a linear transformation of the event time to system time. If the speed-up factor is 1, a 60-minute difference between the start of the simulation in event time and the current record’s timestamp should be encountered 60 minutes after the start of the simulation. If the speed-up factor is 60, a 60-minute difference in event time translates to a 1-minute difference in system time, and so the record should be published a minute later. If the event time clock is ahead of the system clock, we sleep for the necessary amount of time so as to allow the simulation to catch up.
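
In code, the mapping from event time to wall-clock time boils down to a computation like this self-contained illustration (the variable values are hypothetical; the real simulator applies the same idea in its compute_sleep_secs() function, which we will see shortly):

import datetime

speed_factor = 60   # simulate one hour of event time per minute of wall-clock time
sim_start_time = datetime.datetime(2015, 5, 1, 0, 0, 0)   # event time at start of simulation
notify_time = datetime.datetime(2015, 5, 1, 1, 0, 0)      # event time of the current record
program_start = datetime.datetime.utcnow()                # wall-clock time at start of simulation

sim_time_elapsed = (notify_time - sim_start_time).total_seconds() / speed_factor
real_time_elapsed = (datetime.datetime.utcnow() - program_start).total_seconds()
sleep_secs = sim_time_elapsed - real_time_elapsed   # ~60: publish this record a minute in
print(sleep_secs)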

The simulation consists of four steps (see also Figure 4-5):

  • Run the query to get the set of flight event records to publish.

  • Iterate through the query results.

  • Accumulate events to publish as a batch.

  • Publish accumulated events and sleep as necessary.

Even though this is an ETL pipeline, the need to process records in strict sequential order and to sleep between batches makes it a poor fit for Cloud Dataflow. Instead, we’ll implement it as a pure Python program. The problem with this choice is that the simulation code is not fault-tolerant—if the simulation fails, it will not automatically restart and definitely will not resume from the last successfully notified event.

Figure 4-5. The four steps of simulation.

The simulation code that we are writing is only for quick experimentation with streaming data. Hence, I will not take the extra effort needed to make it fault-tolerant. If we had to do so, we could make the simulation fault-tolerant by starting from a BigQuery query that is bounded in terms of a time range with the start of that time range automatically inferred from the last-notified record in Cloud Pub/Sub. Then, we could launch the simulation script from a Docker container and use Cloud Run or Google Kubernetes Engine to automatically restart the simulation if the simulation code fails.

Get Records to Publish

The BigQuery query is parameterized by the start and end time of the simulation and can be invoked through the Google Cloud API for Python (see 04_streaming/simulate/simulate.py in the GitHub repository):

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('{}')
  AND EVENT_TIME < TIMESTAMP('{}')
ORDER BY
  EVENT_TIME ASC
"""
   rows = bqclient.query(querystr.format(args.startTime,
                                         args.endTime))

This, however, is a bad idea. Do you see why?

It’s because we are getting the start time and end time from the command line of the simulation script and pasting them directly into the BigQuery query string. This leaves us open to SQL injection, which can lead to security problems.17 A better approach is to use parameterized queries—the BigQuery query contains parameters marked as @startTime and so on, and the Python query function takes their definitions via the job configuration parameter:

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= @startTime
  AND EVENT_TIME < @endTime
ORDER BY
  EVENT_TIME ASC
"""
   job_config = bq.QueryJobConfig(
       query_parameters=[
           bq.ScalarQueryParameter("startTime", "TIMESTAMP", args.startTime),
           bq.ScalarQueryParameter("endTime", "TIMESTAMP", args.endTime),
       ]
   )
   rows = bqclient.query(querystr, job_config=job_config)

The query function returns an object (called rows in the preceding snippet) that we can iterate through:

  for row in rows:
    # do something

What do we need to do for each of the rows? We’ll need to iterate through the records, build a batch of events, and publish each batch. Let’s see how each of these steps is done.

How Many Topics?

As we walk through the query results, we need to publish events to Cloud Pub/Sub. We have three choices in terms of the architecture:

  • We could publish all the events to a single topic. However, this can be wasteful of network bandwidth if we have a subscriber that is interested only in the wheelsoff event. Such a subscriber will have to parse the incoming event, decode the EVENT_TYPE field in the JSON, and discard events in which they are not interested.

  • We could publish all the events to a single topic, but add attributes to each message. For example, to publish an event with two attributes event_type and carrier, we’d do:

     publisher.publish(topic, event_data,
                       event_type='departed', carrier='AA')

    Then, the subscriber could ask for server-side filtering based on an attribute or combination of attributes when creating the subscription:

     subscriber.create_subscription(request={..., "filter": 
       "attributes.carrier='AS' AND attributes.event_type='arrived'"})
  • Create a separate topic per event type (i.e., an arrived topic, a departed topic, and a wheelsoff topic).

Option 1 is the simplest, and should be your default choice unless you will have many subscribers that are interested only in subsets of the event stream. If you will have subscribers that are interested in subsets of the event stream, choose between Options 2 and 3.

Option 2 adds software complexity. Option 3 adds infrastructure complexity. I suggest choosing Option 3 when you have only one attribute, and that attribute has only a handful of options. This limits infrastructure complexity while keeping publisher and subscriber code simple. Choose Option 2 when you have many attributes each with many possible values because Option 3 in such a scenario will lead to an explosion in the number of topics.
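
To make the trade-off concrete, here is a minimal sketch of the client-side filtering that Option 1 forces on a subscriber. It is not part of the book’s repository: the subscription name and the process() handler are hypothetical, and the payload is assumed to be the JSON event data described earlier:

import json
from google.cloud import pubsub_v1

def wheelsoff_only(message):
    # With a single topic, every subscriber receives every event and must
    # parse the JSON, inspect EVENT_TYPE, and drop what it does not need.
    event = json.loads(message.data.decode())
    if event.get('EVENT_TYPE') == 'wheelsoff':
        process(event)   # hypothetical downstream handler
    message.ack()        # acknowledge even the discarded events

subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path('my-project', 'all-flight-events')
future = subscriber.subscribe(subscription, callback=wheelsoff_only)
# a long-running program would block on future.result()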

Iterating Through Records

We will choose to have a separate topic per event type (i.e., an arrived topic, a departed topic, and a wheelsoff topic), so we create three topics:18

topics = {}
for event_type in ['wheelsoff', 'arrived', 'departed']:
  topics[event_type] = publisher.topic_path(args.project, event_type)
  try:
    # reuse the topic if it already exists
    publisher.get_topic(topic=topics[event_type])
    logging.info("Already exists: {}".format(topics[event_type]))
  except NotFound:  # from google.api_core.exceptions
    logging.info("Creating {}".format(topics[event_type]))
    publisher.create_topic(name=topics[event_type])

After creating the topics, we call the notify() method passing along the rows read from BigQuery:

# notify about each row in the dataset
programStartTime = datetime.datetime.utcnow()
simStartTime = datetime.datetime.strptime(args.startTime,
                         TIME_FORMAT).replace(tzinfo=pytz.UTC)
notify(publisher, topics, rows, simStartTime, 
       programStartTime, args.speedFactor)

Building a Batch of Events

The notify() method consists of accumulating the rows into batches, publishing a batch, and sleeping until it is time to publish the next batch:

def notify(publisher, topics, rows, simStartTime, programStart, speedFactor):
   # sleep computation
   def compute_sleep_secs(notify_time):
        time_elapsed = (datetime.datetime.utcnow() - 
                        programStart).seconds
        sim_time_elapsed = (notify_time - simStartTime).seconds / speedFactor
        to_sleep_secs = sim_time_elapsed - time_elapsed
        return to_sleep_secs

   tonotify = {}
   for key in topics:
     tonotify[key] = list()

   for row in rows:
       event, notify_time, event_data = row

       # how much time should we sleep?
       if compute_sleep_secs(notify_time) > 1:
          # notify the accumulated tonotify
          publish(publisher, topics, tonotify, notify_time)
          for key in topics:
             tonotify[key] = list()

          # recompute sleep, since notification takes a while
          to_sleep_secs = compute_sleep_secs(notify_time)
          if to_sleep_secs > 0:
             logging.info('Sleeping {} seconds'.format(to_sleep_secs))
             time.sleep(to_sleep_secs)

       tonotify[event].append(event_data)
   # left-over records; notify again
   publish(publisher, topics, tonotify, notify_time)

There are a few points to be made here. First, we work completely in UTC so that the time difference computations make sense. Second, we always compute whether to sleep by looking at the time difference since the start of the simulation. If we simply keep moving a pointer forward, errors in time will accumulate. Finally, note that we publish a batch only when the computed sleep time exceeds one second, so as to give records time to accumulate. If, when you run the program, you do not see any sleep, your speed-up factor is too high for the capability of the machine running the simulation code and the network between that machine and Google Cloud Platform. Slow down the simulation, get a larger machine, or run the simulation from within Google Cloud (such as in Cloud Shell or on a Compute Engine instance).
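
To make the arithmetic concrete, here is a small worked example of the sleep computation (the numbers are illustrative, not from the dataset):

import datetime

# Suppose we are simulating at 60x, the record about to be published is
# timestamped 10 simulated minutes after simStartTime, and 8 wall-clock
# seconds have elapsed since programStart.
speedFactor = 60
sim_time_elapsed = datetime.timedelta(minutes=10).seconds / speedFactor   # 10.0
time_elapsed = datetime.timedelta(seconds=8).seconds                      # 8

to_sleep_secs = sim_time_elapsed - time_elapsed
print(to_sleep_secs)   # 2.0 -> sleep two seconds, then publish the batch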

Publishing a Batch of Events

The notify() method that we saw in the previous code example has accumulated the events in between sleep calls. Even though it appears that we are publishing one event at a time, the publisher actually maintains a separate batch for each topic:

def publish(publisher, topics, allevents, notify_time):
   for key in topics:  # 'departed', 'arrived', etc.
      topic = topics[key]
      events = allevents[key]
      logging.info('Publishing {} {} events'.format(len(events), key))
      for event_data in events:
          # notify_time will be attached as a message attribute later in the chapter
          publisher.publish(topic, event_data.encode())

Note that Cloud Pub/Sub does not guarantee the order in which messages will be delivered, especially if the subscriber lets a huge backlog build up. Out-of-order messages will happen, and downstream subscribers will need to deal with them. Cloud Pub/Sub guarantees “at least once” delivery and will resend the message if the subscriber does not acknowledge a message in time. I will use Cloud Dataflow to ingest from Cloud Pub/Sub, and Cloud Dataflow deals with both these issues (out-of-order and duplication) transparently.

We can try out the simulation by typing the following:

python3 simulate.py --startTime '2015-05-01 00:00:00 UTC' \
      --endTime '2015-05-04 00:00:00 UTC' --speedFactor=60

This will simulate three days of flight data (the end time is exclusive) at 60 times real-time speed and stream the events into three topics on Cloud Pub/Sub.19 Because the simulation starts off from a BigQuery query, it is quite straightforward to limit the simulated events to just a single airport or to airports within a latitude/longitude bounding box.
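
For instance, here is a hedged sketch of how the parameterized query could be narrowed to a single airport. This filter is not in simulate.py; it assumes the ORIGIN and DEST fields are present in the EVENT_DATA JSON, which is how the downstream pipeline reads them:

querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= @startTime
  AND EVENT_TIME < @endTime
  AND (JSON_VALUE(EVENT_DATA, '$.ORIGIN') = 'ATL'
       OR JSON_VALUE(EVENT_DATA, '$.DEST') = 'ATL')
ORDER BY
  EVENT_TIME ASC
"""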

In this section, we looked at how to produce an event stream and publish those events in real time. Throughout this book, we can use this simulator and these topics for experimenting with how to consume streaming data and carry out real-time analytics.

Real-Time Stream Processing

Now that we have a source of streaming data that includes location information, let’s look at how to build a real-time dashboard. Figure 4-6 presents the reference architecture for many solutions on Google Cloud Platform.20

Figure 4-6. Reference architecture for data processing on Google Cloud Platform.

In the previous section, we set up a real-time stream of events into Cloud Pub/Sub that we can aggregate in Cloud Dataflow and write to BigQuery. Data Studio can connect to BigQuery and provide a real-time, interactive dashboard. Let’s get started.

Streaming in Dataflow

When we carried out the time correction of the raw flight data, we were working off the complete flights table in BigQuery, processing its rows in Cloud Dataflow, and writing the events table back into BigQuery. Processing a finite, bounded input dataset is called batch processing.

Here, though, we need to process events in Cloud Pub/Sub that are streaming in. The dataset is unbounded. Processing an unbounded set of data is called stream processing. Fortunately, the code to do stream processing in Apache Beam is identical to the code to do batch processing.

We could simply receive the events from Cloud Pub/Sub similarly to how we read data from a CSV file:21

topic_name = "projects/{}/topics/arrived".format(project)
events = (pipeline
         | 'read' >> beam.io.ReadFromPubSub(topic=topic_name)
         | 'parse' >> beam.Map(lambda s: json.loads(s))
         )

The only change we have to do is to turn on the streaming flag in the Dataflow options:

argv = [
        ...
        '--streaming',
    ]
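
For orientation, here is a minimal sketch of how these options might be wired up; avg01.py handles its arguments slightly differently, and the project ID below is a placeholder:

import json
import apache_beam as beam

project = 'your-project-id'   # placeholder

argv = [
    '--project={}'.format(project),
    '--streaming',
]
pipeline = beam.Pipeline(argv=argv)

topic_name = "projects/{}/topics/arrived".format(project)
events = (pipeline
          | 'read' >> beam.io.ReadFromPubSub(topic=topic_name)
          | 'parse' >> beam.Map(lambda s: json.loads(s))
          )

pipeline.run()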

We can stream the read-in events to BigQuery using code similar to what we used in batch processing:

schema = 'FL_DATE:date,...,EVENT_TYPE:string,EVENT_TIME:timestamp'
(events
         | 'bqout' >> beam.io.WriteToBigQuery(
                    'dsongcp.streaming_events', schema=schema,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
)

In the preceding code, we subscribe to a topic in Cloud Pub/Sub and begin reading from it. As each message streams in, we parse it, convert it into a BigQuery table row, and write it out. Indeed, if this is all we need, we can simply use the Google-provided Dataflow template that goes from Pub/Sub to BigQuery.

But let’s say that we want to read both the arrived events and the departed events and write them to the same BigQuery table. We can do that quite simply in Beam:

events = {}
for event_name in ['arrived', 'departed']:
  topic_name = "projects/{}/topics/{}".format(project, event_name)
  events[event_name] = (pipeline
     | 'read:{}'.format(event_name) >>  
             beam.io.ReadFromPubSub(topic=topic_name)
     | 'parse:{}'.format(event_name) >> beam.Map(
             lambda s: json.loads(s))
  )

all_events = (events['arrived'], events['departed']) | beam.Flatten()

Flattening the two sets of events concatenates them into a single collection. We then write out all_events to BigQuery.
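
The write itself is unchanged from the earlier snippet, just applied to the flattened collection; a sketch, assuming the same schema string as before:

(all_events
         | 'bqout' >> beam.io.WriteToBigQuery(
                    'dsongcp.streaming_events', schema=schema,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
)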

To try this code out, the simulator from the previous section needs to be publishing events to the Cloud Pub/Sub topics. Start it up:

python3 simulate.py --startTime '2015-05-01 00:00:00 UTC' \
      --endTime '2015-05-04 00:00:00 UTC' --speedFactor 30

The simulator will send events from May 1, 2015, to May 3, 2015, at 30 times real-time speed, so that an hour of data is sent to Cloud Pub/Sub in two minutes. You can do this from Cloud Shell or from your local laptop. (If necessary, run install_packages.sh to install the necessary Python packages and gcloud auth application-default login to give the application the necessary credentials to execute queries.)

In another terminal, start avg01.py to read the stream of events and write them out to BigQuery. We can then query the dataset in BigQuery even as the events are streaming in. The BigQuery UI may not even show this streaming table yet, but it can be queried:

SELECT * FROM dsongcp.streaming_events
ORDER BY EVENT_TIME DESC
LIMIT 5

Windowing a Pipeline

Although we could do just a straight data transfer, I’d like to do more. When I populate a real-time dashboard of flight delays, I’d like the information to be aggregated over a reasonable interval—for example, I want a moving average of flight delays and the total number of flights over the past 60 minutes at every airport. So, rather than simply take the input received from Cloud Pub/Sub and stream it out to BigQuery, I’d like to carry out time-windowed analytics on the data as I’m receiving it and write those analytics to BigQuery.22 Cloud Dataflow can help us do this.

While we may be averaging over 60 minutes, how often should we compute this 60-minute average? It might be advantageous, for example, to use a sliding window and compute this 60-minute average every five minutes.

Streaming Aggregation

The key difference between batch aggregation and streaming aggregation is the unbounded nature of the data in stream processing. What does an operation like “max” mean when the data is unbounded? After all, whatever our maximum at this point in time, a large value could come along in the stream at a later point.

A key concept when aggregating streaming data is that of a window that becomes the scope for all aggregations. Here, we apply a time-based sliding window on the pipeline. From now on, all grouping, aggregation, and so on is within that time window and there is a separate maximum, average, etc. in each time window:

stats = (all_events
   | 'byairport' >> beam.Map(by_airport)
   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))
   | 'group' >> beam.GroupByKey()
   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))
)

Let’s walk through the preceding code snippet carefully.

The first thing we do is to take all the events and apply the by_airport transformation to the events:

   | 'byairport' >> beam.Map(by_airport)

What this does is to pull out the origin airport for departed events and destination airport for arrival events:

def by_airport(event):
    if event['EVENT_TYPE'] == 'departed':
        return event['ORIGIN'], event
    else:
        return event['DEST'], event

Next, we apply a sliding window to the event stream. The window is of 60 minutes duration, applied every 5 minutes:

   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))

Then, we apply a GroupByKey:

   | 'group' >> beam.GroupByKey()

What’s the key?

In the by_airport function mentioned previously, we made the airport the key and the entire event object the value. So, the GroupByKey groups events by airport.

But the GroupByKey is not just by airport. Because we have already applied a sliding window, there is a separate group created for each time window. So, each group now consists of 60 minutes of flight events that arrived or departed at a specific airport.

It is on these groups that we call the compute_stats function in the last Map of the snippet:

   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))

The compute_stats function takes the airport and list of events at that airport, and then computes some statistics:

def compute_stats(airport, events):
    arrived = [event['ARR_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'arrived']
    avg_arr_delay = (float(np.mean(arrived))
                     if len(arrived) > 0 else None)

    departed = [event['DEP_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'departed']
    avg_dep_delay = (float(np.mean(departed))
                     if len(departed) > 0 else None)

    num_flights = len(events)
    start_time = min([event['EVENT_TIME'] for event in events])
    latest_time = max([event['EVENT_TIME'] for event in events])

    return {
        'AIRPORT': airport,
        'AVG_ARR_DELAY': avg_arr_delay,
        'AVG_DEP_DELAY': avg_dep_delay,
        'NUM_FLIGHTS': num_flights,
        'START_TIME': start_time,
        'END_TIME': latest_time
    }

In the preceding code, we pull out the arrived events and compute the average arrival delay. Similarly, we compute the average departure delay on the departed events. We also compute the number of flights in the time window at this airport and return all these statistics.

The statistics are then written out to BigQuery using code that should look familiar by now:

stats_schema = ','.join(
    ['AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float',                          
     'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp'])
(stats
    | 'bqout' >> beam.io.WriteToBigQuery(
             'dsongcp.streaming_delays', schema=stats_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
               )
)

As with the previous section, we can start the simulator and then start avg02.py. When I did this, the aggregates were indeed produced every 5 minutes, but the events within each 5-minute period covered a 150-minute range of flight data (because the 30x simulation pushes 150 minutes' worth of data into Pub/Sub every 5 minutes).

The stream processing engine was applying the sliding windows based on the wall-clock time at which the messages were received. We, however, want it to apply the window based on the timestamp within each message.

How do we do that?

Using Event Timestamps

We have to add an attribute at the time we publish the events (in simulate.py):

publisher.publish(topic, event_data.encode(), EventTimeStamp=timestamp)

Then, in our Beam pipeline, when reading from Pub/Sub, we should tell the pipeline to disregard the publish time in Pub/Sub and instead use this attribute of the message as the timestamp:

| 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub(
            topic=topic_name, timestamp_attribute='EventTimeStamp')

With this change, when I run the query:

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

I get rows approximately 5 minutes apart as expected:

Row AIRPORT AVG_ARR_DELAY AVG_DEP_DELAY NUM_FLIGHTS START_TIME END_TIME
1 ATL 35.72222222222222 13.666666666666666 48 2015-05-01 02:24:00 UTC 2015-05-01 03:21:00 UTC
2 ATL 35.25 8.717948717948717 59 2015-05-01 02:15:00 UTC 2015-05-01 03:12:00 UTC
3 ATL 38.666666666666664 9.882352941176471 52 2015-05-01 02:19:00 UTC 2015-05-01 03:12:00 UTC
4 ATL 38.473684210526315 5.916666666666667 55 2015-05-01 02:15:00 UTC 2015-05-01 03:08:00 UTC
5 ATL 35.111111111111114 5.53125 50 2015-05-01 02:15:00 UTC 2015-05-01 03:03:00 UTC

The reported times are not exactly 5 minutes apart because the reported times correspond to the earliest/latest flight in Atlanta within the time window. Note also that the length of the time window is approximately an hour.

It is likely, however, that Cloud Shell or your local laptop will struggle to keep up with the event stream. We need to execute this pipeline on Cloud Dataflow, in a serverless way.

Executing the Stream Processing

To run the Beam pipeline in Cloud Dataflow, all I have to do is to change the runner (see avg03.py in the GitHub repository):

argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
      ...
        '--runner=DataflowRunner'
    ]

Before we start this pipeline, though, it is a good idea to delete the rows already written to the BigQuery table by avg02.py in the previous section. The easiest way to do this is to run the following SQL DML command to truncate the table:

TRUNCATE TABLE dsongcp.streaming_delays

Running avg03.py will launch a Dataflow job. If you now browse to the Cloud Dataflow section of the Cloud Platform console, you will see that a new streaming job has started and that the pipeline looks like the one shown in Figure 4-7.

The pipeline processes flight events as they stream into Pub/Sub, aggregates them into time windows, and streams the resulting statistics into BigQuery.

Figure 4-7. The streaming pipeline to compute delay statistics in real time at each airport.

Analyzing Streaming Data in BigQuery

Two minutes after the launch of your program,23 the first set of data will make it into BigQuery. You can query for the statistics for a specific airport from the BigQuery console using the same query as before:

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

The cool thing is that we can do this querying even as the data is streaming! How would we get the latest data for all airports? We could get all the data for each airport, order it by time, and take the latest:

SELECT 
    AIRPORT,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM dsongcp.streaming_delays d
GROUP BY AIRPORT

The results look something like this:

Row AIRPORT a.AVG_ARR_DELAY a.AVG_DEP_DELAY a.NUM_FLIGHTS a.END_TIME
1 BUR -6.8 -5.666666666666667 8 2015-05-01 03:26:00 UTC
2 HNL 17.11111111111111 -3.7777777777777777 18 2015-05-01 03:46:00 UTC
3 CVG -7.75 null 4 2015-05-01 03:48:00 UTC
4 PHL 5.636363636363637 16.5 13 2015-05-01 03:48:00 UTC
5 IND 40.6 null 5 2015-05-01 03:45:00 UTC

Queries like these on streaming data will be useful when we begin to build our dashboard. For example, the first query will allow us to build a time series chart of delays at a specific airport. The second query will allow us to build a map of average delays across the country.

Real-Time Dashboard

Now that we have streaming data in BigQuery and a way to analyze it as it is streaming in, we can create a dashboard that shows departure and arrival delays in context. Two maps can help explain our contingency table–based model to end users: current arrival delays across the country and current departure delays across the country.

To pull the data to populate these charts, we need to add a BigQuery data source in Data Studio. Although Data Studio supports specifying the query directly in the user interface, it is much better to create a view in BigQuery and use that view as a data source in Data Studio. BigQuery views have a few advantages over queries typed into Data Studio: they tend to be reusable across reports and visualization tools, there is only one place to change if an error is detected, and access privileges (Cloud Identity and Access Management roles) can be granted on the view, which exposes only the columns that the dashboard needs.

Here is the query that I used to create the view:

CREATE OR REPLACE VIEW dsongcp.airport_delays AS
WITH delays AS (
    SELECT d.*, a.LATITUDE, a.LONGITUDE
    FROM dsongcp.streaming_delays d
    JOIN dsongcp.airports a USING(AIRPORT) 
    WHERE a.AIRPORT_IS_LATEST = 1
)
 
SELECT 
    AIRPORT,
    CONCAT(LATITUDE, ',', LONGITUDE) AS LOCATION,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM delays
GROUP BY AIRPORT, LONGITUDE, LATITUDE

This is slightly different from the second query in the previous section in that it also adds the location of the airport by joining against the airports table.

Having saved the view in BigQuery, we can create a data source for the view in Data Studio, just as we did in the previous chapter:

  • Visit https://datastudio.google.com.

  • Create a BigQuery data source, point it to the airport_delays view, and connect to it.

  • Change the type of the location field from Text to Geo | Latitude, Longitude, and then click Create Report.

  • Add a Geo Chart to the report.

  • Specify the location field as the geo dimension (see Figure 4-8).

  • Specify the average departure delay as the metric and United States as the zoom area.

  • Change the style so that the color bar includes all areas.

  • Repeat for the arrival delay.

Figure 4-8. Dashboard of latest flight data from across the United States.

It is worth reflecting on what we did in this section. We processed streaming data in Cloud Dataflow, creating 60-minute moving averages that we streamed into BigQuery. We then created a view in BigQuery that would show the latest data for each airport, even as it was streaming in. We connected that to a dashboard in Data Studio. Every time the dashboard is refreshed, it pulls new data from the view, which in turn dynamically reflects the latest data in BigQuery.

Summary

In this chapter, we discussed how to build a real-time analysis pipeline to carry out streaming analytics and populate real-time dashboards. In this book, we are using a dataset that is not available in real time. Therefore, we simulated the creation of a real-time feed so that I could demonstrate how to build a streaming ingest pipeline. Building the simulation also gives us a handy test tool—no longer do we need to wait for an interesting event to happen. We can simply play back a recorded event!

In the process of building out the simulation, we realized that time handling in the original dataset was problematic. Therefore, we improved the handling of time in the original data and created a new dataset with UTC timestamps and local offsets. This is the dataset that we will use going forward.

We also looked at the reference architecture for handling streaming data in Google Cloud Platform. First, receive your data in Cloud Pub/Sub so that the messages can be received asynchronously. Process the Cloud Pub/Sub messages in Cloud Dataflow, computing aggregations on the data as needed, and stream either the raw data or aggregate data (or both) to BigQuery. We worked with all three Google Cloud products (Cloud Pub/Sub, Cloud Dataflow, and BigQuery) using the Google Cloud client libraries in Python. However, in none of these cases did we ever need to create a virtual machine ourselves—these are all serverless and autoscaled offerings. We thus were able to concentrate on writing code, letting the platform manage the rest.

Suggested Resources

The Apache Beam website has interactive coding exercises, called Katas, that provide an excellent hands-on way to learn streaming concepts and how to implement them using Beam.

Dataflow templates are prewritten Apache Beam pipelines that are handy for data migration. In the chapter, we mentioned the Dataflow template for ingesting data from Pub/Sub into BigQuery. Dataflow templates also exist for non-Google sources. For example, there is a Dataflow connector from SAP HANA to BigQuery, as described in the 2017 Google blog post “Using Apache Beam and Cloud Dataflow to Integrate SAP HANA and BigQuery” by Babu Prasad Elumalai and Mark Shalda. That particular connector is written in Java.

This tutorial walks you through the process of creating your own Dataflow template. Any Dataflow pipeline can be made into a template for easy sharing and convenient launch.

In this 2021 article, “Processing Billions of Events in Real Time at Twitter,” Twitter engineers Lu Zhang and Chukwudiuto Malife describe how Twitter processes 400 billion events in real time using Dataflow.

1 This is a common situation. It is only as you start to explore a dataset that you discover you need ancillary datasets. Had I known beforehand, I would have ingested both datasets. But you are following my workflow, and as of this point, I knew that I needed a dataset of time zone offsets but hadn’t yet searched for it!

2 See 04_streaming/design/mktbl.sh for the actual syntax; we’ve made adjustments here for printing purposes.

3 Or make a copy or view of the table with anonymized column values—we cover safeguarding personally identifiable information in Chapter 7 and in the Appendix.

4 For example, the time zone of Sevastopol changed in 2014 from Eastern European Time (UTC+2) to Moscow Time (UTC+4) after the annexation of Crimea by the Russian Federation.

5 The Java API is much more mature and performant, but Python is easier and more concise. In this book, we will use Python.

6 If you are using an ephemeral shell like Cloud Shell, you will have to run the activate line every time you start a new session. This will load up the virtual environment that you were using earlier. This way, you will not need to reinstall the Python packages every time.

7 This code is in 04_streaming/transform/df01.py of the GitHub repository of this book.

8 This code is in 04_streaming/transform/df02.py of the GitHub repository of this book.

9 See the answer to the question “How do I handle NameErrors?” in the Google documentation.

10 Saving Python objects is called pickling.

11 Chicago’s airport didn’t pack up and move on June 30. Most likely, a new terminal or runway was opened at that time, and this changed the location of the centroid of the airport’s aerial extent. Notice that the change is just 0.0036 in latitude degrees. At Chicago’s latitude, this translates to about 400 meters.

12 Normally, the recommended way to sample a BigQuery table is to do SELECT * FROM dsongcp.flights WHERE TABLESAMPLE SYSTEM (0.001) because table sampling isn’t cached, so we will get different results each time. However, at the time of writing, table sampling works only on tables and flights is a View. Besides, in our current use case, we don’t care whether or not we get different samples each time we run the query. That’s why I’m using rand().

13 See the file 04_streaming/transform/bqsample.sh.

14 This code is in 04_streaming/transform/df03.py of the GitHub repository of this book.

15 This code is in 04_streaming/transform/df04.py of the GitHub repository of this book.

16 This code is in 04_streaming/transform/df05.py of the GitHub repository of this book.

17 It opens a door to someone passing in queries that could, for example, delete a table. This XKCD cartoon is famous for highlighting the issue.

18 See 04_streaming/simulate/simulate.py in the GitHub repository.

19 At 60 times real-time speed, the 3 days of flight data will take over an hour to complete. Hopefully, that’s enough time to complete the rest of the chapter. If not, just restart the simulator. If you get done early, hit Ctrl-C to stop the simulator.

20 For an example, see the reference architecture to analyze games on mobile devices.

21 See 04_streaming/realtime/avg01.py in the GitHub repository.

22 If you wanted to write the raw data that is received to BigQuery, you could do that, too, of course—that is what is shown in the previous code snippet. In this section, I assume that we need only the aggregate statistics over the past hour.

23 Recall that we are computing aggregates over 60 minutes every 5 minutes. Cloud Dataflow treats the first “full” window as happening 65 minutes into the simulation. Because we are simulating at 30 times speed, this is two minutes on your clock.
