Chapter 4. Common Developer Tasks for Impala
Here are the special Impala aspects of some standard operations familiar to database developers.
Getting Data into an Impala Table
Because Impala’s feature set is oriented toward high-performance queries, much of the data you work with in Impala will originate from some other source, and Impala takes over near the end of the extract-transform-load (ETL) pipeline.
To get data into an Impala table, you can point Impala at data files in an arbitrary HDFS location; move data files from somewhere in HDFS into an Impala-managed directory; or copy data from one Impala table to another. Impala can query the original raw data files, without requiring any conversion or reorganization. Impala can also assist with converting and reorganizing data when those changes are helpful for query performance.
As a developer, you might be setting up all parts of a data pipeline, or you might work with files that already exist. Either way, the last few steps in the pipeline are the most important ones from the Impala perspective. You want the data files to go into a well-understood and predictable location in HDFS, and then Impala can work with them.
See Chapter 5 for some demonstrations of ways to construct and load data for your own testing. You can do basic functional testing with trivial amounts of data. For performance and scalability testing, you’ll need many gigabytes worth.
The following sections are roughly in order from the easiest techniques to the most complex. Once you have an ETL pipeline set up or a substantial amount of data loaded into Impala, you can explore all the different techniques and settle on one or two ingestion methods that work the best for you.
INSERT … SELECT statement is very simple to use, but requires you to have some existing data in an Impala table. You issue an
INSERT … SELECT statement to copy data from one table to another. You can convert the data to a different file format in the destination table, filter the data using
WHERE clauses, and transform values using operators and built-in functions. With this technique, you can improve query efficiency by reorganizing the data in various ways; you’ll see examples in following sections.
INSERT statement can add data to an existing table with the
table_name syntax, or replace the entire contents of a table or partition with the
table_name syntax. Because Impala does not currently have
DELETE statements, overwriting a table is how you make a change to existing data.
For First-Time Users Only
As you’ll see in Chapter 5, you can issue an
INSERT … VALUES statement to create new data from literals and function return values. You can insert multiple rows through a single statement by including multiple tuples after the
VALUES clause. We recommend against relying on this technique for production data, because it really only applies to very small volumes of data. Each
INSERT statement produces a new tiny data file, which is a very inefficient layout for Impala queries against HDFS data. On the other hand, if you’re entirely new to Hadoop, this is a simple way to get started and experiment with SQL syntax and various table layouts, data types, and file formats. You should expect to outgrow the
INSERT … VALUES syntax relatively quickly. You might graduate from tables with a few dozen rows straight to billions of rows when you start working with real data. Make sure to clean up any unneeded tables full of small files after finishing with
INSERT … VALUES experiments.
LOAD DATA Statement
If you have data files somewhere in HDFS already, you can issue a
LOAD DATA statement to move data files in HDFS into the Impala data directory for a table.
Specify the HDFS path of a single file or a directory full of files. Impala moves the files out of their original location, to a directory under Impala’s control. You don’t need to know the destination directory; that aspect is managed by Impala. The Impala table or partition must already exist.
The files are not changed in any way by the
LOAD DATA operation. They keep the same names, contents, and they all reside in the same destination directory.
This technique is most useful when you already have some sort of ETL pipeline that puts data files in a central HDFS location, and when Impala is the main consumer for the data. For example, you might use this technique if the final stage of your ETL process converts raw data files to query-optimized Parquet files. Leave the original data files where they are, and use
LOAD DATA to move the corresponding Parquet files into the Impala directory structure for querying.
If you drop the table, the files are removed from HDFS. (The removed files are stored temporarily in the HDFS trashcan before being permanently deleted, so you can still recover them for some time after the
CREATE EXTERNAL TABLE statement acts almost as a symbolic link, pointing Impala to a directory full of HDFS files. This is a handy technique to avoid copying data when other Hadoop components are already using the data files.
The statement begins with
CREATE EXTERNAL TABLE statement and ends with the
LOCATION hdfs_path clause. The data files are not moved or changed at all. Thus, this operation is very quick, regardless of the size of the underlying data.
The files can still be added to or replaced by Hadoop components outside of Impala. (Issue a
table_name statement afterward if so.)
If you subsequently drop the table, the files are left untouched.
This is a good technique to use if you have a robust system for managing incoming data in HDFS. For example, you might put the files in a central, well-known location to analyze the same data files through multiple SQL engines, NoSQL engines, or Hadoop components.
Figuring Out Where Impala Data Resides
All the techniques up to this point work without requiring you to specify any Impala-specific HDFS paths. Subsequent techniques require that you know the actual destination path in HDFS, based on the directory structure of tables managed by Impala. Here are techniques you can use to understand the overall Impala data directory structure, and to find the HDFS location of any Impala table or partition:
DESCRIBE FORMATTEDstatement in
impala-shellto figure out the HDFS path corresponding to any Impala table. The path is shown in the
If some of your data resides outside the Impala table directories, you might use Linux commands such as
hdfs dfs -ls pathto browse around the HDFS directory structure to find the paths to specify for the
Partitioned tables consist of multiple levels of directories, one level for each partition key column. To see that structure at a glance, use
hdfs dfs -du hdfs_pathto see the directory structure of all the partitions.
Manually Loading Data Files into HDFS
When your data files originate on your local Unix system, you can use Hadoop utilities to copy those files to specific locations within HDFS. The commands start with either
hdfs dfs or
hadoop fs, followed by arguments such as
-du, and others corresponding to familiar Unix utilities. The difference between
hdfs dfs and
hadoop fs is too subtle to matter for the examples in this book, so I typically use
If you are not already familiar with the HDFS directory structure, first learn how to check the HDFS path corresponding to an Impala table or partition (“Figuring Out Where Impala Data Resides”). See the tutorial using a billion rows of sample data (“Tutorial: The Journey of a Billion Rows”) for an example of this process.
When Parquet files come into HDFS for the first time, or are copied from one HDFS location to another, make sure to preserve the original block size. Rather than
hdfs dfs -put, use the Linux command
hadoop distcp -pb as follows:
hadoop distcp -pb
If you’re already using batch-oriented SQL-on-Hadoop technology through the Apache Hive component, you can reuse Hive tables and their data directly in Impala without any time-consuming loading or conversion step. (This cross-compatibility applies to Hive tables that use Impala-compatible types for all columns.) Because Impala and Hive tables are interchangeable, after data is loaded through Hive, you can query it through Impala. This technique is for organizations that already have a Hadoop data pipeline set up. The steps are:
CREATE TABLEstatements either in Impala or through the Hive shell. (Doing the DDL in Impala reduces the chances of using an incompatible data type or file format by accident.)
INSERTstatements through the Hive shell. Hive is well-suited for batch data transfer jobs that take many hours or even days. If something goes wrong, the job continues automatically from the point where the problem occurred.
impala-shell, issue a one-time
table_namestatement to make Impala aware of a table created through Hive. (Another reason to prefer to do the DDL through Impala.)
impala-shell, issue a
table_namestatement any time data is added to or removed from a table through Hive or manual HDFS operations.
If you have data in another database system, such as an OLTP system or a data warehouse with limited capacity, you can bring it into Impala for large-scale analytics using Apache Sqoop.
The commands you run are
sqoop-import-all-tables. You specify user credentials and a JDBC-style URL to connect to the database system. Specify the options
'\\N' to translate
NULL values to the notation that Impala expects. (Due to the handling of escape sequences in the Linux shell, you typically have to specify the argument with double backslashes,
The output is in the form of text, Parquet, Avro, or SequenceFile data files. The Sqoop commands can also create the relevant SQL tables, and load those data files into the tables in HDFS.
If you create tables and load the data through Sqoop, afterward you issue
INVALIDATE METADATA and/or
REFRESH statements in Impala, the same as when you do those operations through Hive.
For general information about the Sqoop commands, see the Sqoop documentation. For tutorial-style instructions, see the Apache Sqoop Cookbook by Ting and Cecho (O’Reilly); recipes 2.5 and 2.10 are especially helpful for using Sqoop with Impala.
The Kite SDK includes a command-line interface that can go directly from a text-based CSV file into a Parquet or Avro table in HDFS. After creating the table and loading the data through Kite, you issue
INVALIDATE METADATA and/or
REFRESH statements in Impala, the same as when you do those operations through Hive.
For instructions to download and use the Kite command-line interface, see the Kite documentation.
Porting SQL Code to Impala
For the most part, standard SQL that you bring over to Impala should run unchanged. The following aspects might require changes in the SQL code:
Impala might not have every data type found on other database systems, or the name might be different. For example, Impala uses
STRINGas the all-purpose string type, and
VARCHAR2for variable-length strings with a maximum length. (For Impala,
STRINGis typically faster than constrained types such as
DDL statements have a number of Impala-specific or Hadoop-themed clauses. Expect to make changes to all your
Because Impala has limited DML statements (for example, no
DELETEexcept in combination with Kudu), and no transactional statements (such as
ROLLBACK), you might need to remove some statements from your code entirely. Most changes to data are performed by
INSERT OVERWRITEstatements in Impala.
Queries use standard SQL-92 syntax. Some specific features are not supported, or are supported starting in a particular Impala release:
Every vendor has its own set of built-in functions. Impala supports a broad set of string, numeric, and date/time functions, but you’ll need to cross-check against the ones used in your own code.
Impala is a little stricter than you might be used to in terms of casting and implicit conversions between types, in order to avoid unexpected loss of precision. Be ready to add some
CAST()calls when working with expressions or columns of different types.
See “Recent Additions” to see the latest enhancements to SQL portability.
See the Impala documentation for more on the subject of porting, including the most recent feature support.
Using Impala from a JDBC or ODBC Application
Although this book mainly emphasizes how the SQL language in Impala frees developers from having to write Java or other non-SQL programs for data processing, this section explains how to interface Java, C, PHP, and other kinds of applications with Impala through the standard JDBC interface. Driving Impala through these interfaces lets you operate the main program and display results on a non-Linux system such as a Mac OS X or Windows machine, or even a web page.
The best use case for this technique is in query-intensive applications. Data loading and ETL are relatively straightforward in SQL or in separate applications running directly on the server. Although it might be tempting to use the
INSERT … VALUES syntax from JDBC or ODBC, remember that inserting rows one or a few at a time results in a very inefficient file layout for Impala (many small files) when it comes time to run queries.
Along the same lines, look for opportunities to run heavy-duty queries on large amounts of data through Impala. Although you can run simple “point queries” that look up a single row through Impala, that technique is really only efficient when the underlying data is pulled from tables stored in HBase, not HDFS. You typically write an Impala application to churn through huge quantities of sales, web traffic, bioscience, or similar data and render the results in graphs. Or you might have a web page that runs a query through PHP to retrieve a chunk of personalized information to display for a visitor. You would probably not use Impala as the backend for a web page that ran 50 queries to pull individual page elements out of a SQL table.
Make sure to always close query handles when finished. Because Impala runs queries against such big tables, there is often a significant amount of memory tied up during a query, which is important to release. Likewise, features like admission control and YARN resource management can limit the number of queries that run concurrently; if “zombie” queries hang around due to unclosed query handles in applications, the system can stop accepting new queries.
I refer you to the official documentation and download sources for JDBC and ODBC driver information, because the details change periodically as new drivers are released.
From Java, you can connect using the dedicated Cloudera JDBC connector for Impala. You can also connect through the standard Hadoop JDBC driver (known as the Hive JDBC driver), although this option is not extensively tested and support for new Impala features can lag behind. In your application, you interface with Impala queries and result sets using standard JDBC API calls. See the Impala JDBC documentation for details, such as the class name and the connection string for your particular security configuration.
From C, C++, PHP, or other languages that support an ODBC interface, you can connect using a special Impala ODBC driver and go through standard ODBC API calls. See the Impala ODBC documentation for details.
From a Python script, you can use the
pyodbc package to issue SQL statements and get back the results as native Python data structures. Data scientists who do serious work in Python should explore the Ibis framework, which is similar to familiar Python data science frameworks such as Pandas.
Using Impala with a Scripting Language
You can write a Python, Perl, Bash, or other kind of script that uses the features of those languages without delving into any database-specific APIs. You can use a script to produce or manipulate input data for Impala, and to drive the
impala-shell interpreter to run SQL statements (primarily queries) and save or process the results.
For serious application development, you can access database-centric APIs from a variety of scripting languages. See discussions of the
impyla package for Python (“The impyla Package for Python Scripting”), and JDBC and ODBC connectivity options (“Using Impala from a JDBC or ODBC Application”) usable from many different languages.
Running Impala SQL Statements from Scripts
To execute SQL statements without any additional software prerequisites or API layers, run the
impala-shell command with some command-line options. Specify the
-q option to run a single SQL statement, or the
-f option to process a file full of SQL statements. Typically, you also use the
-B option to suppress the ASCII art boxes around query results, which makes the textual output easier to consume.
Currently, the way to use substitution variables with Impala statements and scripts is to embed shell substitution variables in strings passed to the
impala-shell interpreter, like so:
#!/bin/bash export DB_NAME=tpc export TABLE_NAME=customer_address export CRITERIA=Oakland export CUTOFF=20 impala-shell -d $DB_NAME <<EOF select * from $DB_NAME.$TABLE_NAME where ca_city = '$CRITERIA' limit $CUTOFF; EOF ...more shell code...
Saving Query Results
-o filename option of the
impala-shell command saves the output in a file. You typically use
-o in combination with
-f to run a single query or a file of SQL commands, then exit. To make the output easier to parse, also use the
-B option to suppress the ASCII art boxes around query results, and optionally the
--output_delimiter=character option to format the output with a comma, pipe, or some other character as the separator.
-o option saves the
impala-shell output in the local filesystem. To save results in HDFS, you put the result rows into an Impala table using SQL syntax such as
CREATE TABLE AS SELECT or
INSERT … SELECT. You can set up the table with the desired characteristics of file format (
STORED AS clause), separator character for text files (
ROW FORMAT clause), and HDFS path for the output files (
The impyla Package for Python Scripting
The many scientific libraries available for Python make it a popular choice for data scientists to code in. The
impyla package (still under development) acts as a bridge between the Python database API and the protocol that Impala supports for its JDBC and ODBC drivers. The Python programs use the Python DB API 2.0, from the PEP-249 specification.
For example, here is a script that issues a
SHOW TABLES statement to get a list of tables in the
DEFAULT database, then
DESCRIBE statements to get details about the structure of each table, and then issues queries to get the number of rows in each table. The result sets come back as lists of tuples. Substitute your own hostname here, but keep the same port,
21050, where Impala listens for JDBC requests. You can run scripts like this on all kinds of systems—not only on Linux machines with Hadoop installed; this particular script was executed on Mac OS X.
from impala.dbapi import connect conn = connect(host='a1730.abcde.example.com', port=21050) try: cur = conn.cursor() try: cur.execute('show tables in default') tables_in_default_db = cur.fetchall() print tables_in_default_db for table in tables_in_default_db: print "Table: " + table try: cur.execute('describe `%s`' % (table)) table_layout = cur.fetchall() for row in table_layout: print "Column: " + row + ", type: " + row + ", comment: " + row except: print "Error describing table " + table cur.execute('select count(*) from `%s`' % (table)) result = cur.fetchall() count = str(result) print "Rows = " + count except: print "Error getting list of tables." cur.close() except: print "Error establishing connection to Impala."
In addition to writing Python programs that call into Impala, you can write simple UDFs in Python through
impyla, ship the resulting binaries from a development machine to your Impala cluster, and the functions from Impala queries. To use this capability, you need certain other software prerequisites on the development machine (for example, LLVM) and should be familiar with the data types used in C++ UDFs for Impala.
Optimizing Impala Performance
If you come from a traditional database background, you might have engraved in your mind the notion that indexes are crucial for query speed. If your experience extends to data warehousing environments, you might be comfortable with the idea of doing away with indexes, because it’s often more efficient when doing heavy duty analysis to just scan the entire table or certain partitions.
Impala embraces this data warehousing approach of avoiding indexes by not having any indexes at all. After all, data files can be added to HDFS at any time by components other than Impala. Index maintenance would be very expensive. The HDFS storage subsystem is optimized for fast reads of big chunks of data. So the types of queries that can be expensive in a traditional database system are standard operating procedure for Impala, as long as you follow the best practices for performance.
Having said that, the laws of physics still apply, and if there is a way for a query to read, evaluate, and transmit less data overall, of course the query will be proportionally faster as a result. With Impala, the biggest I/O savings come from using partitioned tables and choosing the most appropriate file format. The most complex and resource-intensive queries tend to involve join operations, and the critical factor there is to collect statistics (using the
COMPUTE STATS statement) for all the tables involved in the join.
The following sections give some guidelines for optimizing performance and scalability for queries and overall memory usage. For those who prefer to learn by doing, later sections show examples and tutorials for file formats (“Tutorial: The Journey of a Billion Rows”), partitioned tables (“Making a Partitioned Table”), and join queries and table statistics (“Deep Dive: Joins and the Role of Statistics”).
Optimizing Query Performance
The most resource-intensive and performance-critical Impala queries tend to be joins: pulling together related data from multiple tables. For all tables involved in join queries, issue a
COMPUTE STATS statement after loading initial data into a table, or adding new data that changes the table size by 30% or more.
When a table has a column or set of columns that’s almost always used for filtering, such as date or geographic region, consider partitioning that table by that column or columns. Partitioning allows queries to analyze the rows containing specific values of the partition key columns, and avoid reading partitions with irrelevant data. Because the
COMPUTE STATS statement by default scans the entire table, it can be impractical for the largest partitioned tables, especially when a data load operation only involves a single new partition. In Impala 2.1 and higher, you can use the
COMPUTE INCREMENTAL STATS statement for partitioned tables, to limit the scanning to new or changed partitions.
At the end of your ETL process, you want the data to be in a file format that is efficient for data-warehouse-style queries. In practice, Parquet format is the most efficient for Impala. Other binary formats such as Avro are also more efficient than delimited text files.
See “Tutorial: The Journey of a Billion Rows” for a sequence of examples that explores all these aspects of query tuning. For more background information, see the related discussions of joins and statistics (“Deep Dive: Joins and the Role of Statistics”), file formats (“File Formats”) including Parquet (“Parquet Files: The Biggest Blocks of All”), and partitioning (“Working with Partitioned Tables”).
Optimizing Memory Usage
This section provides guidelines and strategies for keeping memory use low. Efficient use of memory is important for overall performance, and also for scalability in a highly concurrent production setup.
For many kinds of straightforward queries, Impala uses a modest and predictable amount of memory, regardless of the size of the table. As intermediate results become available from different nodes in the cluster, the data is sent back to the coordinator node rather than being buffered in memory. For example,
conditions both read data from disk using modestly sized read buffers, regardless of the volume of data or the HDFS block size.
Certain kinds of clauses increase the memory requirement. For example,
ORDER BY involves sorting intermediate results on remote nodes. (Although in Impala 1.4 and later, the maximum memory used by
ORDER BY is lower than in previous releases, and very large sort operations write to a work area on disk to keep memory usage under control.)
GROUP BY involves building in-memory data structures to keep track of the intermediate result for each group.
DISTINCT also build in-memory data structures to prune duplicate values.
The size of the additional work memory does depend on the amount and types of data in the table. Luckily, you don’t need all this memory on any single machine, but rather spread across all the DataNodes of the cluster.
Calls to aggregation functions such as
SUM() reduce the size of the overall data. The working memory for those functions themselves is proportional to the number of groups in the
GROUP BY clause. For example, computing
SUM() for an entire table involves very little memory because only a single variable is needed to hold the intermediate sum. Using
SUM() in a query with
GROUP BY year involves one intermediate variable corresponding to each year, presumably not many different values. A query calling an aggregate function with
GROUP BY unique_column could have millions or billions of different groups, where the time and memory to compute all the different aggregate values could be substantial.
UNION operator does more work than the
UNION ALL operator, because
UNION collects the values from both sides of the query and then eliminates duplicates. Therefore, if you know there will be no duplicate values, or there is no harm in having duplicates, use
UNION ALL instead of
UNION. If duplicates are not allowed, sometimes it is more efficient to do a fast
UNION ALL query in a
WITH clause or a subquery, and add an additional outer query that does a
DISTINCT on the final result set.
LIMIT clause puts a cap on the number of results, allowing the nodes performing the distributed query to skip unnecessary processing. If you know you need a maximum of N results, include a
LIMIT N clause so that Impala can return the results faster.
GROUP BY clause involving a
STRING column is much less efficient than with a numeric column. This is one of the cases where it makes sense to normalize data, replacing long or repeated string values with numeric IDs.
INT is the most familiar integer type, if you are dealing with values that fit into smaller ranges (such as 1–12 for month and 1–31 for day), specifying the “smallest” appropriate integer type means the hash tables, intermediate result sets, and so on will use 1/2, 1/4, or 1/8 as much memory for the data from those columns. Use the other integer types (
BIGINT) when appropriate based on the range of values.
You can also do away with separate time-based fields in favor of a single
TIMESTAMP column. The
EXTRACT() function lets you pull out the individual fields when you need them. Sometimes, it makes sense to include both a
TIMESTAMP column to do fast tests of which event came first, as well as individual columns for year, month, and so on to use as partition keys.
Although most of the Impala memory considerations revolve around queries, inserting into a Parquet table (especially a partitioned Parquet table) can also use substantial memory. One data block’s worth of memory (originally 1 GB, now typically 256 MB) of Parquet data is buffered in memory before being written to disk. With a partitioned Parquet table, there could be this same amount of memory used for each partition being inserted into, multiplied by the number of nodes in the cluster, multiplied again by the number of cores on each node.
Use one of the following techniques to minimize memory use when writing to Parquet tables:
Impala can determine when an
INSERT … SELECTinto a partitioned table is especially memory-intensive and redistribute the work to avoid excessive memory usage. For this optimization to be effective, you must issue a
COMPUTE STATSstatement for the source table where the data is being copied from, so that Impala can make a correct estimate of the volume and distribution of data being inserted.
If statistics are not available for the source table, or the automatic memory estimate is inaccurate, you can force lower memory usage for the
INSERTstatement by including the
[SHUFFLE]hint immediately before the
SELECTkeyword in the
INSERT ... SELECTstatement.
Running a separate
INSERTstatement for each partition minimizes the number of memory buffers allocated at any one time. In the
INSERTstatement, include a clause
…)to specify constant values for all the partition key columns.
Working with Partitioned Tables
In Impala, as in large-scale data warehouse systems, the primary way for a schema designer to speed up queries is to create partitioned tables. The data is physically divided based on all the different values in one column or a set of columns, known as the partition key columns. Partitioning acts like indexes, instead of looking up one row at a time from widely scattered items, the rows with identical partition keys are physically grouped together. Impala uses the fast bulk I/O capabilities of HDFS to read all the data stored in particular partitions, based on references to the partition key columns in
WHERE or join clauses.
With Impala, partitioning is ready to go out of the box with no setup required. It’s expected that practically every user will employ partitioning for their tables that truly qualify as Big Data.
Frequently tested columns like
COUNTRY, and so on make good partition keys. For example, if you partition on a
YEAR column, all the data for a particular year can be physically placed together on disk. Queries with clauses such as
WHERE YEAR = 1987 or
WHERE YEAR BETWEEN 2006 AND 2009 can zero in almost instantly on the data to read, and then read that data very efficiently because all the rows are located adjacent to each other in a few large files.
Partitioning is great for reducing the overall amount of data to read, which in turn reduces the CPU cycles to test column values and the memory to hold intermediate results. All these reductions flow straight through to the bottom line: faster query performance. If you have 100 years worth of historical data, and you want to analyze only the data for 1 year, you can do that 100 times as fast with a partitioned table as with an unpartitioned one (all else being equal).
This section provides some general guidelines. For demonstrations of some of these techniques, see “Making a Partitioned Table”.
Finding the Ideal Granularity
Now that I have told you how partitioning makes your queries faster, let’s look at some design aspects for partitioning in Impala (or Hadoop in general). Sometimes, taking an existing partitioned table from a data warehouse and reusing the schema as-is isn’t optimal for Impala.
Remember, Hadoop’s HDFS filesystem does best with a relatively small number of big files. (By big, we mean in the range of 128 MB to 1 GB; ideally, nothing smaller than 64 MB.) If you partition on columns that are so fine-grained that each partition has very little data, the bulk I/O and parallel processing of Hadoop mostly goes to waste. Thus, often you’ll find that an existing partitioning scheme needs to be reduced by one level to put sufficient data in each partition.
For example, if a table was partitioned by year, month, and day in pre-Hadoop days, you might get more efficient queries by partitioning only for year and month in Impala. Or if you have an older table partitioned by city and state, maybe a more efficient layout for Impala is only partitioned by state (or even by region). From the Hadoop point of view, it’s not much different to read a 40 MB partition than it is to read a 20 MB one, and reading only 5 MB is unlikely to see much advantage from Hadoop strengths like parallel execution. This is especially true if you frequently run reports that hit many different partitions, such as when you partition down to the day but then run reports for an entire month or a full year.
When you take a column from a traditional table and turn it into a key column for a partitioned table, the volume of data for that column essentially evaporates. Instead of a billion integers in a
YEAR column, now you have a directory named
YEAR=2014, another directory named
YEAR=2015, and so on. The data files inside those directories are smaller than before because they do not have a
YEAR column anymore; the
YEAR value is deduced based on which partition the files are in. Take that shrinkage into account when finding the balance between the number of partition key columns and the volume of data inside each partition. This factor is also why you would choose partition key columns with low to moderate cardinality: having only 2 or 10 directories doesn’t help to filter the data very much, while having 50,000 directories shifts much of the query overhead to the Hadoop NameNode and the metastore database to track down the locations of all the data files.
Inserting into Partitioned Tables
When you insert into a partitioned table, again Impala parallelizes that operation. If the data has to be split up across many different partitions, that means many data files being written to simultaneously, which can exceed limits on things like HDFS file descriptors. When you insert into Parquet tables, each data file being written requires a memory buffer equal to the Parquet block size, which by default is 1 GB for Impala. Thus, what seems like a relatively innocuous operation (copy 10 years of data into a table partitioned by year, month, and day) can take a long time or even fail, despite a low overall volume of information. Here again, it’s better to work with big chunks of information at once. Impala
INSERT syntax lets you work with one partition at a time:
CREATE TABLE raw_data (year SMALLINT, month TINYINT, c1 STRING, c2 INT, c3 BOOLEAN); -- Load some data into this unpartitioned table... CREATE TABLE partitioned_table (c1 STRING, c2 INT, c3 BOOLEAN) PARTITIONED BY (year SMALLINT, month TINYINT); -- Copy data into the partitioned table, one partition at a time. INSERT INTO partitioned_table PARTITION (year=2000, month=1) SELECT c1, c2, c3 FROM raw_data WHERE year=2000 AND month=1; INSERT INTO partitioned_table PARTITION (year=2000, month=2) SELECT c1, c2, c3 FROM raw_data WHERE year=2000 AND month=2; ...
It’s easy to write a query that generates a set of
INSERT statements like this by finding all the distinct values for the partition key columns. Then you can run the resulting statements in a SQL script. For example:
SELECT DISTINCT concat('insert into partitioned_table partition (year=', cast(year as string),', month=',cast(month as string), ') select c1, c2, c3 from raw_data where year=', cast(year as string),' and month=',cast(month as string),';') AS command FROM raw_data; +---------------------------------------------------------------------... | command ... +---------------------------------------------------------------------... | insert into partitioned_table partition (year=2000, month=1) select ... | insert into partitioned_table partition (year=2000, month=2) select ... | insert into partitioned_table partition (year=2000, month=3) select ... ...
When you run Impala queries to generate other SQL statements, start
impala-shell with the
-B option. That option suppresses the ASCII boxes around query results, making the output easier to redirect or copy and paste into a script file. See “Tutorial: Verbose and Quiet impala-shell Output” for examples.
Adding and Loading New Partitions
One of the convenient aspects of Impala partitioned tables is that the partitions are just HDFS directories, where you can put data files without going through any file conversion or even Impala
INSERT statements. In this example, you create the partitions individually and use the
LOAD DATA statement or some mechanism outside Impala to ingest the data.
-- Set up empty partitions. ALTER TABLE partitioned_table ADD PARTITION (year=2010, month=1); ALTER TABLE partitioned_table ADD PARTITION (year=2010, month=2); ... ALTER TABLE partitioned_table ADD PARTITION (year=2014, month=1); ALTER TABLE partitioned_table ADD PARTITION (year=2014, month=2); ... -- Move data that already exists in HDFS into appropriate partition directories. LOAD DATA INPATH '/user/warehouse/this_year/january' INTO partitioned_table PARTITION (year=2014, month=1); LOAD DATA INPATH '/user/warehouse/this_year/february' INTO partitioned_table PARTITION (year=2014, month=2); -- Or tell Impala to look for specific partitions in specific HDFS directories. ALTER TABLE partitioned_table PARTITION (year=2014, month=3) SET LOCATION '/user/warehouse/this_year/march'; -- If the files are not already in HDFS, shell out to an external command -- that does 'hdfs dfs -put' or similar. ! load_projected_data_for_2020.sh -- Make Impala aware of the files that were added by non-SQL means. REFRESH partitioned_table;
See “Anti-Pattern: A Million Little Pieces” for some other tricks you can use to avoid fragmentation and excessive memory use when inserting into partitioned Parquet tables.
Keeping Statistics Up to Date for Partitioned Tables
COMPUTE STATS statement scans the entire table by default, it can be impractical for the largest partition tables, especially when a data load operation involves only a single new partition. In Impala 2.1 and higher, you can use the
COMPUTE INCREMENTAL STATS statement for partitioned tables, to limit the scanning to new or changed partitions.
The following example shows how the
COMPUTE INCREMENTAL STATS statement fills in table and column statistics, such as the
#Rows value, and skipping those partitions that already have incremental stats. Therefore, it is an inexpensive operation to run after adding and loading data into a new partition.
-- Initially, none of the partitions have incremental stats. SHOW PARTITIONS partitioned_table; +-------+-------+-------+--------+------+...+--------+-------------------+... | year | month | #Rows | #Files | Size |...| Format | Incremental stats |... +-------+-------+-------+--------+------+...+--------+-------------------+... | 2000 | 1 | -1 | 1 | 11B |...| TEXT | false |... | 2000 | 2 | -1 | 1 | 11B |...| TEXT | false |... | Total | | -1 | 2 | 22B |...| | |... +-------+-------+-------+--------+------+...+--------+-------------------+... COMPUTE INCREMENTAL STATS partitioned_table; +-----------------------------------------+ | summary | +-----------------------------------------+ | Updated 2 partition(s) and 3 column(s). | +-----------------------------------------+ -- Now all the partitions have incremental stats. -- Notice the #Rows value is now filled in. SHOW PARTITIONS partitioned_table; +-------+-------+-------+--------+------+...+--------+-------------------+... | year | month | #Rows | #Files | Size |...| Format | Incremental stats |... +-------+-------+-------+--------+------+...+--------+-------------------+... | 2000 | 1 | 1 | 1 | 11B |...| TEXT | true |... | 2000 | 2 | 1 | 1 | 11B |...| TEXT | true |... | Total | | 2 | 2 | 22B |...| | |... +-------+-------+-------+--------+------+...+--------+-------------------+... ALTER TABLE partitioned_table ADD PARTITION (year=2000, month=3); -- After a new partition is added, its statistics such as #Rows -- are initially unknown and the Incremental stats field is false. SHOW PARTITIONS partitioned_table; +-------+-------+-------+--------+------+...+--------+-------------------+... | year | month | #Rows | #Files | Size |...| Format | Incremental stats |... +-------+-------+-------+--------+------+...+--------+-------------------+... | 2000 | 1 | 1 | 1 | 11B |...| TEXT | true |... | 2000 | 2 | 1 | 1 | 11B |...| TEXT | true |... | 2000 | 3 | -1 | 0 | 0B |...| TEXT | false |... | Total | | 2 | 2 | 22B |...| | |... +-------+-------+-------+--------+------+...+--------+-------------------+... -- Only the single newly added partition is processed. COMPUTE INCREMENTAL STATS partitioned_table; +-----------------------------------------+ | summary | +-----------------------------------------+ | Updated 1 partition(s) and 3 column(s). | +-----------------------------------------+ -- Once again, all partitions now have up-to-date statistics. SHOW PARTITIONS partitioned_table; +-------+-------+-------+--------+------+...+--------+-------------------+... | year | month | #Rows | #Files | Size |...| Format | Incremental stats |... +-------+-------+-------+--------+------+...+--------+-------------------+... | 2000 | 1 | 1 | 1 | 11B |...| TEXT | true |... | 2000 | 2 | 1 | 1 | 11B |...| TEXT | true |... | 2000 | 3 | 0 | 0 | 0B |...| TEXT | true |... | Total | | 2 | 2 | 22B |...| | |... +-------+-------+-------+--------+------+...+--------+-------------------+...
Writing User-Defined Functions
If you have exotic algorithms or high-performance computations coded in C++ but you want users to go through a familiar SQL interface rather than you writing a whole C++ application, you can encapsulate the special code in a user-defined function (UDF), and call that function from SQL in the same way as a built-in Impala function.
For best performance, write any UDFs in C++. UDFs can also use a Java interface, but the option is primarily for reusing existing UDFs written for Hive.
Scalar UDFs produce a value for each input row, and are primarily for convenience and readability; you can bundle complex string processing or arithmetic operations into a single function call, possibly more efficient than building the same logic with a sequence of expressions within the query.
User-defined aggregate functions (UDAFs) are more complex. They return one or many values based on groups of related values from a table. If your analytic software relies on “secret sauce” algorithms that give you a competitive edge, you would likely implement those as UDAFs for Impala. (Because UDAFs build up their results over potentially millions or billions of calls to the same function, your pointer arithmetic and memory allocation need to be thoroughly debugged before executing the code inside Impala.)
Coding UDFs and UDAFs is beyond the scope of this book. For instructions for C++ and Java UDFs, see the Impala documentation. For header files, build environment, and code examples, see the Impala UDF developer GitHub repository.
You can also write simple UDFs in Python, using the impyla Python package (“The impyla Package for Python Scripting”).
Collaborating with Your Administrators
Although you can do a substantial amount of coding and testing in a purely development environment, at some point you will probably interact in some way with production systems where security policies and resource allocations are controlled by administrators. You might hand off queries, scripts, or JDBC applications to be run in a production environment. You might connect directly to a production system to run ad hoc queries. Or you might be in a devops role where you share both development and administration responsibilities.
Although the details of Impala administration are outside the scope of this book, here are some tips to help set expectations and smooth communications between you as a developer and the administrators in your organization.
Designing for Security
In a development environment, you might have wide-open access to all the data. In a production environment, access is likely controlled at the database, table, and even column level by the Sentry authorization system. Make life easier for administrators by grouping related tables logically into databases so that users can be granted privileges on all the tables in a database at once. Use consistent naming conventions for tables and columns to make it easier for an administrator to define views that access subsets of columns corresponding to the data that can be accessed by different classes of users. (For bonus points, create such views up front as part of your schema design process.) For example, in a healthcare organization, a table might contain some data that is only available to doctors, a different subset of information that is available to lab technicians, and yet another subset of information that is available to health insurance providers. Find out up front if your organization has already defined classes of users like these.
Anticipate Memory Usage
It is common for database vendors to suggest allocating a high percentage of physical memory, often 80% or more, for exclusive use of database software. Impala also benefits from having access to large amounts of physical memory for processing intermediate results from clauses like joins,
ORDER BY, and
GROUP BY. On a development system, you might be spoiled by having exclusive access to all memory for all machines. Practice running all your SQL with memory limits that reflect how much RAM will be available to Impala in the production environment. Examine estimated and actual memory requirements for queries, using the
PROFILE commands in
impala-shell. Receiving “out of memory” errors typically means that you are missing statistics that help Impala to plan and distribute the work for the most resource-intensive queries, or that you should take other tuning steps to help the queries execute using less RAM.
Understanding Resource Management
In addition to segmenting users and applications for security purposes, an administrator might use YARN and Impala’s admission control feature to provide different proportions of cluster resources to different groups of users. The resource allocation policies help prioritize and schedule the work across different Hadoop components on a busy cluster, ensuring that everybody stays within their defined limits for overall RAM and CPU usage, and in Impala’s case, the number of queries allowed to execute concurrently. Thus, be prepared to discuss whether queries for different types of users are more frequent, or more memory- and CPU-intensive than others to help administrators set up the resource allocation policies for a busy cluster. Understand the memory usage of particular queries and how that memory usage varies depending on the amount of data, because the production environment might have larger data volume than the development and test environment.
Helping to Plan for Performance (Stats, HDFS Caching)
No matter how well you design your schema and how efficient you make your queries, when your code goes into production, it might perform differently than in your dev/test environment. The cluster will likely be running other workloads at the same time—both Impala queries and non-Impala jobs. The volume of data in your tables might go up as new data arrives, or go down as older partitions are dropped.
Two important features to help your code perform well in an ever-changing environment are the
COMPUTE STATS statement and HDFS caching.
COMPUTE STATS statement gathers metadata that lets Impala optimize resource-intensive queries and insert operations, particularly join queries and
INSERTs, into partitioned Parquet tables. The administrator might need to run
COMPUTE STATS periodically whenever data volume in a table changes by a substantial amount. (Use 30% as a guideline, and then do your own testing to see how the explain plans for your join queries change depending on data volumes and the presence or absence of statistics.) Practice automating this step in any data-loading scripts you create. Communicate to your administrator all the tables involved in join queries, which are the most important ones when it comes to keeping the statistics up-to-date.
In Impala 2.1 and higher, the
COMPUTE INCREMENTAL STATS statement provides a fast way to scan just newly added partitions for the relevant metadata. See “Keeping Statistics Up to Date for Partitioned Tables” for details.
HDFS caching helps reduce I/O and memory-to-memory copying by keeping specified tables and partitions entirely in a special memory cache area. (The size of this cache does not count against the memory limits you can set for Impala.) The data is cached persistently, rather than being evicted each time new data is read. Therefore, it is suitable for frequently queried lookup tables, or tables and partitions that are being intensively queried during a particular timeframe. The administrator sets the size of the HDFS cache and divides it into cache pools with different characteristics for use by different classes of applications. Again, practice with this feature in your dev/test environment, and be prepared to discuss with your administrator which tables and partitions are most valuable to cache, and which cache pool they should go into. (The caching information can be set up with the initial
CREATE TABLE statements, or applied later through
ALTER TABLE statements.) The major benefit of this feature is scalability in a high-concurrency environment, so don’t be discouraged if you don’t see a big performance difference in a single-user test environment.
The original Impala integration with HDFS caching tended to shift query overhead from I/O spread across multiple hosts, to CPU load concentrated on a single host. In Impala 2.2 and higher, the
WITH REPLICATION clause for
CREATE TABLE and
ALTER TABLE specifies how many hosts should cache the same data blocks, letting Impala spread the CPU load across more machines. Because this feature is being improved based on real-world experiences and feedback, check the “New Features” page for each Impala release for improvements related to HDFS caching.
Understanding Cluster Topology
As a developer, you might work with a different cluster setup than is actually used in production. Here are some things to watch out for, to understand the performance and scalability implications as your application moves from a dev/test setup into production:
For basic functional testing, you might use a single-node setup, perhaps running inside a virtual machine. You can check SQL compatibility, try out built-in functions, check data type compatibility and experiment with
CAST(), see that your custom UDFs work correctly, and so on. (Perhaps with relatively small data volume, just to check correctness.)
To see what happens with distributed queries, you could use a relatively small cluster, such as two or four nodes. This allows you to see some performance and scalability benefits from parallelizing the queries. On a dev/test cluster, the NameNode is probably on the same host as one of the DataNodes, which is not a problem when the cluster is running under a light workload.
For production, you’ll probably have a separate host for the NameNode, and a substantial number of DataNodes. Here, the chances of a node failing are greater. (In this case, rerun any queries that were in flight.) Or one node might experience a performance issue, dragging down the response time of queries. (This type of problem is best detected with monitoring software such as Cloudera Manager.) Also, this is the time to double-check the guideline about installing Impala on all the DataNodes in the cluster (to avoid I/O slowdown due to remote reads) and only on the DataNodes (to avoid using up memory and CPU unnecessarily on the NameNode, which has a lot of work to do on a busy cluster).
Avoid using machines of widely different capacities as DataNodes in the same cluster. Try to have roughly equivalent RAM, speed of storage devices, CPU speed, and number of cores. If some machines are substantially beefier than others, query performance might be gated by the slowest machines. One machine might still be churning away, perhaps running out of memory and writing temporary data to disk, after all the others have finished.
In a production system, don’t skimp on the hardware capacity for the system running the metastore database. Because the largest tables in a Hadoop cluster can grow to tens of thousands of partitions and associated data files, the work to pull all the relevant information from the metastore database can become a significant factor in the time for query planning or performing DDL operations such as adding partitions.
Always Close Your Queries
Because Impala queries can be resource-intensive, production deployments typically use strategies such as resource management and admission control to cap the number of concurrent queries at a level the cluster can comfortably accommodate. This is a constraint you might not face in a development environment. In an application that submits queries through an interface such as JDBC, ODBC, HiveServer2, or Beeswax, make sure that all queries are closed when finished. Address this aspect in all execution paths and error handling. Otherwise, your application could leave “zombie” unclosed queries that fill up the available execution slots and prevent other queries from running. If this happens, expect a call from your administrator.