Chapter 4. Client API: Advanced Features

Now that you understand the basic client API, we will discuss the advanced features that HBase offers to clients.

Filters

HBase filters are a powerful feature that can greatly enhance your effectiveness when working with data stored in tables. You will find predefined filters, already provided by HBase for your use, as well as a framework you can use to implement your own. You will now be introduced to both.

Introduction to Filters

The two prominent read functions for HBase are get() and scan(), both supporting either direct access to data or the use of a start and end key, respectively. You can limit the data retrieved by progressively adding more limiting selectors to the query. These include column families, column qualifiers, timestamps or ranges, as well as version number.

While this gives you control over what is included, it is missing more fine-grained features, such as selection of keys, or values, based on regular expressions. Both classes support filters for exactly these reasons: what cannot be solved with the provided API functionality to filter row or column keys, or values, can be achieved with filters. The base interface is aptly named Filter, and there is a list of concrete classes supplied by HBase that you can use without doing any programming.

You can, on the other hand, extend the Filter classes to implement your own requirements. All the filters are actually applied on the server side, also called predicate pushdown. This ensures the most efficient selection of the data that needs to be transported back to the client. You could implement most of the filter functionality in your client code as well, but you would have to transfer much more data—something you need to avoid at scale.

Figure 4-1 shows how the filters are configured on the client, then serialized over the network, and then applied on the server.

The filters created on the client side, sent through the RPC, and executed on the server side
Figure 4-1. The filters created on the client side, sent through the RPC, and executed on the server side

The filter hierarchy

The lowest level in the filter hierarchy is the Filter interface, and the abstract FilterBase class that implements an empty shell, or skeleton, that is used by the actual filter classes to avoid having the same boilerplate code in each of them.

Most concrete filter classes are direct descendants of FilterBase, but a few use another, intermediate ancestor class. They all work the same way: you define a new instance of the filter you want to apply and hand it to the Get or Scan instances, using:

setFilter(filter)

While you initialize the filter instance itself, you often have to supply parameters for whatever the filter is designed for. There is a special subset of filters, based on CompareFilter, that ask you for at least two specific parameters, since they are used by the base class to perform its task. You will learn about the two parameter types next so that you can use them in context.

Note

Filters have access to the entire row they are applied to. This means that they can decide the fate of a row based on any available information. This includes the row key, column qualifiers, actual value of a column, timestamps, and so on.

When referring to values, or comparisons, as we will discuss shortly, this can be applied to any of these details. Specific filter implementations are available that consider only one of those criteria each.

Comparison operators

As CompareFilter-based filters add one more feature to the base FilterBase class, namely the compare() operation, it has to have a user-supplied operator type that defines how the result of the comparison is interpreted. The values are listed in Table 4-1.

Table 4-1. The possible comparison operators for CompareFilter-based filters
OperatorDescription
LESSMatch values less than the provided one.
LESS_OR_EQUALMatch values less than or equal to the provided one.
EQUALDo an exact match on the value and the provided one.
NOT_EQUALInclude everything that does not match the provided value.
GREATER_OR_EQUALMatch values that are equal to or greater than the provided one.
GREATEROnly include values greater than the provided one.
NO_OPExclude everything.

The comparison operators define what is included, or excluded, when the filter is applied. This allows you to select the data that you want as either a range, subset, or exact and single match.

Comparators

The second type that you need to provide to CompareFilter-related classes is a comparator, which is needed to compare various values and keys in different ways. They are derived from WritableByteArrayComparable, which implements Writable, and Comparable. You do not have to go into the details if you just want to use an implementation provided by HBase and listed in Table 4-2. The constructors usually take the control value, that is, the one to compare each table value against.

Table 4-2. The HBase-supplied comparators, used with CompareFilter-based filters
ComparatorDescription
BinaryComparatorUses Bytes.compareTo() to compare the current with the provided value.
BinaryPrefixComparatorSimilar to the above, but does a lefthand, prefix-based match using Bytes.compareTo().
NullComparatorDoes not compare against an actual value but whether a given one is null, or not null.
BitComparatorPerforms a bitwise comparison, providing a BitwiseOp class with AND, OR, and XOR operators.
RegexStringComparatorGiven a regular expression at instantiation this comparator does a pattern match on the table data.
SubstringComparatorTreats the value and table data as String instances and performs a contains() check.

Note

The last three comparators listed in Table 4-2—the BitComparator, RegexStringComparator, and SubstringComparatoronly work with the EQUAL and NOT_EQUAL operators, as the compareTo() of these comparators returns 0 for a match or 1 when there is no match. Using them in a LESS or GREATER comparison will yield erroneous results.

Each of the comparators usually has a constructor that takes the comparison value. In other words, you need to define a value you compare each cell against. Some of these constructors take a byte[], a byte array, to do the binary comparison, for example, while others take a String parameter—since the data point compared against is assumed to be some sort of readable text. Example 4-1 shows some of these in action.

Warning

The string-based comparators, RegexStringComparator and SubstringComparator, are more expensive in comparison to the purely byte-based version, as they need to convert a given value into a String first. The subsequent string or regular expression operation also adds to the overall cost.

Comparison Filters

The first type of supplied filter implementations are the comparison filters. They take the comparison operator and comparator instance as described earlier. The constructor of each of them has the same signature, inherited from CompareFilter:

CompareFilter(CompareOp valueCompareOp,
  WritableByteArrayComparable valueComparator)

You need to supply this comparison operator and comparison class for the filters to do their work. Next you will see the actual filters implementing a specific comparison.

Note

Please keep in mind that the general contract of the HBase filter API means you are filtering out information—filtered data is omitted from the results returned to the client. The filter is not specifying what you want to have, but rather what you do not want to have returned when reading data.

In contrast, all filters based on CompareFilter are doing the opposite, in that they include the matching values. In other words, be careful when choosing the comparison operator, as it makes the difference in regard to what the server returns. For example, instead of using LESS to skip some information, you may need to use GREATER_OR_EQUAL to include the desired data points.

RowFilter

This filter gives you the ability to filter data based on row keys.

Example 4-1 shows how the filter can use different comparator instances to get the desired results. It also uses various operators to include the row keys, while omitting others. Feel free to modify the code, changing the operators to see the possible results.

Example 4-1. Using a filter to select specific rows
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));

    Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, 1
      new BinaryComparator(Bytes.toBytes("row-22")));
    scan.setFilter(filter1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result res : scanner1) {
      System.out.println(res);
    }
    scanner1.close();

    Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, 2
      new RegexStringComparator(".*-.5"));
    scan.setFilter(filter2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result res : scanner2) {
      System.out.println(res);
    }
    scanner2.close();

    Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, 3
      new SubstringComparator("-5"));
    scan.setFilter(filter3);
    ResultScanner scanner3 = table.getScanner(scan);
    for (Result res : scanner3) {
      System.out.println(res);
    }
    scanner3.close();
1

Create a filter, while specifying the comparison operator and comparator. Here an exact match is needed.

2

Another filter is created, this time using a regular expression to match the row keys.

3

The third filter uses a substring match approach.

Here is the full printout of the example on the console:

Adding rows to table...
Scanning table #1... 
keyvalues={row-1/colfam1:col-0/1301043190260/Put/vlen=7}
keyvalues={row-10/colfam1:col-0/1301043190908/Put/vlen=8}
keyvalues={row-100/colfam1:col-0/1301043195275/Put/vlen=9}
keyvalues={row-11/colfam1:col-0/1301043190982/Put/vlen=8}
keyvalues={row-12/colfam1:col-0/1301043191040/Put/vlen=8}
keyvalues={row-13/colfam1:col-0/1301043191172/Put/vlen=8}
keyvalues={row-14/colfam1:col-0/1301043191318/Put/vlen=8}
keyvalues={row-15/colfam1:col-0/1301043191429/Put/vlen=8}
keyvalues={row-16/colfam1:col-0/1301043191509/Put/vlen=8}
keyvalues={row-17/colfam1:col-0/1301043191593/Put/vlen=8}
keyvalues={row-18/colfam1:col-0/1301043191673/Put/vlen=8}
keyvalues={row-19/colfam1:col-0/1301043191771/Put/vlen=8}
keyvalues={row-2/colfam1:col-0/1301043190346/Put/vlen=7}
keyvalues={row-20/colfam1:col-0/1301043191841/Put/vlen=8}
keyvalues={row-21/colfam1:col-0/1301043191933/Put/vlen=8}
keyvalues={row-22/colfam1:col-0/1301043191998/Put/vlen=8}
Scanning table #2... 
keyvalues={row-15/colfam1:col-0/1301043191429/Put/vlen=8}
keyvalues={row-25/colfam1:col-0/1301043192140/Put/vlen=8}
keyvalues={row-35/colfam1:col-0/1301043192665/Put/vlen=8}
keyvalues={row-45/colfam1:col-0/1301043193138/Put/vlen=8}
keyvalues={row-55/colfam1:col-0/1301043193729/Put/vlen=8}
keyvalues={row-65/colfam1:col-0/1301043194092/Put/vlen=8}
keyvalues={row-75/colfam1:col-0/1301043194457/Put/vlen=8}
keyvalues={row-85/colfam1:col-0/1301043194806/Put/vlen=8}
keyvalues={row-95/colfam1:col-0/1301043195121/Put/vlen=8}
Scanning table #3... 
keyvalues={row-5/colfam1:col-0/1301043190562/Put/vlen=7}
keyvalues={row-50/colfam1:col-0/1301043193332/Put/vlen=8}
keyvalues={row-51/colfam1:col-0/1301043193514/Put/vlen=8}
keyvalues={row-52/colfam1:col-0/1301043193603/Put/vlen=8}
keyvalues={row-53/colfam1:col-0/1301043193654/Put/vlen=8}
keyvalues={row-54/colfam1:col-0/1301043193696/Put/vlen=8}
keyvalues={row-55/colfam1:col-0/1301043193729/Put/vlen=8}
keyvalues={row-56/colfam1:col-0/1301043193766/Put/vlen=8}
keyvalues={row-57/colfam1:col-0/1301043193802/Put/vlen=8}
keyvalues={row-58/colfam1:col-0/1301043193842/Put/vlen=8}
keyvalues={row-59/colfam1:col-0/1301043193889/Put/vlen=8}

You can see how the first filter did an exact match on the row key, including all of those rows that have a key, equal to or less than the given one. Note once again the lexicographical sorting and comparison, and how it filters the row keys.

The second filter does a regular expression match, while the third uses a substring match approach. The results show that the filters work as advertised.

FamilyFilter

This filter works very similar to the RowFilter, but applies the comparison to the column families available in a row—as opposed to the row key. Using the available combinations of operators and comparators you can filter what is included in the retrieved data on a column family level. Example 4-2 shows how to use this.

Example 4-2. Using a filter to include only specific column families
    Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, 1
      new BinaryComparator(Bytes.toBytes("colfam3")));

    Scan scan = new Scan();
    scan.setFilter(filter1);
    ResultScanner scanner = table.getScanner(scan); 2
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

    Get get1 = new Get(Bytes.toBytes("row-5"));
    get1.setFilter(filter1);
    Result result1 = table.get(get1); 3
    System.out.println("Result of get(): " + result1);

    Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
      new BinaryComparator(Bytes.toBytes("colfam3")));
    Get get2 = new Get(Bytes.toBytes("row-5")); 4
    get2.addFamily(Bytes.toBytes("colfam1"));
    get2.setFilter(filter2);
    Result result2 = table.get(get2); 5
    System.out.println("Result of get(): " + result2);
1

Create a filter, while specifying the comparison operator and comparator.

2

Scan over the table while applying the filter.

3

Get a row while applying the same filter.

4

Create a filter on one column family while trying to retrieve another.

5

Get the same row while applying the new filter; this will return “NONE”.

The output—reformatted and abbreviated for the sake of readability—shows the filter in action. The input data has four column families, with two columns each, and 10 rows in total.

Adding rows to table...
Scanning table... 
keyvalues={row-1/colfam1:col-0/1303721790522/Put/vlen=7, 
           row-1/colfam1:col-1/1303721790574/Put/vlen=7, 
           row-1/colfam2:col-0/1303721790522/Put/vlen=7, 
           row-1/colfam2:col-1/1303721790574/Put/vlen=7}
keyvalues={row-10/colfam1:col-0/1303721790785/Put/vlen=8, 
           row-10/colfam1:col-1/1303721790792/Put/vlen=8, 
           row-10/colfam2:col-0/1303721790785/Put/vlen=8,  
           row-10/colfam2:col-1/1303721790792/Put/vlen=8}
...
keyvalues={row-9/colfam1:col-0/1303721790778/Put/vlen=7, 
           row-9/colfam1:col-1/1303721790781/Put/vlen=7, 
           row-9/colfam2:col-0/1303721790778/Put/vlen=7, 
           row-9/colfam2:col-1/1303721790781/Put/vlen=7}

Result of get(): keyvalues={row-5/colfam1:col-0/1303721790652/Put/vlen=7, 
           row-5/colfam1:col-1/1303721790664/Put/vlen=7, 
           row-5/colfam2:col-0/1303721790652/Put/vlen=7, 
           row-5/colfam2:col-1/1303721790664/Put/vlen=7}

Result of get(): keyvalues=NONE

The last get() shows that you can (inadvertently) create an empty set by applying a filter for exactly one column family, while specifying a different column family selector using addFamily().

QualifierFilter

Example 4-3 shows how the same logic is applied on the column qualifier level. This allows you to filter specific columns from the table.

Example 4-3. Using a filter to include only specific column qualifiers
    Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("col-2")));

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    System.out.println("Result of get(): " + result);

ValueFilter

This filter makes it possible to include only columns that have a specific value. Combined with the RegexStringComparator, for example, this can filter using powerful expression syntax. Example 4-4 showcases this feature. Note, though, that with certain comparators—as explained earlier—you can only employ a subset of the operators. Here a substring match is performed and this must be combined with an EQUAL, or NOT_EQUAL, operator.

Example 4-4. Using the value-based filter
    Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, 1
      new SubstringComparator(".4"));

    Scan scan = new Scan();
    scan.setFilter(filter); 2
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " + 3
          Bytes.toString(kv.getValue()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter); 4
    Result result = table.get(get);
    for (KeyValue kv : result.raw()) {
      System.out.println("KV: " + kv + ", Value: " +
        Bytes.toString(kv.getValue()));
    }
1

Create a filter, while specifying the comparison operator and comparator.

2

Set the filter for the scan.

3

Print out the value to check that the filter works.

4

Assign the same filter to the Get instance.

DependentColumnFilter

Here you have a more complex filter that does not simply filter out data based on directly available information. Rather, it lets you specify a dependent column—or reference column—that controls how other columns are filtered. It uses the timestamp of the reference column and includes all other columns that have the same timestamp. Here are the constructors provided:

DependentColumnFilter(byte[] family, byte[] qualifier)
DependentColumnFilter(byte[] family, byte[] qualifier,
  boolean dropDependentColumn)
DependentColumnFilter(byte[] family, byte[] qualifier,
  boolean dropDependentColumn, CompareOp valueCompareOp,
  WritableByteArrayComparable valueComparator)

Since it is based on CompareFilter, it also offers you to further select columns, but for this filter it does so based on their values. Think of it as a combination of a ValueFilter and a filter selecting on a reference timestamp. You can optionally hand in your own operator and comparator pair to enable this feature. The class provides constructors, though, that let you omit the operator and comparator and disable the value filtering, including all columns by default, that is, performing the timestamp filter based on the reference column only.

Example 4-5 shows the filter in use. You can see how the optional values can be handed in as well. The dropDependentColumn parameter is giving you additional control over how the reference column is handled: it is either included or dropped by the filter, setting this parameter to false or true, respectively.

Example 4-5. Using a filter to include only specific column families
  private static void filter(boolean drop,
      CompareFilter.CompareOp operator,
      WritableByteArrayComparable comparator)
  throws IOException {
    Filter filter;
    if (comparator != null) {
      filter = new DependentColumnFilter(Bytes.toBytes("colfam1"), 1
        Bytes.toBytes("col-5"), drop, operator, comparator);
    } else {
      filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
        Bytes.toBytes("col-5"), drop);

    }

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    for (KeyValue kv : result.raw()) {
      System.out.println("KV: " + kv + ", Value: " +
        Bytes.toString(kv.getValue()));
    }
  }

  public static void main(String[] args) throws IOException {
    filter(true, CompareFilter.CompareOp.NO_OP, null);
    filter(false, CompareFilter.CompareOp.NO_OP, null); 2
    filter(true, CompareFilter.CompareOp.EQUAL,
      new BinaryPrefixComparator(Bytes.toBytes("val-5")));
    filter(false, CompareFilter.CompareOp.EQUAL,
      new BinaryPrefixComparator(Bytes.toBytes("val-5")));
    filter(true, CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator(".*\\.5"));
    filter(false, CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator(".*\\.5"));
  }
1

Create the filter with various options.

2

Call the filter method with various options.

Warning

This filter is not compatible with the batch feature of the scan operations, that is, setting Scan.setBatch() to a number larger than zero. The filter needs to see the entire row to do its work, and using batching will not carry the reference column timestamp over and would result in erroneous results.

If you try to enable the batch mode nevertheless, you will get an error:

Exception org.apache.hadoop.hbase.filter.IncompatibleFilterException: 
  Cannot set batch on a scan using a filter that returns true for 
  filter.hasFilterRow

The example also proceeds slightly differently compared to the earlier filters, as it sets the version to the column number for a more reproducible result. The implicit timestamps that the servers use as the version could result in fluctuating results as you cannot guarantee them using the exact time, down to the millisecond.

The filter() method used is called with different parameter combinations, showing how using the built-in value filter and the drop flag is affecting the returned data set.

Dedicated Filters

The second type of supplied filters are based directly on FilterBase and implement more specific use cases. Many of these filters are only really applicable when performing scan operations, since they filter out entire rows. For get() calls, this is often too restrictive and would result in a very harsh filter approach: include the whole row or nothing at all.

SingleColumnValueFilter

You can use this filter when you have exactly one column that decides if an entire row should be returned or not. You need to first specify the column you want to track, and then some value to check against. The constructors offered are:

SingleColumnValueFilter(byte[] family, byte[] qualifier,
  CompareOp compareOp, byte[] value)
SingleColumnValueFilter(byte[] family, byte[] qualifier,
  CompareOp compareOp, WritableByteArrayComparable comparator)

The first one is a convenience function as it simply creates a BinaryComparator instance internally on your behalf. The second takes the same parameters we used for the CompareFilter-based classes. Although the SingleColumnValueFilter does not inherit from the CompareFilter directly, it still uses the same parameter types.

The filter class also exposes a few auxiliary methods you can use to fine-tune its behavior:

boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)

The former controls what happens to rows that do not have the column at all. By default, they are included in the result, but you can use setFilterIfMissing(true) to reverse that behavior, that is, all rows that do not have the reference column are dropped from the result.

Note

You must include the column you want to filter by, in other words, the reference column, into the families you query for—using addColumn(), for example. If you fail to do so, the column is considered missing and the result is either empty, or contains all rows, based on the getFilterIfMissing() result.

By using setLatestVersionOnly(false)—the default is true—you can change the default behavior of the filter, which is only to check the newest version of the reference column, to instead include previous versions in the check as well. Example 4-6 combines these features to select a specific set of rows only.

Example 4-6. Using a filter to return only rows with a given value in a given column
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
      Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"),
      CompareFilter.CompareOp.NOT_EQUAL,
      new SubstringComparator("val-5"));
    filter.setFilterIfMissing(true);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-6"));
    get.setFilter(filter);
    Result result = table.get(get);
    System.out.println("Result of get: ");
    for (KeyValue kv : result.raw()) {
      System.out.println("KV: " + kv + ", Value: " +
        Bytes.toString(kv.getValue()));
    }

SingleColumnValueExcludeFilter

The SingleColumnValueFilter we just discussed is extended in this class to provide slightly different semantics: the reference column, as handed into the constructor, is omitted from the result. In other words, you have the same features, constructors, and methods to control how this filter works. The only difference is that you will never get the column you are checking against as part of the Result instance(s) on the client side.

PrefixFilter

Given a prefix, specified when you instantiate the filter instance, all rows that match this prefix are returned to the client. The constructor is:

public PrefixFilter(byte[] prefix)

Example 4-7 has this applied to the usual test data set.

Example 4-7. Using the prefix-based filter
    Filter filter = new PrefixFilter(Bytes.toBytes("row-1"));

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    for (KeyValue kv : result.raw()) {
      System.out.println("KV: " + kv + ", Value: " +
        Bytes.toString(kv.getValue()));
    }

It is interesting to see how the get() call fails to return anything, because it is asking for a row that does not match the filter prefix. This filter does not make much sense when doing get() calls but is highly useful for scan operations.

The scan also is actively ended when the filter encounters a row key that is larger than the prefix. In this way, and combining this with a start row, for example, the filter is improving the overall performance of the scan as it has knowledge of when to skip the rest of the rows altogether.

PageFilter

You paginate through rows by employing this filter. When you create the instance, you specify a pageSize parameter, which controls how many rows per page should be returned.

Note

There is a fundamental issue with filtering on physically separate servers. Filters run on different region servers in parallel and cannot retain or communicate their current state across those boundaries. Thus, each filter is required to scan at least up to pageCount rows before ending the scan. This means a slight inefficiency is given for the PageFilter as more rows are reported to the client than necessary. The final consolidation on the client obviously has visibility into all results and can reduce what is accessible through the API accordingly.

The client code would need to remember the last row that was returned, and then, when another iteration is about to start, set the start row of the scan accordingly, while retaining the same filter properties.

Because pagination is setting a strict limit on the number of rows to be returned, it is possible for the filter to early out the entire scan, once the limit is reached or exceeded. Filters have a facility to indicate that fact and the region servers make use of this hint to stop any further processing.

Example 4-8 puts this together, showing how a client can reset the scan to a new start row on the subsequent iterations.

Example 4-8. Using a filter to paginate through rows
    Filter filter = new PageFilter(15);

    int totalRows = 0;
    byte[] lastRow = null;
    while (true) {
      Scan scan = new Scan();
      scan.setFilter(filter);
      if (lastRow != null) {
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row: " +
          Bytes.toStringBinary(startRow));
        scan.setStartRow(startRow);
      }
      ResultScanner scanner = table.getScanner(scan);
      int localRows = 0;
      Result result;
      while ((result = scanner.next()) != null) {
        System.out.println(localRows++ + ": " + result);
        totalRows++;
        lastRow = result.getRow();
      }
      scanner.close();
      if (localRows == 0) break;
    }
    System.out.println("total rows: " + totalRows);

Because of the lexicographical sorting of the row keys by HBase and the comparison taking care of finding the row keys in order, and the fact that the start key on a scan is always inclusive, you need to add an extra zero byte to the previous key. This will ensure that the last seen row key is skipped and the next, in sorting order, is found. The zero byte is the smallest increment, and therefore is safe to use when resetting the scan boundaries. Even if there were a row that would match the previous plus the extra zero byte, the scan would be correctly doing the next iteration—this is because the start key is inclusive.

KeyOnlyFilter

Some applications need to access just the keys of each KeyValue, while omitting the actual data. The KeyOnlyFilter provides this functionality by applying the filter’s ability to modify the processed columns and cells, as they pass through. It does so by applying the KeyValue.convertToKeyOnly(boolean) call that strips out the data part.

The constructor of this filter has a boolean parameter, named lenAsVal. It is handed to the convertToKeyOnly() call as-is, controlling what happens to the value part of each KeyValue instance processed. The default false simply sets the value to zero length, while the opposite true sets the value to the number representing the length of the original value.

The latter may be useful to your application when quickly iterating over columns, where the keys already convey meaning and the length can be used to perform a secondary sort, for example. Client API: Best Practices has an example.

FirstKeyOnlyFilter

If you need to access the first column—as sorted implicitly by HBase—in each row, this filter will provide this feature. Typically this is used by row counter type applications that only need to check if a row exists. Recall that in column-oriented databases a row really is composed of columns, and if there are none, the row ceases to exist.

Another possible use case is relying on the column sorting in lexicographical order, and setting the column qualifier to an epoch value. This would sort the column with the oldest timestamp name as the first to be retrieved. Combined with this filter, it is possible to retrieve the oldest column from every row using a single scan.

This class makes use of another optimization feature provided by the filter framework: it indicates to the region server applying the filter that the current row is done and that it should skip to the next one. This improves the overall performance of the scan, compared to a full table scan.

InclusiveStopFilter

The row boundaries of a scan are inclusive for the start row, yet exclusive for the stop row. You can overcome the stop row semantics using this filter, which includes the specified stop row. Example 4-9 uses the filter to start at row-3, and stop at row-5 inclusively.

Example 4-9. Using a filter to include a stop row
    Filter filter = new InclusiveStopFilter(Bytes.toBytes("row-5"));

    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row-3"));
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

The output on the console, when running the example code, confirms that the filter works as advertised:

Adding rows to table...
Results of scan:
keyvalues={row-3/colfam1:col-0/1301337961569/Put/vlen=7}
keyvalues={row-30/colfam1:col-0/1301337961610/Put/vlen=8}
keyvalues={row-31/colfam1:col-0/1301337961612/Put/vlen=8}
keyvalues={row-32/colfam1:col-0/1301337961613/Put/vlen=8}
keyvalues={row-33/colfam1:col-0/1301337961614/Put/vlen=8}
keyvalues={row-34/colfam1:col-0/1301337961615/Put/vlen=8}
keyvalues={row-35/colfam1:col-0/1301337961616/Put/vlen=8}
keyvalues={row-36/colfam1:col-0/1301337961617/Put/vlen=8}
keyvalues={row-37/colfam1:col-0/1301337961618/Put/vlen=8}
keyvalues={row-38/colfam1:col-0/1301337961619/Put/vlen=8}
keyvalues={row-39/colfam1:col-0/1301337961620/Put/vlen=8}
keyvalues={row-4/colfam1:col-0/1301337961571/Put/vlen=7}
keyvalues={row-40/colfam1:col-0/1301337961621/Put/vlen=8}
keyvalues={row-41/colfam1:col-0/1301337961622/Put/vlen=8}
keyvalues={row-42/colfam1:col-0/1301337961623/Put/vlen=8}
keyvalues={row-43/colfam1:col-0/1301337961624/Put/vlen=8}
keyvalues={row-44/colfam1:col-0/1301337961625/Put/vlen=8}
keyvalues={row-45/colfam1:col-0/1301337961626/Put/vlen=8}
keyvalues={row-46/colfam1:col-0/1301337961627/Put/vlen=8}
keyvalues={row-47/colfam1:col-0/1301337961628/Put/vlen=8}
keyvalues={row-48/colfam1:col-0/1301337961629/Put/vlen=8}
keyvalues={row-49/colfam1:col-0/1301337961630/Put/vlen=8}
keyvalues={row-5/colfam1:col-0/1301337961573/Put/vlen=7}

TimestampsFilter

When you need fine-grained control over what versions are included in the scan result, this filter provides the means. You have to hand in a List of timestamps:

TimestampsFilter(List<Long> timestamps)

Note

As you have seen throughout the book so far, a version is a specific value of a column at a unique point in time, denoted with a timestamp. When the filter is asking for a list of timestamps, it will attempt to retrieve the column versions with the matching timestamps.

Example 4-10 sets up a filter with three timestamps and adds a time range to the second scan.

Example 4-10. Filtering data by timestamps
    List<Long> ts = new ArrayList<Long>();
    ts.add(new Long(5));
    ts.add(new Long(10)); 1
    ts.add(new Long(15));
    Filter filter = new TimestampsFilter(ts);

    Scan scan1 = new Scan();
    scan1.setFilter(filter); 2
    ResultScanner scanner1 = table.getScanner(scan1);
    for (Result result : scanner1) {
      System.out.println(result);
    }
    scanner1.close();

    Scan scan2 = new Scan();
    scan2.setFilter(filter);
    scan2.setTimeRange(8, 12); 3
    ResultScanner scanner2 = table.getScanner(scan2);
    for (Result result : scanner2) {
      System.out.println(result);
    }
    scanner2.close();
1

Add timestamps to the list.

2

Add the filter to an otherwise default Scan instance.

3

Also add a time range to verify how it affects the filter.

Here is the output on the console in an abbreviated form:

Adding rows to table...
Results of scan #1:
keyvalues={row-1/colfam1:col-10/10/Put/vlen=8, 
           row-1/colfam1:col-15/15/Put/vlen=8, 
           row-1/colfam1:col-5/5/Put/vlen=7}
keyvalues={row-10/colfam1:col-10/10/Put/vlen=9, 
           row-10/colfam1:col-15/15/Put/vlen=9, 
           row-10/colfam1:col-5/5/Put/vlen=8}
keyvalues={row-100/colfam1:col-10/10/Put/vlen=10, 
           row-100/colfam1:col-15/15/Put/vlen=10, 
           row-100/colfam1:col-5/5/Put/vlen=9}
...
Results of scan #2:
keyvalues={row-1/colfam1:col-10/10/Put/vlen=8}
keyvalues={row-10/colfam1:col-10/10/Put/vlen=9}
keyvalues={row-100/colfam1:col-10/10/Put/vlen=10}
keyvalues={row-11/colfam1:col-10/10/Put/vlen=9}
...

The first scan, only using the filter, is outputting the column values for all three specified timestamps as expected. The second scan only returns the timestamp that fell into the time range specified when the scan was set up. Both time-based restrictions, the filter and the scanner time range, are doing their job and the result is a combination of both.

ColumnCountGetFilter

You can use this filter to only retrieve a specific maximum number of columns per row. You can set the number using the constructor of the filter:

ColumnCountGetFilter(int n)

Since this filter stops the entire scan once a row has been found that matches the maximum number of columns configured, it is not useful for scan operations, and in fact, it was written to test filters in get() calls.

ColumnPaginationFilter

Similar to the PageFilter, this one can be used to page through columns in a row. Its constructor has two parameters:

ColumnPaginationFilter(int limit, int offset)

It skips all columns up to the number given as offset, and then includes limit columns afterward. Example 4-11 has this applied to a normal scan.

Example 4-11. Paginating through columns in a row
    Filter filter = new ColumnPaginationFilter(5, 15);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

Running this example should render the following output:

Adding rows to table...
Results of scan:
keyvalues={row-01/colfam1:col-15/15/Put/vlen=9, 
           row-01/colfam1:col-16/16/Put/vlen=9, 
           row-01/colfam1:col-17/17/Put/vlen=9, 
           row-01/colfam1:col-18/18/Put/vlen=9, 
           row-01/colfam1:col-19/19/Put/vlen=9}
keyvalues={row-02/colfam1:col-15/15/Put/vlen=9, 
           row-02/colfam1:col-16/16/Put/vlen=9, 
           row-02/colfam1:col-17/17/Put/vlen=9, 
           row-02/colfam1:col-18/18/Put/vlen=9, 
           row-02/colfam1:col-19/19/Put/vlen=9}
...

Note

This example slightly changes the way the rows and columns are numbered by adding a padding to the numeric counters. For example, the first row is padded to be row-01. This also shows how padding can be used to get a more human-readable style of sorting, for example—as known from a dictionary or telephone book.

The result includes all 10 rows, starting each row at column (offset = 15) and printing five columns (limit = 5).

ColumnPrefixFilter

Analog to the PrefixFilter, which worked by filtering on row key prefixes, this filter does the same for columns. You specify a prefix when creating the filter:

ColumnPrefixFilter(byte[] prefix)

All columns that have the given prefix are then included in the result.

RandomRowFilter

Finally, there is a filter that shows what is also possible using the API: including random rows into the result. The constructor is given a parameter named chance, which represents a value between 0.0 and 1.0:

RandomRowFilter(float chance)

Internally, this class is using a Java Random.nextFloat() call to randomize the row inclusion, and then compares the value with the chance given. Giving it a negative chance value will make the filter exclude all rows, while a value larger than 1.0 will make it include all rows.

Decorating Filters

While the provided filters are already very powerful, sometimes it can be useful to modify, or extend, the behavior of a filter to gain additional control over the returned data. Some of this additional control is not dependent on the filter itself, but can be applied to any of them. This is what the decorating filter group of classes is about.

SkipFilter

This filter wraps a given filter and extends it to exclude an entire row, when the wrapped filter hints for a KeyValue to be skipped. In other words, as soon as a filter indicates that a column in a row is omitted, the entire row is omitted.

Note

The wrapped filter must implement the filterKeyValue() method, or the SkipFilter will not work as expected.[58] This is because the SkipFilter is only checking the results of that method to decide how to handle the current row. See Table 4-5 on page for an overview of compatible filters.

Example 4-12 combines the SkipFilter with a ValueFilter to first select all columns that have no zero-valued column, and subsequently drops all other partial rows that do not have a matching value.

Example 4-12. Using a filter to skip entire rows based on another filter’s results
    Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new BinaryComparator(Bytes.toBytes("val-0")));

    Scan scan = new Scan();
    scan.setFilter(filter1); 1
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner1.close();

    Filter filter2 = new SkipFilter(filter1);

    scan.setFilter(filter2); 2
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner2.close();
1

Only add the ValueFilter to the first scan.

2

Add the decorating skip filter for the second scan.

The example code should print roughly the following results when you execute it—note, though, that the values are randomized, so you should get a slightly different result for every invocation:

Adding rows to table...
Results of scan #1:
KV: row-01/colfam1:col-00/0/Put/vlen=5, Value: val-4
KV: row-01/colfam1:col-01/1/Put/vlen=5, Value: val-2
KV: row-01/colfam1:col-02/2/Put/vlen=5, Value: val-4
KV: row-01/colfam1:col-03/3/Put/vlen=5, Value: val-3
KV: row-01/colfam1:col-04/4/Put/vlen=5, Value: val-1
KV: row-02/colfam1:col-00/0/Put/vlen=5, Value: val-3
KV: row-02/colfam1:col-01/1/Put/vlen=5, Value: val-1
KV: row-02/colfam1:col-03/3/Put/vlen=5, Value: val-4
KV: row-02/colfam1:col-04/4/Put/vlen=5, Value: val-1
...
Total KeyValue count for scan #1: 122

Results of scan #2:
KV: row-01/colfam1:col-00/0/Put/vlen=5, Value: val-4
KV: row-01/colfam1:col-01/1/Put/vlen=5, Value: val-2
KV: row-01/colfam1:col-02/2/Put/vlen=5, Value: val-4
KV: row-01/colfam1:col-03/3/Put/vlen=5, Value: val-3
KV: row-01/colfam1:col-04/4/Put/vlen=5, Value: val-1
KV: row-07/colfam1:col-00/0/Put/vlen=5, Value: val-4
KV: row-07/colfam1:col-01/1/Put/vlen=5, Value: val-1
KV: row-07/colfam1:col-02/2/Put/vlen=5, Value: val-1
KV: row-07/colfam1:col-03/3/Put/vlen=5, Value: val-2
KV: row-07/colfam1:col-04/4/Put/vlen=5, Value: val-4
...
Total KeyValue count for scan #2: 50

The first scan returns all columns that are not zero valued. Since the value is assigned at random, there is a high probability that you will get at least one or more columns of each possible row. Some rows will miss a column—these are the omitted zero-valued ones.

The second scan, on the other hand, wraps the first filter and forces all partial rows to be dropped. You can see from the console output how only complete rows are emitted, that is, those with all five columns the example code creates initially. The total KeyValue count for each scan confirms the more restrictive behavior of the SkipFilter variant.

WhileMatchFilter

This second decorating filter type works somewhat similarly to the previous one, but aborts the entire scan once a piece of information is filtered. This works by checking the wrapped filter and seeing if it skips a row by its key, or a column of a row because of a KeyValue check.[59]

Example 4-13 is a slight variation of the previous example, using different filters to show how the decorating class works.

Example 4-13. Using a filter to skip entire rows based on another filter’s results
    Filter filter1 = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-05")));

    Scan scan = new Scan();
    scan.setFilter(filter1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner1.close();

    Filter filter2 = new WhileMatchFilter(filter1);

    scan.setFilter(filter2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner2.close();

Once you run the example code, you should get this output on the console:

Adding rows to table...
Results of scan #1:
KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00
KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00
KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00
KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00
KV: row-06/colfam1:col-00/0/Put/vlen=9, Value: val-06.00
KV: row-07/colfam1:col-00/0/Put/vlen=9, Value: val-07.00
KV: row-08/colfam1:col-00/0/Put/vlen=9, Value: val-08.00
KV: row-09/colfam1:col-00/0/Put/vlen=9, Value: val-09.00
KV: row-10/colfam1:col-00/0/Put/vlen=9, Value: val-10.00
Total KeyValue count for scan #1: 9
Results of scan #2:
KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00
KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00
KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00
KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00
Total KeyValue count for scan #2: 4

The first scan used just the RowFilter to skip one out of 10 rows; the rest is returned to the client. Adding the WhileMatchFilter for the second scan shows its behavior to stop the entire scan operation, once the wrapped filter omits a row or column. In the example this is row-05, triggering the end of the scan.

Note

Decorating filters implement the same Filter interface, just like any other single-purpose filter. In doing so, they can be used as a drop-in replacement for those filters, while combining their behavior with the wrapped filter instance.

FilterList

So far you have seen how filters—on their own, or decorated—are doing the work of filtering out various dimensions of a table, ranging from rows, to columns, and all the way to versions of values within a column. In practice, though, you may want to have more than one filter being applied to reduce the data returned to your client application. This is what the FilterList is for.

Note

The FilterList class implements the same Filter interface, just like any other single-purpose filter. In doing so, it can be used as a drop-in replacement for those filters, while combining the effects of each included instance.

You can create an instance of FilterList while providing various parameters at instantiation time, using one of these constructors:

FilterList(List<Filter> rowFilters)
FilterList(Operator operator)
FilterList(Operator operator, List<Filter> rowFilters

The rowFilters parameter specifies the list of filters that are assessed together, using an operator to combine their results. Table 4-3 lists the possible choices of operators. The default is MUST_PASS_ALL, and can therefore be omitted from the constructor when you do not need a different one.

Table 4-3. Possible values for the FilterList.Operator enumeration
OperatorDescription
MUST_PASS_ALLA value is only included in the result when all filters agree to do so, i.e., no filter is omitting the value.
MUST_PASS_ONEAs soon as a value was allowed to pass one of the filters, it is included in the overall result.

Adding filters, after the FilterList instance has been created, can be done with:

void addFilter(Filter filter)

You can only specify one operator per FilterList, but you are free to add other FilterList instances to an existing FilterList, thus creating a hierarchy of filters, combined with the operators you need.

You can further control the execution order of the included filters by carefully choosing the List implementation you require. For example, using ArrayList would guarantee that the filters are applied in the order they were added to the list. This is shown in Example 4-14.

Example 4-14. Using a filter list to combine single-purpose filters
    List<Filter> filters = new ArrayList<Filter>();

    Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-03")));
    filters.add(filter1);

    Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-06")));
    filters.add(filter2);

    Filter filter3 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator("col-0[03]"));
    filters.add(filter3);

    FilterList filterList1 = new FilterList(filters);

    Scan scan = new Scan();
    scan.setFilter(filterList1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner1.close();

    FilterList filterList2 = new FilterList(
      FilterList.Operator.MUST_PASS_ONE, filters);

    scan.setFilter(filterList2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner2.close();

The first scan filters out a lot of details, as at least one of the filters in the list excludes some information. Only where they all let the information pass is it returned to the client.

In contrast, the second scan includes all rows and columns in the result. This is caused by setting the FilterList operator to MUST_PASS_ONE, which includes all the information as soon as a single filter lets it pass. And in this scenario, all values are passed by at least one of them, including everything.

Custom Filters

Eventually, you may exhaust the list of supplied filter types and need to implement your own. This can be done by either implementing the Filter interface, or extending the provided FilterBase class. The latter provides default implementations for all methods that are members of the interface.

The Filter interface has the following structure:

public interface Filter extends Writable {
  public enum ReturnCode { 
    INCLUDE, SKIP, NEXT_COL, NEXT_ROW, SEEK_NEXT_USING_HINT 
  }
  public void reset()
  public boolean filterRowKey(byte[] buffer, int offset, int length)
  public boolean filterAllRemaining()
  public ReturnCode filterKeyValue(KeyValue v)
  public void filterRow(List<KeyValue> kvs)
  public boolean hasFilterRow()
  public boolean filterRow()
  public KeyValue getNextKeyHint(KeyValue currentKV)

The interface provides a public enumeration type, named ReturnCode, that is used by the filterKeyValue() method to indicate what the execution framework should do next. Instead of blindly iterating over all values, the filter has the ability to skip a value, the remainder of a column, or the rest of the entire row. This helps tremendously in terms of improving performance while retrieving data.

Note

The servers may still need to scan the entire row to find matching data, but the optimizations provided by the filterKeyValue() return code can reduce the work required to do so.

Table 4-4 lists the possible values and their meaning.

Table 4-4. Possible values for the Filter.ReturnCode enumeration
Return codeDescription
INCLUDEInclude the given KeyValue instance in the result.
SKIPSkip the current KeyValue and proceed to the next.
NEXT_COLSkip the remainder of the current column, proceeding to the next. This is used by the TimestampsFilter, for example.
NEXT_ROWSimilar to the previous, but skips the remainder of the current row, moving to the next. The RowFilter makes use of this return code, for example.
SEEK_NEXT_USING_HINTSome filters want to skip a variable number of values and use this return code to indicate that the framework should use the getNextKeyHint() method to determine where to skip to. The ColumnPrefixFilter, for example, uses this feature.

Most of the provided methods are called at various stages in the process of retrieving a row for a client—for example, during a scan operation. Putting them in call order, you can expect them to be executed in the following sequence:

filterRowKey(byte[] buffer, int offset, int length)

The next check is against the row key, using this method of the Filter implementation. You can use it to skip an entire row from being further processed. The RowFilter uses it to suppress entire rows being returned to the client.

filterKeyValue(KeyValue v)

When a row is not filtered (yet), the framework proceeds to invoke this method for every KeyValue that is part of the current row. The ReturnCode indicates what should happen with the current value.

filterRow(List<KeyValue> kvs)

Once all row and value checks have been performed, this method of the filter is called, giving you access to the list of KeyValue instances that have been included by the previous filter methods. The DependentColumnFilter uses it to drop those columns that do not match the reference column.

filterRow()

After everything else was checked and invoked, the final inspection is performed using filterRow(). A filter that uses this functionality is the PageFilter, checking if the number of rows to be returned for one iteration in the pagination process is reached, returning true afterward. The default false would include the current row in the result.

reset()

This resets the filter for every new row the scan is iterating over. It is called by the server, after a row is read, implicitly. This applies to get and scan operations, although obviously it has no effect for the former, as gets only read a single row.

filterAllRemaining()

This method can be used to stop the scan, by returning true. It is used by filters to provide the early out optimizations mentioned earlier. If a filter returns false, the scan is continued, and the aforementioned methods are called.

Obviously, this also implies that for get operations this call is not useful.

Figure 4-2 shows the logical flow of the filter methods for a single row. There is a more fine-grained process to apply the filters on a column level, which is not relevant in this context.

The logical flow through the filter methods for a single row
Figure 4-2. The logical flow through the filter methods for a single row

Example 4-15 implements a custom filter, using the methods provided by FilterBase, overriding only those methods that need to be changed.

The filter first assumes all rows should be filtered, that is, removed from the result. Only when there is a value in any column that matches the given reference does it include the row, so that it is sent back to the client.

Example 4-15. Implementing a filter that lets certain rows pass
public class CustomFilter extends FilterBase{

  private byte[] value = null;
  private boolean filterRow = true;

  public CustomFilter() {
    super();
  }

  public CustomFilter(byte[] value) {
    this.value = value; 1
  }

  @Override
  public void reset() {
    this.filterRow = true; 2
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    if (Bytes.compareTo(value, kv.getValue()) == 0) {
      filterRow = false; 3
    }
    return ReturnCode.INCLUDE; 4
  }

  @Override
  public boolean filterRow() {
    return filterRow; 5
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    Bytes.writeByteArray(dataOutput, this.value); 6
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    this.value = Bytes.readByteArray(dataInput); 7
  }
}
1

Set the value to compare against.

2

Reset the filter flag for each new row being tested.

3

When there is a matching value, let the row pass.

4

Always include this, since the final decision is made later.

5

Here the actual decision is taking place, based on the flag status.

6

Write the given value out so that it can be sent to the servers.

7

Used by the servers to establish the filter instance with the correct values.

Example 4-16 uses the new custom filter to find rows with specific values in it, also using a FilterList.

Example 4-16. Using a custom filter
    List<Filter> filters = new ArrayList<Filter>();

    Filter filter1 = new CustomFilter(Bytes.toBytes("val-05.05"));
    filters.add(filter1);

    Filter filter2 = new CustomFilter(Bytes.toBytes("val-02.07"));
    filters.add(filter2);

    Filter filter3 = new CustomFilter(Bytes.toBytes("val-09.00"));
    filters.add(filter3);

    FilterList filterList = new FilterList(
      FilterList.Operator.MUST_PASS_ONE, filters);

    Scan scan = new Scan();
    scan.setFilter(filterList);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println("KV: " + kv + ", Value: " +
          Bytes.toString(kv.getValue()));
      }
    }
    scanner.close();

Just as with the earlier examples, here is what should appear as output on the console when executing this example:

Adding rows to table...
Results of scan:
KV: row-02/colfam1:col-00/1301507323088/Put/vlen=9, Value: val-02.00
KV: row-02/colfam1:col-01/1301507323090/Put/vlen=9, Value: val-02.01
KV: row-02/colfam1:col-02/1301507323092/Put/vlen=9, Value: val-02.02
KV: row-02/colfam1:col-03/1301507323093/Put/vlen=9, Value: val-02.03
KV: row-02/colfam1:col-04/1301507323096/Put/vlen=9, Value: val-02.04
KV: row-02/colfam1:col-05/1301507323104/Put/vlen=9, Value: val-02.05
KV: row-02/colfam1:col-06/1301507323108/Put/vlen=9, Value: val-02.06
KV: row-02/colfam1:col-07/1301507323110/Put/vlen=9, Value: val-02.07
KV: row-02/colfam1:col-08/1301507323112/Put/vlen=9, Value: val-02.08
KV: row-02/colfam1:col-09/1301507323113/Put/vlen=9, Value: val-02.09
KV: row-05/colfam1:col-00/1301507323148/Put/vlen=9, Value: val-05.00
KV: row-05/colfam1:col-01/1301507323150/Put/vlen=9, Value: val-05.01
KV: row-05/colfam1:col-02/1301507323152/Put/vlen=9, Value: val-05.02
KV: row-05/colfam1:col-03/1301507323153/Put/vlen=9, Value: val-05.03
KV: row-05/colfam1:col-04/1301507323154/Put/vlen=9, Value: val-05.04
KV: row-05/colfam1:col-05/1301507323155/Put/vlen=9, Value: val-05.05
KV: row-05/colfam1:col-06/1301507323157/Put/vlen=9, Value: val-05.06
KV: row-05/colfam1:col-07/1301507323158/Put/vlen=9, Value: val-05.07
KV: row-05/colfam1:col-08/1301507323158/Put/vlen=9, Value: val-05.08
KV: row-05/colfam1:col-09/1301507323159/Put/vlen=9, Value: val-05.09
KV: row-09/colfam1:col-00/1301507323192/Put/vlen=9, Value: val-09.00
KV: row-09/colfam1:col-01/1301507323194/Put/vlen=9, Value: val-09.01
KV: row-09/colfam1:col-02/1301507323196/Put/vlen=9, Value: val-09.02
KV: row-09/colfam1:col-03/1301507323199/Put/vlen=9, Value: val-09.03
KV: row-09/colfam1:col-04/1301507323201/Put/vlen=9, Value: val-09.04
KV: row-09/colfam1:col-05/1301507323202/Put/vlen=9, Value: val-09.05
KV: row-09/colfam1:col-06/1301507323203/Put/vlen=9, Value: val-09.06
KV: row-09/colfam1:col-07/1301507323204/Put/vlen=9, Value: val-09.07
KV: row-09/colfam1:col-08/1301507323205/Put/vlen=9, Value: val-09.08
KV: row-09/colfam1:col-09/1301507323206/Put/vlen=9, Value: val-09.09

As expected, the entire row that has a column with the value matching one of the references is included in the result.

Filters Summary

Table 4-5 summarizes some of the features and compatibilities related to the provided filter implementations. The ✓ symbol means the feature is available, while ✗ indicates it is missing.

Table 4-5. Summary of filter features and compatibilities between them
FilterBatch[a]Skip[b]While-Match[c]List[d]Early Out[e]Gets[f]Scans[g]
RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter
SingleColumnValueFilter
SingleColumnValueExcludeFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
InclusiveStopFilter
TimestampsFilter
ColumnCountGetFilter
ColumnPaginationFilter
ColumnPrefixFilter
RandomRowFilter
SkipFilter✓/✗[h]✓/✗[h]
WhileMatchFilter✓/✗[h]✓/✗[h]
FilterList✓/✗[h]✓/✗[h]✓/✗[h]✓/✗[h]

[a] Filter supports Scan.setBatch(), i.e., the scanner batch mode.

[b] Filter can be used with the decorating SkipFilter class.

[c] Filter can be used with the decorating WhileMatchFilter class.

[d] Filter can be used with the combining FilterList class.

[e] Filter has optimizations to stop a scan early, once there are no more matching rows ahead.

[f] Filter can be usefully applied to Get instances.

[g] Filter can be usefully applied to Scan instances.

[h] Depends on the included filters.

Counters

In addition to the functionality we already discussed, HBase offers another advanced feature: counters. Many applications that collect statistics—such as clicks or views in online advertising—were used to collect the data in logfiles that would subsequently be analyzed. Using counters offers the potential of switching to live accounting, foregoing the delayed batch processing step completely.

Introduction to Counters

In addition to the check-and-modify operations you saw earlier, HBase also has a mechanism to treat columns as counters. Otherwise, you would have to lock a row, read the value, increment it, write it back, and eventually unlock the row for other writers to be able to access it subsequently. This can cause a lot of contention, and in the event of a client process, crashing it could leave the row locked until the lease recovery kicks in—which could be disastrous in a heavily loaded system.

The client API provides specialized methods to do the read-and-modify operation atomically in a single client-side call. Earlier versions of HBase only had calls that would involve an RPC for every counter update, while newer versions started to add the same mechanisms used by the CRUD operations—as explained in CRUD Operations—which can bundle multiple counter updates in a single RPC.

Note

While you can update multiple counters, you are still limited to single rows. Updating counters in multiple rows would require separate API—and therefore RPC—calls. The batch() calls currently do not support the Increment instance, though this should change in the near future.

Before we discuss each type separately, you need to have a few more details regarding how counters work on the column level. Here is an example using the shell that creates a table, increments a counter twice, and then queries the current value:

hbase(main):001:0> create 'counters', 'daily', 'weekly', 'monthly'
0 row(s) in 1.1930 seconds

hbase(main):002:0> incr 'counters', '20110101', 'daily:hits', 1
COUNTER VALUE = 1

hbase(main):003:0> incr 'counters', '20110101', 'daily:hits', 1
COUNTER VALUE = 2

hbase(main):04:0> get_counter 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 2

Every call to incr returns the new value of the counter. The final check using get_counter shows the current value as expected.

Note

The format of the shell’s incr command is as follows:

incr '<table>', '<row>', '<column>', [<increment-value>]

You can also access the counter with a get call, giving you this result:

hbase(main):005:0> get 'counters', '20110101'
COLUMN       CELL
 daily:hits  timestamp=1301570823471, value=\x00\x00\x00\x00\x00\x00\x00\x02
1 row(s) in 0.0600 seconds

This is obviously not very readable, but it shows that a counter is simply a column, like any other. You can also specify a larger increment value:

hbase(main):006:0> incr 'counters',
'20110101', 'daily:hits', 20
COUNTER VALUE = 22

hbase(main):007:0> get 'counters', '20110101'                   
COLUMN       CELL
 daily:hits  timestamp=1301574412848, value=\x00\x00\x00\x00\x00\x00\x00\x16
1 row(s) in 0.0400 seconds

hbase(main):008:0> get_counter 'counters',
'20110101', 'daily:hits'
COUNTER VALUE = 22

Accessing the counter directly gives you the byte array representation, with the shell printing the separate bytes as hexadecimal values. Using the get_counter once again shows the current value in a more human-readable format, and confirms that variable increments are possible and work as expected.

Finally, you can use the increment value of the incr call to not only increase the counter, but also retrieve the current value, and decrease it as well. In fact, you can omit it completely and the default of 1 is assumed:

hbase(main):004:0> incr 'counters', '20110101',
'daily:hits'  
COUNTER VALUE = 3

hbase(main):005:0> incr 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 4

hbase(main):006:0> incr 'counters', '20110101', 'daily:hits', 0
COUNTER VALUE = 4

hbase(main):007:0> incr 'counters', '20110101', 'daily:hits', -1
COUNTER VALUE = 3

hbase(main):008:0> incr 'counters', '20110101', 'daily:hits', -1
COUNTER VALUE = 2

Using the increment value—the last parameter of the incr command—you can achieve the behavior shown in Table 4-6.

Table 4-6. The increment value and its effect on counter increments
ValueEffect
greater than zeroIncrease the counter by the given value.
zeroRetrieve the current value of the counter. Same as using the get_counter shell command.
less than zeroDecrease the counter by the given value.

Obviously, using the shell’s incr command only allows you to increase a single counter. You can do the same using the client API, described next.

Single Counters

The first type of increment call is for single counters only: you need to specify the exact column you want to use. The methods, provided by HTable, are as such:

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
  long amount) throws IOException
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 
  long amount, boolean writeToWAL) throws IOException

Given the coordinates of a column, and the increment account, these methods only differ by the optional writeToWAL parameter—which works the same way as the Put.setWriteToWAL() method.

Omitting writeToWAL uses the default value of true, meaning the write-ahead log is active.

Apart from that, you can use them easily, as shown in Example 4-17.

Example 4-17. Using the single counter increment methods
    HTable table = new HTable(conf, "counters");

    long cnt1 = table.incrementColumnValue(Bytes.toBytes("20110101"), 1
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
    long cnt2 = table.incrementColumnValue(Bytes.toBytes("20110101"), 2
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);

    long current = table.incrementColumnValue(Bytes.toBytes("20110101"), 3
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 0);

    long cnt3 = table.incrementColumnValue(Bytes.toBytes("20110101"), 4
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), -1);
1

Increase the counter by one.

2

Increase the counter by one a second time.

3

Get the current value of the counter without increasing it.

4

Decrease the counter by one.

The output on the console is:

cnt1: 1, cnt2: 2, current: 2, cnt3: 1

Just as with the shell commands used earlier, the API calls have the same effect: they increment the counter when using a positive increment value, retrieve the current value when using zero for the increment, and eventually decrease the counter by using a negative increment value.

Multiple Counters

Another way to increment counters is provided by the increment() call of HTable. It works similarly to the CRUD-type operations discussed earlier, using the following method to do the increment:

Result increment(Increment increment) throws IOException

You must create an instance of the Increment class and fill it with the appropriate details—for example, the counter coordinates. The constructors provided by this class are:

Increment() {}
Increment(byte[] row)
Increment(byte[] row, RowLock rowLock)

You must provide a row key when instantiating an Increment, which sets the row containing all the counters that the subsequent call to increment() should modify.

The optional parameter rowLock specifies a custom row lock instance, allowing you to run the entire operation under your exclusive control—for example, when you want to modify the same row a few times while protecting it against updates from other writers.

Warning

While you can guard the increment operation against other writers, you currently cannot do this for readers. In fact, there is no atomicity guarantee made for readers.

Since readers are not taking out locks on rows that are incremented, it may happen that they have access to some counters—within one row—that are already updated, and some that are not! This applies to scan and get operations equally.

Once you have decided which row to update and created the Increment instance, you need to add the actual counters—meaning columns—you want to increment, using this method:

Increment addColumn(byte[] family, byte[] qualifier, long amount)

The difference here, as compared to the Put methods, is that there is no option to specify a version—or timestamp—when dealing with increments: versions are handled implicitly. Furthermore, there is no addFamily() equivalent, because counters are specific columns, and they need to be specified as such. It therefore makes no sense to add a column family alone.

A special feature of the Increment class is the ability to take an optional time range:

Increment setTimeRange(long minStamp, long maxStamp) 
  throws IOException

Setting a time range for a set of counter increments seems odd in light of the fact that versions are handled implicitly. The time range is actually passed on to the servers to restrict the internal get operation from retrieving the current counter values. You can use it to expire counters, for example, to partition them by time: when you set the time range to be restrictive enough, you can mask out older counters from the internal get, making them look like they are nonexistent. An increment would assume they are unset and start at 1 again.

The Increment class provides additional methods, which are summarized in Table 4-7.

Table 4-7. Quick overview of additional methods provided by the Increment class
MethodDescription
getRow()Returns the row key as specified when creating the Increment instance.
getRowLock()Returns the row RowLock instance for the current Increment instance.
getLockId()Returns the optional lock ID handed into the constructor using the rowLock parameter. Will be -1L if not set.
setWriteToWAL()Allows you to disable the default functionality of writing the data to the server-side write-ahead log.
getWriteToWAL()Indicates if the data will be written to the write-ahead log.
getTimeRange()Retrieves the associated time range of the Increment instance—as assigned using the setTimeStamp() method.
numFamilies()Convenience method to retrieve the size of the family map, containing all column families of the added columns.
numColumns()Returns the number of columns that will be incremented.
hasFamilies()Another helper to check if a family—or column—has been added to the current instance of the Increment class.
familySet()/getFamilyMap()Give you access to the specific columns, as added by the addColumn() call. The family map is a map where the key is the family name and the value a list of added column qualifiers for this particular family. The familySet() returns the Set of all stored families, i.e., a set containing only the family names.

Similar to the shell example shown earlier, Example 4-18 uses various increment values to increment, retrieve, and decrement the given counters.

Example 4-18. Incrementing multiple counters in one row
    Increment increment1 = new Increment(Bytes.toBytes("20110101"));

    increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
    increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 1
    increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
    increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);

    Result result1 = table.increment(increment1); 2

    for (KeyValue kv : result1.raw()) {
      System.out.println("KV: " + kv +
        " Value: " + Bytes.toLong(kv.getValue())); 3
    }

    Increment increment2 = new Increment(Bytes.toBytes("20110101"));

    increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
    increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 4
    increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
    increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);

    Result result2 = table.increment(increment2);

    for (KeyValue kv : result2.raw()) {
      System.out.println("KV: " + kv +
        " Value: " + Bytes.toLong(kv.getValue()));
    }
1

Increment the counters with various values.

2

Call the actual increment method with the earlier counter updates and receive the results.

3

Print the KeyValue and returned the counter value.

4

Use positive, negative, and zero increment values to achieve the desired counter changes.

When you run the example, the following is output on the console:

KV: 20110101/daily:clicks/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/daily:hits/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/weekly:clicks/1301948275827/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275827/Put/vlen=8 Value: 10

KV: 20110101/daily:clicks/1301948275829/Put/vlen=8 Value: 6
KV: 20110101/daily:hits/1301948275829/Put/vlen=8 Value: 2
KV: 20110101/weekly:clicks/1301948275829/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275829/Put/vlen=8 Value: 5

When you compare the two sets of increment results, you will notice that this works as expected.

Coprocessors

Earlier we discussed how you can use filters to reduce the amount of data being sent over the network from the servers to the client. With the coprocessor feature in HBase, you can even move part of the computation to where the data lives.

Introduction to Coprocessors

Using the client API, combined with specific selector mechanisms, such as filters, or column family scoping, it is possible to limit what data is transferred to the client. It would be good, though, to take this further and, for example, perform certain operations directly on the server side while only returning a small result set. Think of this as a small MapReduce framework that distributes work across the entire cluster.

A coprocessor enables you to run arbitrary code directly on each region server. More precisely, it executes the code on a per-region basis, giving you trigger-like functionality—similar to stored procedures in the RDBMS world. From the client side, you do not have to take specific actions, as the framework handles the distributed nature transparently.

There is a set of implicit events that you can use to hook into, performing auxiliary tasks. If this is not enough, you can also extend the RPC protocol to introduce your own set of calls, which are invoked from your client and executed on the server on your behalf.

Just as with the custom filters (see Custom Filters), you need to create special Java classes that implement specific interfaces. Once they are compiled, you make these classes available to the servers in the form of a JAR file. The region server process can instantiate these classes and execute them in the correct environment. In contrast to the filters, though, coprocessors can be loaded dynamically as well. This allows you to extend the functionality of a running HBase cluster.

Use cases for coprocessors are, for instance, using hooks into row mutation operations to maintain secondary indexes, or implementing some kind of referential integrity. Filters could be enhanced to become stateful, and therefore make decisions across row boundaries. Aggregate functions, such as sum(), or avg(), known from RDBMSes and SQL, could be moved to the servers to scan the data locally and only returning the single number result across the network.

Note

Another good use case for coprocessors is access control. The authentication, authorization, and auditing features added in HBase version 0.92 are based on coprocessors. They are loaded at system startup and use the provided trigger-like hooks to check if a user is authenticated, and authorized to access specific values stored in tables.

The framework already provides classes, based on the coprocessor framework, which you can use to extend from when implementing your own functionality. They fall into two main groups: observer and endpoint. Here is a brief overview of their purpose:

Observer

This type of coprocessor is comparable to triggers: callback functions (also referred to here as hooks) are executed when certain events occur. This includes user-generated, but also server-internal, automated events.

The interfaces provided by the coprocessor framework are:

RegionObserver

You can handle data manipulation events with this kind of observer. They are closely bound to the regions of a table.

MasterObserver

This can be used to react to administrative or DDL-type operations. These are cluster-wide events.

WALObserver

This provides hooks into the write-ahead log processing.

Observers provide you with well-defined event callbacks, for every operation a cluster server may handle.

Endpoint

Next to event handling there is also a need to add custom operations to a cluster. User code can be deployed to the servers hosting the data to, for example, perform server-local computations.

Endpoints are dynamic extensions to the RPC protocol, adding callable remote procedures. Think of them as stored procedures, as known from RDBMSes. They may be combined with observer implementations to directly interact with the server-side state.

All of these interfaces are based on the Coprocessor interface to gain common features, but then implement their own specific functionality.

Finally, coprocessors can be chained, very similar to what the Java Servlet API does with request filters. The following section discusses the various types available in the coprocessor framework.

The Coprocessor Class

All coprocessor classes must be based on this interface. It defines the basic contract of a coprocessor and facilitates the management by the framework itself. The interface provides two enumerations, which are used throughout the framework: Priority and State. Table 4-8 explains the priority values.

Table 4-8. Priorities as defined by the Coprocessor.Priority enumeration
ValueDescription
SYSTEMHighest priority, defines coprocessors that are executed first
USERDefines all other coprocessors, which are executed subsequently

The priority of a coprocessor defines in what order the coprocessors are executed: system-level instances are called before the user-level coprocessors are executed.

Note

Within each priority level, there is also the notion of a sequence number, which keeps track of the order in which the coprocessors were loaded. The number starts with zero, and is increased by one thereafter.

The number itself is not very helpful, but you can rely on the framework to order the coprocessors—in each priority group—ascending by sequence number. This defines their execution order.

Coprocessors are managed by the framework in their own life cycle. To that effect, the Coprocessor interface offers two calls:

void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;

These two methods are called when the coprocessor class is started, and eventually when it is decommissioned. The provided CoprocessorEnvironment instance is used to retain the state across the lifespan of the coprocessor instance. A coprocessor instance is always contained in a provided environment. Table 4-9 lists the methods available from it.

Table 4-9. Methods provided by the CoprocessorEnvironment class
MethodDescription
String getHBaseVersion()Returns the HBase version identification string.
int getVersion()Returns the version of the Coprocessor interface.
Coprocessor getInstance()Returns the loaded coprocessor instance.
Coprocessor.Priority getPriority() Provides the priority level of the coprocessor.
int getLoadSequence()The sequence number of the coprocessor. This is set when the instance is loaded and reflects the execution order.
HTableInterface getTable(byte[] tableName)Returns an HTable instance for the given table name. This allows the coprocessor to access the actual table data.

Coprocessors should only deal with what they have been given by their environment. There is a good reason for that, mainly to guarantee that there is no back door for malicious code to harm your data.

Note

Coprocessor implementations should be using the getTable() method to access tables. Note that this class adds certain safety measures to the default HTable class. For example, coprocessors are not allowed to lock a row.

While there is currently nothing that can stop you from creating your own HTable instances inside your coprocessor code, this is likely to be checked against in the future and possibly denied.

The start() and stop() methods of the Coprocessor interface are invoked implicitly by the framework as the instance is going through its life cycle. Each step in the process has a well-known state. Table 4-10 lists the life-cycle state values as provided by the coprocessor interface.

Table 4-10. The states as defined by the Coprocessor.State enumeration
ValueDescription
UNINSTALLEDThe coprocessor is in its initial state. It has no environment yet, nor is it initialized.
INSTALLEDThe instance is installed into its environment.
STARTINGThis state indicates that the coprocessor is about to be started, i.e., its start() method is about to be invoked.
ACTIVEOnce the start() call returns, the state is set to active.
STOPPINGThe state set just before the stop() method is called.
STOPPEDOnce stop() returns control to the framework, the state of the coprocessor is set to stopped.

The final piece of the puzzle is the CoprocessorHost class that maintains all the coprocessor instances and their dedicated environments. There are specific subclasses, depending on where the host is used, in other words, on the master, region server, and so on.

The trinity of Coprocessor, CoprocessorEnvironment, and CoprocessorHost forms the basis for the classes that implement the advanced functionality of HBase, depending on where they are used. They provide the life-cycle support for the coprocessors, manage their state, and offer the environment for them to execute as expected. In addition, these classes provide an abstraction layer that developers can use to easily build their own custom implementation.

Figure 4-3 shows how the calls from a client are flowing through the list of coprocessors. Note how the order is the same on the incoming and outgoing sides: first are the system-level ones, and then the user ones in the order they were loaded.

Coprocessors executed sequentially, in their environment, and per region
Figure 4-3. Coprocessors executed sequentially, in their environment, and per region

Coprocessor Loading

Coprocessors are loaded in a variety of ways. Before we discuss the actual coprocessor types and how to implement your own, we will talk about how to deploy them so that you can try the provided examples.

You can either configure coprocessors to be loaded in a static way, or load them dynamically while the cluster is running. The static method uses the configuration files and table schemas—and is discussed next. Unfortunately, there is not yet an exposed API to load them dynamically.[60]

Loading from the configuration

You can configure globally which coprocessors are loaded when HBase starts. This is done by adding one, or more, of the following to the hbase-site.xml configuration file:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.wal.classes</name>
  <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>

Note

Replace the example class names with your own ones!

The order of the classes in each configuration property is important, as it defines the execution order. All of these coprocessors are loaded with the system priority. You should configure all globally active classes here so that they are executed first and have a chance to take authoritative actions. Security coprocessors are loaded this way, for example.

Note

The configuration file is the first to be examined as HBase starts. Although you can define additional system-level coprocessors in other places, the ones here are executed first.

Only one of the three possible configuration keys is read by the matching CoprocessorHost implementation. For example, the coprocessors defined in hbase.coprocessor.master.classes are loaded by the MasterCoprocessorHost class.

Table 4-11 shows where each configuration property is used.

Table 4-11. Possible configuration properties and where they are used
PropertyCoprocessor hostServer type
hbase.coprocessor.master.classesMasterCoprocessorHostMaster server
hbase.coprocessor.region.classesRegionCoprocessorHostRegion server
hbase.coprocessor.wal.classesWALCoprocessorHostRegion server

The coprocessors defined with hbase.coprocessor.region.classes are loaded as defaults when a region is opened for a table. Note that you cannot specify for which table, or region, they are loaded: the default coprocessors are loaded for every table and region. You need to keep this in mind when designing your own coprocessors.

Loading from the table descriptor

The other option to define what coprocessors to load is the table descriptor. As this is per table, the coprocessors defined here are only loaded for regions of that table—and only by the region servers. In other words, you can only use this approach for region-related coprocessors, not for master or WAL-related ones.

Since they are loaded in the context of a table, they are more targeted compared to the configuration loaded ones, which apply to all tables.

You need to add their definition to the table descriptor using the HTableDescriptor.setValue() method. The key must start with COPROCESSOR, and the value has to conform to the following format:

<path-to-jar>|<classname>|<priority>

Here is an example that defines two coprocessors, one with system-level priority, the other with user-level priority:

'COPROCESSOR$1' => \
  'hdfs://localhost:8020/users/leon/test.jar|coprocessor.Test|SYSTEM'
'COPROCESSOR$2' => \
  '/Users/laura/test2.jar|coprocessor.AnotherTest|USER'

The path-to-jar can either be a fully specified HDFS location, or any other path supported by the Hadoop FileSystem class. The second coprocessor definition, for example, uses a local path instead.

The classname defines the actual implementation class. While the JAR may contain many coprocessor classes, only one can be specified per table attribute. Use the standard Java package name conventions to specify the class.

The priority must be either SYSTEM or USER. This is case-sensitive and must be specified exactly this way.

Warning

Avoid using extra whitespace characters in the coprocessor definition. The parsing is quite strict, and adding leading, trailing, or spacing characters will render the entire entry invalid.

Using the $<number> postfix for the key enforces the order in which the definitions, and therefore the coprocessors, are loaded. Although only the prefix of COPROCESSOR is checked, using the numbered postfix is the advised way to define them. Example 4-19 shows how this can be done using the administrative API for HBase.

Example 4-19. Region observer checking for special get requests
public class LoadWithTableDescriptorExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(fs.getUri() + Path.SEPARATOR + "test.jar"); 1

    HTableDescriptor htd = new HTableDescriptor("testtable"); 2
    htd.addFamily(new HColumnDescriptor("colfam1"));
    htd.setValue("COPROCESSOR$1", path.toString() +
      "|" + RegionObserverExample.class.getCanonicalName() + 3
      "|" + Coprocessor.Priority.USER);

    HBaseAdmin admin = new HBaseAdmin(conf); 4
    admin.createTable(htd);

    System.out.println(admin.getTableDescriptor(Bytes.toBytes("testtable"))); 5
  }
}
1

Get the location of the JAR file containing the coprocessor implementation.

2

Define a table descriptor.

3

Add the coprocessor definition to the descriptor.

4

Instantiate an administrative API to the cluster and add the table.

5

Verify if the definition has been applied as expected.

The final check should show you the following result when running this example against a local, standalone HBase cluster:

{NAME => 'testtable', COPROCESSOR$1 => \
  'file:/test.jar|coprocessor.RegionObserverExample|USER', FAMILIES => \
  [{NAME => 'colfam1', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', \
  COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE \
  => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}

The coprocessor definition has been successfully applied to the table schema. Once the table is enabled and the regions are opened, the framework will first load the configuration coprocessors and then the ones defined in the table descriptor.

The RegionObserver Class

The first subclass of Coprocessor we will look into is the one used at the region level: the RegionObserver class. You can learn from its name that it belongs to the group of observer coprocessors: they have hooks that trigger when a specific region-level operation occurs.

These operations can be divided into two groups as well: region life-cycle changes and client API calls. We will look into both in that order.

Handling region life-cycle events

While The Region Life Cycle explains the region life-cycle, Figure 4-4 shows a simplified form.

The coprocessor reacting to life-cycle state changes of a region
Figure 4-4. The coprocessor reacting to life-cycle state changes of a region

The observers have the opportunity to hook into the pending open, open, and pending close state changes. For each of them there is a set of hooks that are called implicitly by the framework.

Note

For the sake of brevity, all parameters and exceptions are omitted when referring to the observer calls. Read the online documentation for the full specification.[61] Note, though, that all calls have a special first parameter:

ObserverContext<RegionCoprocessorEnvironment> c

This special CoprocessorEnvironment wrapper gives you additional control over what should happen after the hook execution. See The RegionCoprocessorEnvironment class and The ObserverContext class for the details.

State: pending open

A region is in this state when it is about to be opened. Observing coprocessors can either piggyback or fail this process. To do so, the following calls are available:

void preOpen(...) / void postOpen(...)

These methods are called just before the region is opened, and just after it was opened. Your coprocessor implementation can use them, for instance, to indicate to the framework—in the preOpen() call—that it should abort the opening process. Or hook into the postOpen() call to trigger a cache warm up, and so on.

After the pending open, but just before the open state, the region server may have to apply records from the write-ahead log (WAL). This, in turn, invokes the following methods of the observer:

void preWALRestore(...) / void postWALRestore(...)

Hooking into these calls gives you fine-grained control over what mutation is applied during the log replay process. You get access to the edit record, which you can use to inspect what is being applied.

State: open

A region is considered open when it is deployed to a region server and fully operational. At this point, all the operations discussed throughout the book can take place; for example, the region’s in-memory store could be flushed to disk, or the region could be split when it has grown too large. The possible hooks are:

void preFlush(...) / void postFlush(...)
void preCompact(...) / void postCompact(...)
void preSplit(...) / void postSplit(...)

This should be quite intuitive by now: the pre calls are executed before, while the post calls are executed after the respective operation. For example, using the preSplit() hook, you could effectively disable the built-in region splitting process and perform these operations manually.

State: pending close

The last group of hooks for the observers is for regions that go into the pending close state. This occurs when the region transitions from open to closed. Just before, and after, the region is closed the following hooks are executed:

void preClose(...,  boolean abortRequested) / 
void postClose(..., boolean abortRequested)

The abortRequested parameter indicates why a region was closed. Usually regions are closed during normal operation, when, for example, the region is moved to a different region server for load-balancing reasons. But there also is the possibility for a region server to have gone rogue and be aborted to avoid any side effects. When this happens, all hosted regions are also aborted, and you can see from the given parameter if that was the case.

Handling client API events

As opposed to the life-cycle events, all client API calls are explicitly sent from a client application to the region server. You have the opportunity to hook into these calls just before they are applied, and just thereafter. Here is the list of the available calls:

void preGet(...) / void postGet(...)

Called before and after a client makes an HTable.get() request

void prePut(...) / void postPut(...)

Called before and after a client makes an HTable.put() request

void preDelete(...) / void postDelete(...)

Called before and after a client makes an HTable.delete() request

boolean preCheckAndPut(...) / boolean postCheckAndPut(...)

Called before and after a client invokes an HTable.checkAndPut() call

boolean preCheckAndDelete(...) / boolean postCheckAndDelete(...)

Called before and after a client invokes an HTable.checkAndDelete() call

void preGetClosestRowBefore(...) / void postGetClosestRowBefore(...)

Called before and after a client invokes an HTable.getClosestRowBefore() call

boolean preExists(...) / boolean postExists(...)

Called before and after a client invokes an HTable.exists() call

long preIncrementColumnValue(...) / long postIncrementColumnValue(...)

Called before and after a client invokes an HTable.incrementColumnValue() call

void preIncrement(...) / void postIncrement(...)

Called before and after a client invokes an HTable.increment() call

InternalScanner preScannerOpen(...) / InternalScanner postScannerOpen(...)

Called before and after a client invokes an HTable.getScanner() call

boolean preScannerNext(...) / boolean postScannerNext(...)

Called before and after a client invokes a ResultScanner.next() call

void preScannerClose(...) / void postScannerClose(...)

Called before and after a client invokes a ResultScanner.close() call

The RegionCoprocessorEnvironment class

The environment instances provided to a coprocessor that is implementing the RegionObserver interface are based on the RegionCoprocessorEnvironment class—which in turn is implementing the CoprocessorEnvironment interface. The latter was discussed in The Coprocessor Class.

On top of the provided methods, the more specific, region-oriented subclass is adding the methods described in Table 4-12.

Table 4-12. Methods provided by the RegionCoprocessorEnvironment class, in addition to the inherited one
MethodDescription
HRegion getRegion()Returns a reference to the region the current observer is associated with
RegionServerServices getRegionServerServices()Provides access to the shared RegionServerServices instance

The getRegion() call can be used to get a reference to the hosting HRegion instance, and to invoke calls this class provides. In addition, your code can access the shared region server services instance, which is explained in Table 4-13.

Table 4-13. Methods provided by the RegionServerServices class
MethodDescription
boolean isStopping()Returns true when the region server is stopping.
HLog getWAL()Provides access to the write-ahead log instance.
CompactionRequestor getCompactionRequester()Provides access to the shared CompactionRequestor instance. This can be used to initiate compactions from within the coprocessor.
FlushRequester getFlushRequester()Provides access to the shared FlushRequester instance. This can be used to initiate memstore flushes.
RegionServerAccounting getRegionServerAccounting()Provides access to the shared RegionServerAccounting instance. It allows you to check on what the server currently has allocated—for example, the global memstore size.
postOpenDeployTasks(HRegion r, CatalogTracker ct, final boolean daughter)An internal call, invoked inside the region server.
HBaseRpcMetrics getRpcMetrics()Provides access to the shared HBaseRpcMetrics instance. It has details on the RPC statistics for the current server.

I will not be discussing all the details on the provided functionality, and instead refer you to the Java API documentation.[62]

The ObserverContext class

For the callbacks provided by the RegionObserver class, there is a special context handed in as the first parameter to all calls: the ObserverContext class. It provides access to the current environment, but also adds the crucial ability to indicate to the coprocessor framework what it should do after a callback is completed.

Note

The context instance is the same for all coprocessors in the execution chain, but with the environment swapped out for each coprocessor.

Table 4-14 lists the methods as provided by the context class.

Table 4-14. Methods provided by the ObserverContext class
MethodDescription
E getEnvironment()Returns the reference to the current coprocessor environment.
void bypass()When your code invokes this method, the framework is going to use your provided value, as opposed to what usually is returned.
void complete()Indicates to the framework that any further processing can be skipped, skipping the remaining coprocessors in the execution chain. It implies that this coprocessor’s response is definitive.
boolean shouldBypass()Used internally by the framework to check on the flag.
boolean shouldComplete()Used internally by the framework to check on the flag.
void prepare(E env)Prepares the context with the specified environment. This is used internally only. It is used by the static createAndPrepare() method.
static <T extends CoprocessorEnvironment> ObserverContext<T> createAndPrepare( T env, ObserverContext<T> context)Static function to initialize a context. When the provided context is null, it will create a new instance.

The important context functions are bypass() and complete(). These functions give your coprocessor implementation the option to control the subsequent behavior of the framework. The complete() call influences the execution chain of the coprocessors, while the bypass() call stops any further default processing on the server. Use it with the earlier example of avoiding automated region splits like so:

@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
  e.bypass();
}

Instead of having to implement your own RegionObserver, based on the interface, you can use the following base class to only implement what is needed.

The BaseRegionObserver class

This class can be used as the basis for all your observer-type coprocessors. It has placeholders for all methods required by the RegionObserver interface. They are all left blank, so by default nothing is done when extending this class. You must override all the callbacks that you are interested in to add the required functionality.

Example 4-20 is an observer that handles specific row key requests.

Example 4-20. Region observer checking for special get requests
public class RegionObserverExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

  @Override
  public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<KeyValue> results) throws IOException {
    if (Bytes.equals(get.getRow(), FIXED_ROW)) { 1
      KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
        Bytes.toBytes(System.currentTimeMillis()));
      results.add(kv); 2
    }
  }
}
1

Check if the request row key matches a well-known one.

2

Create a special KeyValue instance containing just the current time on the server.

Note

The following was added to the hbase-site.xml file to enable the coprocessor:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample</value>
</property>

The class is available to the region server’s Java Runtime Environment because we have already added the JAR of the compiled repository to the HBASE_CLASSPATH variable in hbase-env.sh—see Deployment of Custom Filters for reference.

Do not forget to restart HBase, though, to make the changes to the static configuration files active.

The row key @@@GETTIME@@@ is handled by the observer’s preGet() hook, inserting the current time of the server. Using the HBase Shell—after deploying the code to servers—you can see this in action:

hbase(main):001:0> get 'testtable', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=9223372036854775807, \
                                value=\x00\x00\x01/s@3\xD8
1 row(s) in 0.0410 seconds

hbase(main):002:0> Time.at(Bytes.toLong( \
  "\x00\x00\x01/s@3\xD8".to_java_bytes) / 1000)   
=> Wed Apr 20 16:11:18 +0200 2011

This requires an existing table, because trying to issue a get call to a nonexistent table will raise an error, before the actual get operation is executed. Also, the example does not set the bypass flag, in which case something like the following could happen:

hbase(main):003:0> create 'testtable2', 'colfam1'
0 row(s) in 1.3070 seconds

hbase(main):004:0> put 'testtable2', '@@@GETTIME@@@', \
  'colfam1:qual1', 'Hello there!'
0 row(s) in 0.1360 seconds

hbase(main):005:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=9223372036854775807, \
                                value=\x00\x00\x01/sJ\xBC\xEC
 colfam1:qual1                  timestamp=1303309353184, value=Hello there!
2 row(s) in 0.0450 seconds

A new table is created and a row with the special row key is inserted. Subsequently, the row is retrieved. You can see how the artificial column is mixed with the actual one stored earlier. To avoid this issue, Example 4-21 adds the necessary e.bypass() call.

Example 4-21. Region observer checking for special get requests and bypassing further processing
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
        Bytes.toBytes(System.currentTimeMillis()));
      results.add(kv);
      e.bypass(); 1
    }
1

Once the special KeyValue is inserted, all further processing is skipped.

Note

You need to adjust the hbase-site.xml file to point to the new example:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverWithBypassExample</value>
</property>

Just as before, please restart HBase after making these adjustments.

As expected, and using the shell once more, the result is now different:

hbase(main):069:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=9223372036854775807, \
                                value=\x00\x00\x01/s]\x1D4
1 row(s) in 0.0470 seconds

Only the artificial column is returned, and since the default get operation is bypassed, it is the only column retrieved. Also note how the timestamp of this column is 9223372036854775807—which is Long.MAX_VALUE on purpose. Since the example creates the KeyValue instance without specifying a timestamp, it is set to HConstants.LATEST_TIMESTAMP by default, and that is, in turn, set to Long.MAX_VALUE. You can amend the example by adding a timestamp and see how that would be printed when using the shell (an exercise left to you).

The MasterObserver Class

The second subclass of Coprocessor discussed handles all possible callbacks the master server may initiate. The operations and API calls are explained in Chapter 5, though they can be classified as data-manipulation operations, similar to DDL used in relational database systems. For that reason, the MasterObserver class provides the following hooks:

void preCreateTable(...) / void postCreateTable(...)

Called before and after a table is created.

void preDeleteTable(...) / void postDeleteTable(...)

Called before and after a table is deleted.

void preModifyTable(...) / void postModifyTable(...)

Called before and after a table is altered.

void preAddColumn(...) / void postAddColumn(...)

Called before and after a column is added to a table.

void preModifyColumn(...) / void postModifyColumn(...)

Called before and after a column is altered.

void preDeleteColumn(...) / void postDeleteColumn(...)

Called before and after a column is deleted from a table.

void preEnableTable(...) / void postEnableTable(...)

Called before and after a table is enabled.

void preDisableTable(...) / void postDisableTable(...)

Called before and after a table is disabled.

void preMove(...) / void postMove(...)

Called before and after a region is moved.

void preAssign(...) / void postAssign(...)

Called before and after a region is assigned.

void preUnassign(...) / void postUnassign(...)

Called before and after a region is unassigned.

void preBalance(...) / void postBalance(...)

Called before and after the regions are balanced.

boolean preBalanceSwitch(...) / void postBalanceSwitch(...)

Called before and after the flag for the balancer is changed.

void preShutdown(...)

Called before the cluster shutdown is initiated. There is no post hook, because after the shutdown, there is no longer a cluster to invoke the callback.

void preStopMaster(...)

Called before the master process is stopped. There is no post hook, because after the master has stopped, there is no longer a process to invoke the callback.

The MasterCoprocessorEnvironment class

Similar to how the RegionCoprocessorEnvironment is enclosing a single RegionObserver coprocessor, the MasterCoprocessorEnvironment is wrapping MasterObserver instances. It also implements the CoprocessorEnvironment interface, thus giving you, for instance, access to the getTable() call to access data from within your own implementation.

On top of the provided methods, the more specific, master-oriented subclass adds the one method described in Table 4-15.

Table 4-15. The method provided by the MasterCoprocessorEnvironment class, in addition to the inherited one
MethodDescription
MasterServices getMasterServices()Provides access to the shared MasterServices instance

Your code can access the shared master services instance, the methods of which are listed and described in Table 4-16.

Table 4-16. Methods provided by the MasterServices class
MethodDescription
AssignmentManager getAssignmentManager()Gives you access to the assignment manager instance. It is responsible for all region assignment operations, such as assign, unassign, balance, and so on.
MasterFileSystem getMasterFileSystem()Provides you with an abstraction layer for all filesystem-related operations the master is involved in—for example, creating directories for table files and logfiles.
ServerManager getServerManager()Returns the server manager instance. With it you have access to the list of servers, live or considered dead, and more.
ExecutorService getExecutorService()Used by the master to schedule system-wide events.
void checkTableModifiable(byte[] tableName)Convenient to check if a table exists and is offline so that it can be altered.

I will not be discussing all the details on the provided functionality, and instead refer you to the Java API documentation once more.[63]

The BaseMasterObserver class

Either you can base your efforts to implement a MasterObserver on the interface directly, or you can extend the BaseMasterObserver class instead. It implements the interface while leaving all callback functions empty. If you were to use this class unchanged, it would not yield any kind of reaction.

Adding functionality is achieved by overriding the appropriate event methods. You have the choice of hooking your code into the pre and/or post calls.

Example 4-22 uses the post hook after a table was created to perform additional tasks.

Example 4-22. Master observer that creates a separate directory on the filesystem when a table is created
public class MasterObserverExample extends BaseMasterObserver {

  @Override
  public void postCreateTable(
    ObserverContext<MasterCoprocessorEnvironment> env,
    HRegionInfo[] regions, boolean sync)
  throws IOException {
    String tableName = regions[0].getTableDesc().getNameAsString(); 1

    MasterServices services = env.getEnvironment().getMasterServices();
    MasterFileSystem masterFileSystem = services.getMasterFileSystem(); 2
    FileSystem fileSystem = masterFileSystem.getFileSystem();

    Path blobPath = new Path(tableName + "-blobs"); 3
    fileSystem.mkdirs(blobPath);

  }
}
1

Get the new table’s name from the table descriptor.

2

Get the available services and retrieve a reference to the actual filesystem.

3

Create a new directory that will store binary data from the client application.

Note

You need to add the following to the hbase-site.xml file for the coprocessor to be loaded by the master process:

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>

Just as before, restart HBase after making these adjustments.

Once you have activated the coprocessor, it is listening to the said events and will trigger your code automatically. The example is using the supplied services to create a directory on the filesystem. A fictitious application, for instance, could use it to store very large binary objects (known as blobs) outside of HBase.

To trigger the event, you can use the shell like so:

hbase(main):001:0> create 'testtable', 'colfam1'
0 row(s) in 0.4300 seconds

This creates the table and afterward calls the coprocessor’s postCreateTable() method. The Hadoop command-line tool can be used to verify the results:

$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x  - larsgeorge supergroup  0 ... /user/larsgeorge/testtable-blobs

There are many things you can implement with the MasterObserver coprocessor. Since you have access to most of the shared master resources through the MasterServices instance, you should be careful what you do, as it can potentially wreak havoc.

Finally, because the environment is wrapped in an ObserverContext, you have the same extra flow controls, exposed by the bypass() and complete() methods. You can use them to explicitly disable certain operations or skip subsequent coprocessor execution, respectively.

Endpoints

The earlier RegionObserver example used a well-known row key to add a computed column during a get request. It seems that this could suffice to implement other functionality as well—for example, aggregation functions that return the sum of all values in a specific column.

Unfortunately, this does not work, as the row key defines which region is handling the request, therefore only sending the computation request to a single server. What we want, though, is a mechanism to send such a request to all regions, and therefore all region servers, so that they can build the sum of the columns they have access to locally. Once each region has returned its partial result, we can aggregate the total on the client side much more easily. If you were to have 1,000 regions and 1 million columns, you would receive 1,000 decimal numbers on the client side—one for each region. This is fast to aggregate for the final result.

If you were to scan the entire table using a purely client API approach, in a worst-case scenario you would transfer all 1 million numbers to build the sum. Moving such computation to the servers where the data resides is a much better option. HBase, though, does not know what you may need, so to overcome this limitation, the coprocessor framework provides you with a dynamic call implementation, represented by the endpoint concept.

The CoprocessorProtocol interface

In order to provide a custom RPC protocol to clients, a coprocessor implementation defines an interface that extends CoprocessorProtocol. The interface can define any methods that the coprocessor wishes to expose. Using this protocol, you can communicate with the coprocessor instances via the following calls, provided by HTable:

<T extends CoprocessorProtocol> T coprocessorProxy(
  Class<T> protocol, byte[] row)
<T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T,R> callable)
<T extends CoprocessorProtocol, R> void coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T,R> callable, Batch.Callback<R> callback)

Since CoprocessorProtocol instances are associated with individual regions within the table, the client RPC calls must ultimately identify which regions should be used in the CoprocessorProtocol method invocations. Though regions are seldom handled directly in client code and the region names may change over time, the coprocessor RPC calls use row keys to identify which regions should be used for the method invocations. Clients can call CoprocessorProtocol methods against one of the following:

Single region

This is done by calling coprocessorProxy() with a single row key. This returns a dynamic proxy of the CoprocessorProtocol interface, which uses the region containing the given row key—even if the row does not exist—as the RPC endpoint.

Range of regions

You can call coprocessorExec() with a start row key and an end row key. All regions in the table from the one containing the start row key to the one containing the end row key (inclusive) will be used as the RPC endpoints.

Note

The row keys passed as parameters to the HTable methods are not passed to the CoprocessorProtocol implementations. They are only used to identify the regions for endpoints of the remote calls.

The Batch class defines two interfaces used for CoprocessorProtocol invocations against multiple regions: clients implement Batch.Call to call methods of the actual CoprocessorProtocol instance. The interface’s call() method will be called once per selected region, passing the CoprocessorProtocol instance for the region as a parameter.

Clients can optionally implement Batch.Callback to be notified of the results from each region invocation as they complete. The instance’s

void update(byte[] region, byte[] row, R result)

method will be called with the value returned by R call(T instance) from each region.

The BaseEndpointCoprocessor class

Implementing an endpoint involves the following two steps:

  1. Extend the CoprocessorProtocol interface.

    This specifies the communication details for the new endpoint: it defines the RPC protocol between the client and the servers.

  2. Extend the BaseEndpointCoprocessor class.

    You need to provide the actual implementation of the endpoint by extending both the abstract BaseEndpointCoprocessor class and the protocol interface provided in step 1, defining your endpoint protocol.

Example 4-23 implements the CoprocessorProtocol to add custom functions to HBase. A client can invoke these remote calls to retrieve the number of rows and KeyValues in each region where it is running.

Example 4-23. Endpoint protocol, adding a row and KeyValue count method
public interface RowCountProtocol extends CoprocessorProtocol {
  long getRowCount() throws IOException;

  long getRowCount(Filter filter) throws IOException;

  long getKeyValueCount() throws IOException;
}

Step 2 is to combine this new protocol interface with a class that also extends BaseEndpointCoprocessor. Example 4-24 uses the environment provided to access the data using an InternalScanner instance.

Example 4-24. Endpoint implementation, adding a row and KeyValue count method
public class RowCountEndpoint extends BaseEndpointCoprocessor
  implements RowCountProtocol {

  private long getCount(Filter filter, boolean countKeyValues)
    throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
      scan.setFilter(filter);
    }
    RegionCoprocessorEnvironment environment =
      (RegionCoprocessorEnvironment) getEnvironment();
    // use an internal scanner to perform scanning.
    InternalScanner scanner = environment.getRegion().getScanner(scan);
    int result = 0;
    try {
      List<KeyValue> curVals = new ArrayList<KeyValue>();
      boolean done = false;
      do {
        curVals.clear();
        done = scanner.next(curVals);
        result += countKeyValues ? curVals.size() : 1;
      } while (done);
    } finally {
      scanner.close();
    }
    return result;
  }

  @Override
  public long getRowCount() throws IOException {
    return getRowCount(new FirstKeyOnlyFilter());
  }

  @Override
  public long getRowCount(Filter filter) throws IOException {
    return getCount(filter, false);
  }

  @Override
  public long getKeyValueCount() throws IOException {
    return getCount(null, true);
  }
}

Note how the FirstKeyOnlyFilter is used to reduce the number of columns being scanned.

Note

You need to add (or amend from the previous examples) the following to the hbase-site.xml file for the endpoint coprocessor to be loaded by the region server process:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RowCountEndpoint</value>
</property>

Just as before, restart HBase after making these adjustments.

Example 4-25 showcases how a client can use the provided calls of HTable to execute the deployed coprocessor endpoint functions. Since the calls are sent to each region separately, there is a need to summarize the total number at the end.

Example 4-25. Using the custom row-count endpoint
public class EndpointExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    try {
      Map<byte[], Long> results = table.coprocessorExec(
        RowCountProtocol.class, 1
        null, null, 2
        new Batch.Call<RowCountProtocol, Long>() { 3

          @Override
          public Long call(RowCountProtocol counter) throws IOException {
            return counter.getRowCount(); 4
          }
        });

      long total = 0;
      for (Map.Entry<byte[], Long> entry : results.entrySet()) { 5
        total += entry.getValue().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Count: " + total);
    } catch (Throwable throwable) {
      throwable.printStackTrace();
    }
  }
}
1

Define the protocol interface being invoked.

2

Set start and end row keys to “null” to count all rows.

3

Create an anonymous class to be sent to all region servers.

4

The call() method is executing the endpoint functions.

5

Iterate over the returned map, containing the result for each region separately.

The code emits the region names, the count for each of them, and eventually the grand total:

Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2
Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3
Total Count: 5

The Batch class also offers a more convenient way to access the remote endpoint: using Batch.forMethod(), you can retrieve a fully configured Batch.Call instance, ready to be sent to the region servers. Example 4-26 amends the previous example to make use of this shortcut.

Example 4-26. One way in which Batch.forMethod() can reduce the client code size
      Batch.Call call = Batch.forMethod(RowCountProtocol.class,
        "getKeyValueCount");
      Map<byte[], Long> results = table.coprocessorExec(
        RowCountProtocol.class, null, null, call);

The forMethod() call uses the Java reflection API to retrieve the named method. The returned Batch.Call instance will execute the endpoint function and return the same data types as defined by the protocol for this method.

However, if you want to perform additional processing on the results, implementing Batch.Call directly will provide more power and flexibility. This can be seen in Example 4-27, which combines the row and key-value count for each region.

Example 4-27. Extending the batch call to execute multiple endpoint calls
      Map<byte[], Pair<Long, Long>> results = table.coprocessorExec(
        RowCountProtocol.class,
        null, null,
        new Batch.Call<RowCountProtocol, Pair<Long, Long>>() {
          public Pair<Long, Long> call(RowCountProtocol counter)
            throws IOException {
            return new Pair(counter.getRowCount(),
              counter.getKeyValueCount());
          }
        });

      long totalRows = 0;
      long totalKeyValues = 0;
      for (Map.Entry<byte[], Pair<Long, Long>> entry : results.entrySet()) {
        totalRows += entry.getValue().getFirst().longValue();
        totalKeyValues += entry.getValue().getSecond().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Row Count: " + totalRows);
      System.out.println("Total KeyValue Count: " + totalKeyValues);

Running the code will yield the following output:

Region: testtable,,1303420252525.9c336bd2b294a...0647a1f2d13b., Count: {2,4}
Region: testtable,row3,1303420252525.6d7c95de8a7...386cfec7f2., Count: {3,6}
Total Row Count: 5
Total KeyValue Count: 10

The examples so far all used the coprocessorExec() calls to batch the requests across all regions, matching the given start and end row keys. Example 4-28 uses the coprocessorProxy() call to get a local, client-side proxy of the endpoint. Since a row key is specified, the client API will route the proxy calls to the region—and to the server currently hosting it—that contains the given key, regardless of whether it actually exists: regions are specified with a start and end key only, so the match is done by range only.

Example 4-28. Using the proxy call of HTable to invoke an endpoint on a single region
      RowCountProtocol protocol = table.coprocessorProxy(
        RowCountProtocol.class, Bytes.toBytes("row4"));
      long rowsInRegion = protocol.getRowCount();
      System.out.println("Region Row Count: " + rowsInRegion);

With the proxy reference, you can invoke any remote function defined in your CoprocessorProtocol implementation from within client code, and it returns the result for the region that served the request. Figure 4-5 shows the difference between the two approaches.

Coprocessor calls batched and executed in parallel, and addressing a single region only
Figure 4-5. Coprocessor calls batched and executed in parallel, and addressing a single region only

HTablePool

Instead of creating an HTable instance for every request from your client application, it makes much more sense to create one initially and subsequently reuse them.

The primary reason for doing so is that creating an HTable instance is a fairly expensive operation that takes a few seconds to complete. In a highly contended environment with thousands of requests per second, you would not be able to use this approach at all—creating the HTable instance would be too slow. You need to create the instance at startup and use it for the duration of your client’s life cycle.

There is an additional issue with the HTable being reused by multiple threads within the same process.

Warning

The HTable class is not thread-safe, that is, the local write buffer is not guarded against concurrent modifications. Even if you were to use setAutoFlush(true) (which is the default currently; see Client-side write buffer) this is not advisable. Instead, you should use one instance of HTable for each thread you are running in your client application.

Clients can solve this problem using the HTablePool class. It only serves one purpose, namely to pool client API instances to the HBase cluster. Creating the pool is accomplished using one of these constructors:

HTablePool()
HTablePool(Configuration config, int maxSize)
HTablePool(Configuration config, int maxSize,
  HTableInterfaceFactory tableFactory)

The default constructor—the one without any parameters—creates a pool with the configuration found in the classpath, while setting the maximum size to unlimited. This equals calling the second constructor like so:

Configuration conf = HBaseConfiguration.create()
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE)

Setting the maxSize parameter gives you control over how many HTable instances a pool is allowed to contain. The optional tableFactory parameter can be used to hand in a custom factory class that creates the actual HTable instances.

Using the pool is a matter of employing the following calls:

HTableInterface getTable(String tableName)
HTableInterface getTable(byte[] tableName)
void putTable(HTableInterface table)

The getTable() calls retrieve an HTable instance from the pool, while the putTable() returns it after you are done using it. Both internally defer some of the work to the mentioned HTableInterfaceFactory instance the pool is configured with.

Note

Setting the maxSize parameter during the construction of a pool does not impose an upper limit on the number of HTableInterface instances the pool is allowing you to retrieve. You can call getTable() as much as you like to get a valid table reference.

The maximum size of the pool only sets the number of HTableInterface instances retained within the pool, for a given table name. For example, when you set the size to 5, but then call getTable() 10 times, you have created 10 HTable instances (assuming you use the default). Upon returning them using the putTable() method, five are kept for subsequent use, while the additional five you requested are simply ignored. More importantly, the release mechanisms of the factory are not invoked.

Finally, there are calls to close the pool for specific tables:

void closeTablePool(String tableName)
void closeTablePool(byte[] tableName)

Obviously, both do the same thing, with one allowing you to specify a String, and the other a byte array—use whatever is more convenient for you.

The close call of the pool iterates over the list of retained references for a specific table, invoking the release mechanism provided by the factory. This is useful for freeing all resources for a named table, and starting all over again. Keep in mind that for all resources to be released, you would need to call these methods for every table name you have used so far.

Example 4-29 uses these methods to create and use a pool.

Example 4-29. Using the HTablePool class to share HTable instances
    Configuration conf = HBaseConfiguration.create();
    HTablePool pool = new HTablePool(conf, 5); 1

    HTableInterface[] tables = new HTableInterface[10];
    for (int n = 0; n < 10; n++) {
      tables[n] = pool.getTable("testtable"); 2
      System.out.println(Bytes.toString(tables[n].getTableName()));
    }

    for (int n = 0; n < 5; n++) {
      pool.putTable(tables[n]); 3
    }

    pool.closeTablePool("testtable"); 4
1

Create the pool, allowing five HTables to be retained.

2

Get 10 HTable references, which is more than the pool is retaining.

3

Return HTable instances to the pool. Five will be kept, while the additional five will be dropped.

4

Close the entire pool, releasing all retained table references.

You should receive the following output on the console:

Acquiring tables...
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
Releasing tables...
Closing pool...

Note that using more than the configured maximum size of the pool works as we discussed earlier: we receive more references than were configured. Returning the tables to the pool is not yielding any logging or printout, though, doing its work behind the scenes.

Connection Handling

Every instance of HTable requires a connection to the remote servers. This is internally represented by the HConnection class, and more importantly managed process-wide by the shared HConnectionManager class. From a user perspective, there is usually no immediate need to deal with either of these two classes; instead, you simply create a new Configuration instance, and use that with your client API calls.

Internally, the connections are keyed in a map, where the key is the Configuration instance you are using. In other words, if you create a number of HTable instances while providing the same configuration reference, they all share the same underlying HConnection instance. There are good reasons for this to happen:

Share ZooKeeper connections

As each client eventually needs a connection to the ZooKeeper ensemble to perform the initial lookup of where user table regions are located, it makes sense to share this connection once it is established, with all subsequent client instances.

Cache common resources

Every lookup performed through ZooKeeper, or the -ROOT-, or .META. table, of where user table regions are located requires network round-trips. The location is then cached on the client side to reduce the amount of network traffic, and to speed up the lookup process.

Since this list is the same for every local client connecting to a remote cluster, it is equally useful to share it among multiple clients running in the same process. This is accomplished by the shared HConnection instance.

In addition, when a lookup fails—for instance, when a region was split—the connection has the built-in retry mechanism to refresh the stale cache information. This is then immediately available to all other clients sharing the same connection reference, thus further reducing the number of network round-trips initiated by a client.

Another class that benefits from the same advantages is the HTablePool: all of the pooled HTable instances automatically share the provided configuration instances, and therefore also the shared connection it references to. This also means you should always create your own configuration, whenever you plan to instantiate more than one HTable instance. For example:

HTable table1 = new HTable("table1");
//...
HTable table2 = new HTable("table2");

is less efficient than the following code:

Configuration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, "table1");
//...
HTable table2 = new HTable(conf, "table2");

The latter implicitly uses the connection sharing, as provided by the HBase client-side API classes.

Note

There are no known performance implications for sharing a connection, even for heavily multithreaded applications.

The drawback of sharing a connection is the cleanup: when you do not explicitly close a connection, it is kept open until the client process exits. This can result in many connections that remain open to ZooKeeper, especially for heavily distributed applications, such as MapReduce jobs talking to HBase. In a worst-case scenario, you can run out of available connections, and receive an IOException instead.

You can avoid this problem by explicitly closing the shared connection, when you are done using it. This is accomplished with the close() method provided by HTable. The call decreases an internal reference count and eventually closes all shared resources, such as the connection to the ZooKeeper ensemble, and removes the connection reference from the internal list.

Every time you reuse a Configuration instance, the connection manager internally increases the reference count, so you only have to make sure you call the close() method to trigger the cleanup. There is also an explicit call to clear out a connection, or all open connections:

static void deleteConnection(Configuration conf, boolean stopProxy)
static void deleteAllConnections(boolean stopProxy)

Since all shared connections are internally keyed by the configuration instance, you need to provide that instance to close the associated connection. The boolean stopProxy parameter lets you further enforce the cleanup of the entire RPC stack of the client—which is its umbilical cord to the remote servers. Only use true when you do not need any further communication with the server to take place.

The deleteAllConnections() call only requires the boolean stopProxy flag; it simply iterates over the entire list of shared connections known to the connection manager and closes them.

If you are ever in need of using a connection explicitly, you can make use of the getConnection() call like so:

Configuration newConfig = new Configuration(originalConf);
HConnection connection = HConnectionManager.getConnection(newConfig);
// Use the connection to your hearts' delight and then when done...
HConnectionManager.deleteConnection(newConfig, true);

The advantage is that you are the sole user of that connection, but you must make sure you close it out properly as well.



[58] The various filter methods are discussed in Custom Filters.

[59] See Table 4-5 for an overview of compatible filters.

[60] Coprocessors are a fairly recent addition to HBase, and are therefore still in flux. Check with the online documentation and issue tracking system to see what is not yet implemented, or planned to be added.

[62] The Java HBase classes are documented online at http://hbase.apache.org/apidocs/.

[63] The Java HBase classes are documented online at http://hbase.apache.org/apidocs/.

Get HBase: The Definitive Guide now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.