HBase: The Definitive Guide, 2nd Edition by Lars George

Chapter 4. Client API: Advanced Features

Now that you understand the basic client API, we will discuss the advanced features that HBase offers to clients.

Filters

HBase filters are a powerful feature that can greatly enhance your effectiveness when working with data stored in tables. You will find predefined filters, already provided by HBase for your use, as well as a framework you can use to implement your own. You will now be introduced to both.

Introduction to Filters

The two prominent read functions for HBase are Table.get() and Table.scan(), both supporting either direct access to data or the use of a start and end key, respectively. You can limit the data retrieved by progressively adding more limiting selectors to the query. These include column families, column qualifiers, timestamps or ranges, as well as version numbers.

While this gives you control over what is included, it is missing more fine-grained features, such as selection of keys, or values, based on regular expressions. Both classes support filters for exactly these reasons: what cannot be solved with the provided API functionality selecting the required row or column keys, or values, can be achieved with filters. The base interface is aptly named Filter, and there is a list of concrete classes supplied by HBase that you can use without doing any programming.

You can, on the other hand, extend the Filter classes to implement your own requirements. All the filters are actually applied on the server side, also referred to as predicate pushdown. This ensures the most efficient selection of the data that needs to be transported back to the client. You could implement most of the filter functionality in your client code as well, but you would have to transfer much more data—something you need to avoid at scale.

Figure 4-1 shows how the filters are configured on the client, then serialized over the network, and then applied on the server.

Figure 4-1. The filters created on the client side, sent through the RPC, and executed on the server side

The Filter Hierarchy

The lowest level in the filter hierarchy is the Filter interface, and the abstract FilterBase class that implements an empty shell, or skeleton, that is used by the actual filter classes to avoid having the same boilerplate code in each of them. Most concrete filter classes are direct descendants of FilterBase, but a few use another, intermediate ancestor class. They all work the same way: you define a new instance of the filter you want to apply and hand it to the Get or Scan instances, using:

setFilter(filter)
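As a minimal sketch (the row key and the KeyOnlyFilter, a concrete filter discussed later in this chapter, are placeholders only), handing a filter instance to either read operation looks like this:

    // The same filter instance can be assigned to a Get and a Scan alike.
    Filter filter = new KeyOnlyFilter();

    Get get = new Get(Bytes.toBytes("row-1"));
    get.setFilter(filter);

    Scan scan = new Scan();
    scan.setFilter(filter);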

While you initialize the filter instance itself, you often have to supply parameters for whatever the filter is designed for. There is a special subset of filters, based on CompareFilter, that ask you for at least two specific parameters, since they are used by the base class to perform its task. You will learn about the two parameter types next so that you can use them in context.

Filters have access to the entire row they are applied to. This means that they can decide the fate of a row based on any available information. This includes the row key, column qualifiers, actual value of a column, timestamps, and so on. When referring to values, or comparisons, as we will discuss shortly, this can be applied to any of these details. Specific filter implementations are available that consider only one of those criteria each.

While filters can apply their logic to a specific row, they have no state and cannot span across multiple rows. There are also some scan-related features, such as batching (see “Scanner Batching”), that counteract the ability of a filter to do its work. We will discuss these limitations in due course below.

Comparison Operators

Since CompareFilter-based filters add one more feature to the base FilterBase class, namely the compare() operation, they require a user-supplied operator type that defines how the result of the comparison is interpreted. The values are listed in Table 4-1.

Table 4-1. The possible comparison operators for CompareFilter-based filters
Operator Description

LESS

Match values less than the provided one.

LESS_OR_EQUAL

Match values less than or equal to the provided one.

EQUAL

Do an exact match on the value and the provided one.

NOT_EQUAL

Include everything that does not match the provided value.

GREATER_OR_EQUAL

Match values that are equal to or greater than the provided one.

GREATER

Only include values greater than the provided one.

NO_OP

Exclude everything.

The comparison operators define what is included, or excluded, when the filter is applied. This allows you to select the data that you want as either a range, subset, or exact and single match.

Comparators

The second type that you need to provide to CompareFilter-related classes is a comparator, which is needed to compare various values and keys in different ways. They are derived from ByteArrayComparable, which implements the Java Comparable interface. You do not have to go into the details if you just want to use an implementation provided by HBase and listed in Table 4-2. The constructors usually take the control value, that is, the one to compare each table value against.

Table 4-2. The HBase-supplied comparators, used with CompareFilter-based filters
Comparator Description

LongComparator

Assumes the given value array is a Java Long number and uses Bytes.toLong() to convert it.

BinaryComparator

Uses Bytes.compareTo() to compare the current with the provided value.

BinaryPrefixComparator

Similar to the above, but only compares up to the provided value’s length.

NullComparator

Does not compare against an actual value, but checks whether a given one is null, or not null.

BitComparator

Performs a bitwise comparison, providing a BitwiseOp enumeration with AND, OR, and XOR operators.

RegexStringComparator

Given a regular expression at instantiation, this comparator does a pattern match on the data.

SubstringComparator

Treats the value and table data as String instances and performs a contains() check.

Caution

The last four comparators listed in Table 4-2 (the NullComparator, BitComparator, RegexStringComparator, and SubstringComparator) only work with the EQUAL and NOT_EQUAL operators, as the compareTo() of these comparators returns 0 for a match or 1 when there is no match. Using them in a LESS or GREATER comparison will yield erroneous results.

Each of the comparators usually has a constructor that takes the comparison value. In other words, you need to define a value you compare each cell against. Some of these constructors take a byte[], a byte array, to do the binary comparison, for example, while others take a String parameter—since the data point compared against is assumed to be some sort of readable text. Example 4-1 shows some of these in action.

Caution

The string-based comparators, RegexStringComparator and SubstringComparator, are more expensive in comparison to the purely byte-based versions, as they need to convert a given value into a String first. The subsequent string or regular expression operation also adds to the overall cost.

Comparison Filters

The first type of supplied filter implementations are the comparison filters. They take the comparison operator and comparator instance as described above. The constructor of each of them has the same signature, inherited from CompareFilter:

CompareFilter(final CompareOp compareOp, final ByteArrayComparable comparator)

You need to supply the comparison operator and comparison class for the filters to do their work. Next you will see the actual filters implementing a specific comparison.

Please keep in mind that the general contract of the HBase filter API means you are filtering out information—filtered data is omitted from the results returned to the client. The filter is not specifying what you want to have, but rather what you do not want to have returned when reading data.

In contrast, all filters based on CompareFilter are doing the opposite, in that they include the matching values. In other words, be careful when choosing the comparison operator, as it makes the difference in regard to what the server returns. For example, instead of using LESS to skip some information, you may need to use GREATER_OR_EQUAL to include the desired data points.
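As a small, hedged sketch (row keys follow the test data used throughout this chapter), the inclusive way of expressing “everything from row-10 onward” looks like this:

    // Include rows with keys greater than or equal to "row-10", rather than
    // trying to express the same selection as an exclusion.
    Filter filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-10")));

    Scan scan = new Scan();
    scan.setFilter(filter);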

RowFilter

This filter gives you the ability to filter data based on row keys.

Example 4-1 shows how the filter can use different comparator instances to get the desired results. It also uses various operators to include the row keys, while omitting others. Feel free to modify the code, changing the operators to see the possible results.

Example 4-1. Example using a filter to select specific rows
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-1"));

    Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, 1
      new BinaryComparator(Bytes.toBytes("row-22")));
    scan.setFilter(filter1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result res : scanner1) {
      System.out.println(res);
    }
    scanner1.close();

    Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, 2
      new RegexStringComparator(".*-.5"));
    scan.setFilter(filter2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result res : scanner2) {
      System.out.println(res);
    }
    scanner2.close();

    Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, 3
      new SubstringComparator("-5"));
    scan.setFilter(filter3);
    ResultScanner scanner3 = table.getScanner(scan);
    for (Result res : scanner3) {
      System.out.println(res);
    }
    scanner3.close();
1

Create filter, while specifying the comparison operator and comparator. Here an exact match is needed.

2

Another filter, this time using a regular expression to match the row keys.

3

The third filter uses a substring match approach.

Here is the full printout of the example on the console:

Adding rows to table...
Scanning table #1...
keyvalues={row-1/colfam1:col-1/1427273897619/Put/vlen=7/seqid=0}
keyvalues={row-10/colfam1:col-1/1427273899185/Put/vlen=8/seqid=0}
keyvalues={row-100/colfam1:col-1/1427273908651/Put/vlen=9/seqid=0}
keyvalues={row-11/colfam1:col-1/1427273899343/Put/vlen=8/seqid=0}
keyvalues={row-12/colfam1:col-1/1427273899496/Put/vlen=8/seqid=0}
keyvalues={row-13/colfam1:col-1/1427273899643/Put/vlen=8/seqid=0}
keyvalues={row-14/colfam1:col-1/1427273899785/Put/vlen=8/seqid=0}
keyvalues={row-15/colfam1:col-1/1427273899925/Put/vlen=8/seqid=0}
keyvalues={row-16/colfam1:col-1/1427273900064/Put/vlen=8/seqid=0}
keyvalues={row-17/colfam1:col-1/1427273900202/Put/vlen=8/seqid=0}
keyvalues={row-18/colfam1:col-1/1427273900343/Put/vlen=8/seqid=0}
keyvalues={row-19/colfam1:col-1/1427273900484/Put/vlen=8/seqid=0}
keyvalues={row-2/colfam1:col-1/1427273897860/Put/vlen=7/seqid=0}
keyvalues={row-20/colfam1:col-1/1427273900623/Put/vlen=8/seqid=0}
keyvalues={row-21/colfam1:col-1/1427273900757/Put/vlen=8/seqid=0}
keyvalues={row-22/colfam1:col-1/1427273900881/Put/vlen=8/seqid=0}
Scanning table #2...
keyvalues={row-15/colfam1:col-1/1427273899925/Put/vlen=8/seqid=0}
keyvalues={row-25/colfam1:col-1/1427273901253/Put/vlen=8/seqid=0}
keyvalues={row-35/colfam1:col-1/1427273902480/Put/vlen=8/seqid=0}
keyvalues={row-45/colfam1:col-1/1427273903582/Put/vlen=8/seqid=0}
keyvalues={row-55/colfam1:col-1/1427273904633/Put/vlen=8/seqid=0}
keyvalues={row-65/colfam1:col-1/1427273905577/Put/vlen=8/seqid=0}
keyvalues={row-75/colfam1:col-1/1427273906453/Put/vlen=8/seqid=0}
keyvalues={row-85/colfam1:col-1/1427273907327/Put/vlen=8/seqid=0}
keyvalues={row-95/colfam1:col-1/1427273908211/Put/vlen=8/seqid=0}
Scanning table #3...
keyvalues={row-5/colfam1:col-1/1427273898394/Put/vlen=7/seqid=0}
keyvalues={row-50/colfam1:col-1/1427273904116/Put/vlen=8/seqid=0}
keyvalues={row-51/colfam1:col-1/1427273904219/Put/vlen=8/seqid=0}
keyvalues={row-52/colfam1:col-1/1427273904324/Put/vlen=8/seqid=0}
keyvalues={row-53/colfam1:col-1/1427273904428/Put/vlen=8/seqid=0}
keyvalues={row-54/colfam1:col-1/1427273904536/Put/vlen=8/seqid=0}
keyvalues={row-55/colfam1:col-1/1427273904633/Put/vlen=8/seqid=0}
keyvalues={row-56/colfam1:col-1/1427273904729/Put/vlen=8/seqid=0}
keyvalues={row-57/colfam1:col-1/1427273904823/Put/vlen=8/seqid=0}
keyvalues={row-58/colfam1:col-1/1427273904919/Put/vlen=8/seqid=0}
keyvalues={row-59/colfam1:col-1/1427273905015/Put/vlen=8/seqid=0}

You can see how the first filter did an exact match on the row key, including all of those rows that have a key, equal to or less than the given one. Note once again the lexicographical sorting and comparison, and how it filters the row keys.

The second filter does a regular expression match, while the third uses a substring match approach. The results show that the filters work as advertised.

FamilyFilter

This filter works very similar to the RowFilter, but applies the comparison to the column families available in a row—as opposed to the row key. Using the available combinations of operators and comparators you can filter what is included in the retrieved data on a column family level. Example 4-2 shows how to use this.

Example 4-2. Example using a filter to include only specific column families
    Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, 1
      new BinaryComparator(Bytes.toBytes("colfam3")));

    Scan scan = new Scan();
    scan.setFilter(filter1);
    ResultScanner scanner = table.getScanner(scan); 2
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

    Get get1 = new Get(Bytes.toBytes("row-5"));
    get1.setFilter(filter1);
    Result result1 = table.get(get1); 3
    System.out.println("Result of get(): " + result1);

    Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
      new BinaryComparator(Bytes.toBytes("colfam3")));
    Get get2 = new Get(Bytes.toBytes("row-5")); 4
    get2.addFamily(Bytes.toBytes("colfam1"));
    get2.setFilter(filter2);
    Result result2 = table.get(get2); 5
    System.out.println("Result of get(): " + result2);
1

Create filter, while specifying the comparison operator and comparator.

2

Scan over table while applying the filter.

3

Get a row while applying the same filter.

4

Create a filter on one column family while trying to retrieve another.

5

Get the same row while applying the new filter; this will return “NONE”.

The output—reformatted and abbreviated for the sake of readability—shows the filter in action. The input data has four column families, with two columns each, and 10 rows in total.

Adding rows to table...
Scanning table...
keyvalues={row-1/colfam1:col-1/1427274088598/Put/vlen=7/seqid=0,
           row-1/colfam1:col-2/1427274088615/Put/vlen=7/seqid=0,
           row-1/colfam2:col-1/1427274088598/Put/vlen=7/seqid=0,
           row-1/colfam2:col-2/1427274088615/Put/vlen=7/seqid=0}
keyvalues={row-10/colfam1:col-1/1427274088673/Put/vlen=8/seqid=0,
           row-10/colfam1:col-2/1427274088675/Put/vlen=8/seqid=0,
           row-10/colfam2:col-1/1427274088673/Put/vlen=8/seqid=0,
           row-10/colfam2:col-2/1427274088675/Put/vlen=8/seqid=0}
...
keyvalues={row-9/colfam1:col-1/1427274088669/Put/vlen=7/seqid=0,
           row-9/colfam1:col-2/1427274088671/Put/vlen=7/seqid=0,
           row-9/colfam2:col-1/1427274088669/Put/vlen=7/seqid=0,
           row-9/colfam2:col-2/1427274088671/Put/vlen=7/seqid=0}

Result of get(): keyvalues={
           row-5/colfam1:col-1/1427274088652/Put/vlen=7/seqid=0,
           row-5/colfam1:col-2/1427274088654/Put/vlen=7/seqid=0,
           row-5/colfam2:col-1/1427274088652/Put/vlen=7/seqid=0,
           row-5/colfam2:col-2/1427274088654/Put/vlen=7/seqid=0}

Result of get(): keyvalues=NONE

The last get() shows that you can (inadvertently) create an empty set by applying a filter for exactly one column family, while specifying a different column family selector using addFamily().

QualifierFilter

Example 4-3 shows how the same logic is applied on the column qualifier level. This allows you to filter specific columns from the table.

Example 4-3. Example using a filter to include only specific column qualifiers
    Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("col-2")));

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    System.out.println("Result of get(): " + result);

The output is the following (abbreviated again):

Adding rows to table...
Scanning table...
keyvalues={row-1/colfam1:col-1/1427274739258/Put/vlen=7/seqid=0,
           row-1/colfam1:col-10/1427274739309/Put/vlen=8/seqid=0,
           row-1/colfam1:col-2/1427274739272/Put/vlen=7/seqid=0,
           row-1/colfam2:col-1/1427274739258/Put/vlen=7/seqid=0,
           row-1/colfam2:col-10/1427274739309/Put/vlen=8/seqid=0,
           row-1/colfam2:col-2/1427274739272/Put/vlen=7/seqid=0}
...
keyvalues={row-9/colfam1:col-1/1427274739441/Put/vlen=7/seqid=0,
           row-9/colfam1:col-10/1427274739458/Put/vlen=8/seqid=0,
           row-9/colfam1:col-2/1427274739443/Put/vlen=7/seqid=0,
           row-9/colfam2:col-1/1427274739441/Put/vlen=7/seqid=0,
           row-9/colfam2:col-10/1427274739458/Put/vlen=8/seqid=0,
           row-9/colfam2:col-2/1427274739443/Put/vlen=7/seqid=0}

Result of get(): keyvalues={
           row-5/colfam1:col-1/1427274739366/Put/vlen=7/seqid=0,
           row-5/colfam1:col-10/1427274739384/Put/vlen=8/seqid=0,
           row-5/colfam1:col-2/1427274739368/Put/vlen=7/seqid=0,
           row-5/colfam2:col-1/1427274739366/Put/vlen=7/seqid=0,
           row-5/colfam2:col-10/1427274739384/Put/vlen=8/seqid=0,
           row-5/colfam2:col-2/1427274739368/Put/vlen=7/seqid=0}

Since the filter asks for columns, or in other words column qualifiers, with a value of col-2 or less, you can see how col-1 and col-10 are also included, since the comparison, once again, is done lexicographically, that is, on the binary representation.

ValueFilter

This filter makes it possible to include only columns that have a specific value. Combined with the RegexStringComparator, for example, this can filter using powerful expression syntax. Example 4-4 showcases this feature. Note, though, that with certain comparators—as explained earlier—you can only employ a subset of the operators. Here a substring match is performed and this must be combined with an EQUAL, or NOT_EQUAL, operator.

Example 4-4. Example using the value based filter
    Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, 1
      new SubstringComparator(".4"));

    Scan scan = new Scan();
    scan.setFilter(filter); 2
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " + 3
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter); 4
    Result result = table.get(get);
    for (Cell cell : result.rawCells()) {
      System.out.println("Cell: " + cell + ", Value: " +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueLength()));
    }
1

Create filter, while specifying the comparison operator and comparator.

2

Set filter for the scan.

3

Print out value to check that filter works.

4

Assign same filter to Get instance.

The output, confirming the proper functionality:

Adding rows to table...
Results of scan:
Cell: row-1/colfam1:col-4/1427275408429/Put/vlen=7/seqid=0, Value: val-1.4
Cell: row-1/colfam2:col-4/1427275408429/Put/vlen=7/seqid=0, Value: val-1.4
...
Cell: row-9/colfam1:col-4/1427275408605/Put/vlen=7/seqid=0, Value: val-9.4
Cell: row-9/colfam2:col-4/1427275408605/Put/vlen=7/seqid=0, Value: val-9.4

Result of get:
Cell: row-5/colfam1:col-4/1427275408527/Put/vlen=7/seqid=0, Value: val-5.4
Cell: row-5/colfam2:col-4/1427275408527/Put/vlen=7/seqid=0, Value: val-5.4

The example’s wiring code (hidden, see the online repository again) sets the value to row key + “.” + column number. The rows and columns start at 1. The filter is instructed to retrieve all cells that have a value containing .4, aiming at the fourth column. And indeed, we see that only column col-4 is returned.

DependentColumnFilter

Here you have a more complex filter that does not simply filter out data based on directly available information. Rather, it lets you specify a dependent column—or reference column—that controls how other columns are filtered. It uses the timestamp of the reference column and includes all other columns that have the same timestamp. Here are the constructors provided:

DependentColumnFilter(final byte[] family, final byte[] qualifier)
DependentColumnFilter(final byte[] family, final byte[] qualifier,
  final boolean dropDependentColumn)
DependentColumnFilter(final byte[] family, final byte[] qualifier,
  final boolean dropDependentColumn, final CompareOp valueCompareOp,
  final ByteArrayComparable valueComparator)

Since this class is based on CompareFilter, it also allows you to further select columns, but for this filter it does so based on their values. Think of it as a combination of a ValueFilter and a filter selecting on a reference timestamp. You can optionally hand in your own operator and comparator pair to enable this feature. The class provides constructors, though, that let you omit the operator and comparator and disable the value filtering, including all columns by default; that is, the filter then operates on the timestamp of the reference column only.

Example 4-5 shows the filter in use. You can see how the optional values can be handed in as well. The dropDependentColumn parameter gives you additional control over how the reference column is handled: it is either included or dropped by the filter, by setting this parameter to false or true, respectively.

Example 4-5. Example using the DependentColumnFilter with various options
  private static void filter(boolean drop,
      CompareFilter.CompareOp operator,
      ByteArrayComparable comparator)
  throws IOException {
    Filter filter;
    if (comparator != null) {
      filter = new DependentColumnFilter(Bytes.toBytes("colfam1"), 1
        Bytes.toBytes("col-5"), drop, operator, comparator);
    } else {
      filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
        Bytes.toBytes("col-5"), drop);
    }

    Scan scan = new Scan();
    scan.setFilter(filter);
    // scan.setBatch(4); // cause an error
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    for (Cell cell : result.rawCells()) {
      System.out.println("Cell: " + cell + ", Value: " +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueLength()));
    }
  }

  public static void main(String[] args) throws IOException {
    filter(true, CompareFilter.CompareOp.NO_OP, null);
    filter(false, CompareFilter.CompareOp.NO_OP, null); 2
    filter(true, CompareFilter.CompareOp.EQUAL,
      new BinaryPrefixComparator(Bytes.toBytes("val-5")));
    filter(false, CompareFilter.CompareOp.EQUAL,
      new BinaryPrefixComparator(Bytes.toBytes("val-5")));
    filter(true, CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator(".*\\.5"));
    filter(false, CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator(".*\\.5"));
  }
1

Create the filter with various options.

2

Call filter method with various options.

Caution

This filter is not compatible with the batch feature of the scan operations, that is, setting Scan.setBatch() to a number larger than zero. The filter needs to see the entire row to do its work, and using batching will not carry the reference column timestamp over and would result in erroneous results.

If you try to enable the batch mode nevertheless, you will get an error:

Exception in thread "main" \
  org.apache.hadoop.hbase.filter.IncompatibleFilterException: \
  Cannot set batch on a scan using a filter that returns true for \
  filter.hasFilterRow
    at org.apache.hadoop.hbase.client.Scan.setBatch(Scan.java:464)
    ...

The example also proceeds slightly differently compared to the earlier filters, as it sets the version (that is, the timestamp) of each column to the column number for a more reproducible result. Relying on the implicit timestamps that the servers assign could lead to fluctuating results, since you cannot guarantee the exact time, down to the millisecond.

The filter() method used is called with different parameter combinations, showing how using the built-in value filter and the drop flag is affecting the returned data set. Here is the output of the first two filter() calls:

Adding rows to table...
Results of scan:
Cell: row-1/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-1.5
Cell: row-10/colfam2:col-5/5/Put/vlen=8/seqid=0, Value: val-10.5
...
Cell: row-8/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-8.5
Cell: row-9/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-9.5
Result of get:
Cell: row-5/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-5.5

Results of scan:
Cell: row-1/colfam1:col-5/5/Put/vlen=7/seqid=0, Value: val-1.5
Cell: row-1/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-1.5
Cell: row-9/colfam1:col-5/5/Put/vlen=7/seqid=0, Value: val-9.5
Cell: row-9/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-9.5
Result of get:
Cell: row-5/colfam1:col-5/5/Put/vlen=7/seqid=0, Value: val-5.5
Cell: row-5/colfam2:col-5/5/Put/vlen=7/seqid=0, Value: val-5.5

The only difference between the two calls is setting dropDependentColumn to true and false respectively. In the first scan and get output you see the checked column in colfam1 being omitted, in other words dropped as expected, while in the second half of the output you see it included.

What is this filter good for, you might wonder? It is used where applications require client-side timestamps (these could be epoch based, or based on some internal global counter) to track dependent updates. Say you insert some kind of transactional data, where all fields of a row that are updated together should form one dependent update. In this case the client could set all columns that are updated in one mutation to the same timestamp, and when later wanting to show the entity at a certain point in time, get (or scan) the row at that time. All modifications from earlier (or later, or exact) changes are then masked out (or included). See “Transactions” for libraries on top of HBase that make use of such a schema.
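As a rough sketch of that pattern (the row key, column names, and values here are made up for illustration and are not from the book’s repository), the client supplies one timestamp for all columns of a logical update, and the reference column later drives the filtering:

    // Stamp every column of one logical update with the same,
    // application-supplied timestamp (a hypothetical transaction time).
    long txTime = 1000L;
    Put put = new Put(Bytes.toBytes("entity-1"));
    put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("status"), txTime,
      Bytes.toBytes("shipped"));
    put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("amount"), txTime,
      Bytes.toBytes("42"));
    table.put(put);

    // Later, "status" acts as the reference column: only cells sharing its
    // timestamp are returned, masking unrelated earlier or later changes.
    Filter filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
      Bytes.toBytes("status"));
    Scan scan = new Scan(Bytes.toBytes("entity-1"));
    scan.setFilter(filter);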

Dedicated Filters

The second type of supplied filters are based directly on FilterBase and implement more specific use cases. Many of these filters are only really applicable when performing scan operations, since they filter out entire rows. For get() calls, this is often too restrictive and would result in a very harsh filter approach: include the whole row or nothing at all.

PrefixFilter

Given a row prefix, specified when you instantiate the filter instance, all rows with a row key matching this prefix are returned to the client. The constructor is:

PrefixFilter(final byte[] prefix)

Example 4-6 has this applied to the usual test data set.

Example 4-6. Example using the prefix based filter
    Filter filter = new PrefixFilter(Bytes.toBytes("row-1"));

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-5"));
    get.setFilter(filter);
    Result result = table.get(get);
    for (Cell cell : result.rawCells()) {
      System.out.println("Cell: " + cell + ", Value: " +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueLength()));
    }

The output:

Results of scan:
Cell: row-1/colfam1:col-1/1427280142327/Put/vlen=7/seqid=0, Value: val-1.1
Cell: row-1/colfam1:col-10/1427280142379/Put/vlen=8/seqid=0, Value: val-1.10
...
Cell: row-1/colfam2:col-8/1427280142375/Put/vlen=7/seqid=0, Value: val-1.8
Cell: row-1/colfam2:col-9/1427280142377/Put/vlen=7/seqid=0, Value: val-1.9
Cell: row-10/colfam1:col-1/1427280142530/Put/vlen=8/seqid=0, Value: val-10.1
Cell: row-10/colfam1:col-10/1427280142546/Put/vlen=9/seqid=0, Value: val-10.10
...
Cell: row-10/colfam2:col-8/1427280142542/Put/vlen=8/seqid=0, Value: val-10.8
Cell: row-10/colfam2:col-9/1427280142544/Put/vlen=8/seqid=0, Value: val-10.9

Result of get:

It is interesting to see how the get() call fails to return anything, because it is asking for a row that does not match the filter prefix. This filter does not make much sense when doing get() calls but is highly useful for scan operations. The scan is also actively ended when the filter encounters a row key that is larger than the prefix. In this way, and combining this with a start row, for example, the filter improves the overall performance of the scan, as it knows when to skip the rest of the rows altogether.

PageFilter

You paginate through rows by employing this filter. When you create the instance, you specify a pageSize parameter, which controls how many rows per page should be returned.

PageFilter(final long pageSize)
Note

There is a fundamental issue with filtering on physically separate servers. Filters run on different region servers in parallel and cannot retain or communicate their current state across those boundaries. Thus, each filter is required to scan at least up to pageSize rows before ending the scan. This means a slight inefficiency is incurred by the PageFilter, as more rows are reported to the client than necessary. The final consolidation on the client obviously has visibility into all results and can reduce what is accessible through the API accordingly.

The client code would need to remember the last row that was returned, and then, when another iteration is about to start, set the start row of the scan accordingly, while retaining the same filter properties.

Because pagination sets a strict limit on the number of rows to be returned, it is possible for the filter to end the entire scan early, once the limit is reached or exceeded. Filters have a facility to indicate that fact, and the region servers make use of this hint to stop any further processing.

Example 4-7 puts this together, showing how a client can reset the scan to a new start row on the subsequent iterations.

Example 4-7. Example using a filter to paginate through rows
  private static final byte[] POSTFIX = new byte[] { 0x00 };
    Filter filter = new PageFilter(15);

    int totalRows = 0;
    byte[] lastRow = null;
    while (true) {
      Scan scan = new Scan();
      scan.setFilter(filter);
      if (lastRow != null) {
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row: " +
          Bytes.toStringBinary(startRow));
        scan.setStartRow(startRow);
      }
      ResultScanner scanner = table.getScanner(scan);
      int localRows = 0;
      Result result;
      while ((result = scanner.next()) != null) {
        System.out.println(localRows++ + ": " + result);
        totalRows++;
        lastRow = result.getRow();
      }
      scanner.close();
      if (localRows == 0) break;
    }
    System.out.println("total rows: " + totalRows);

The abbreviated output:

Adding rows to table...
0: keyvalues={row-1/colfam1:col-1/1427280402935/Put/vlen=7/seqid=0, ...}
1: keyvalues={row-10/colfam1:col-1/1427280403125/Put/vlen=8/seqid=0, ...}
...
14: keyvalues={row-110/colfam1:col-1/1427280404601/Put/vlen=9/seqid=0, ...}
start row: row-110\x00
0: keyvalues={row-111/colfam1:col-1/1427280404615/Put/vlen=9/seqid=0, ...}
1: keyvalues={row-112/colfam1:col-1/1427280404628/Put/vlen=9/seqid=0, ...}
...
14: keyvalues={row-124/colfam1:col-1/1427280404786/Put/vlen=9/seqid=0, ...}
start row: row-124\x00
0: keyvalues={row-125/colfam1:col-1/1427280404799/Put/vlen=9/seqid=0, ...}
...
start row: row-999\x00
total rows: 1000

Because of the lexicographical sorting of the row keys by HBase and the comparison taking care of finding the row keys in order, and the fact that the start key on a scan is always inclusive, you need to add an extra zero byte to the previous key. This will ensure that the last seen row key is skipped and the next, in sorting order, is found. The zero byte is the smallest increment, and therefore is safe to use when resetting the scan boundaries. Even if there were a row that would match the previous plus the extra zero byte, the scan would be correctly doing the next iteration—because the start key is inclusive.

KeyOnlyFilter

Some applications need to access just the keys of each Cell, while omitting the actual data. The KeyOnlyFilter provides this functionality by applying the filter’s ability to modify the processed columns and cells, as they pass through. It does so by applying some logic that converts the current cell, stripping out the data part. The constructors of the filter are:

KeyOnlyFilter()
KeyOnlyFilter(boolean lenAsVal)

There is an optional boolean parameter, named lenAsVal. It is handed to the internal conversion call as-is, controlling what happens to the value part of each Cell instance processed. The default value of false simply sets the value to zero length, while the opposite true sets the value to the number representing the length of the original value. The latter may be useful to your application when quickly iterating over columns, where the keys already convey meaning and the length can be used to perform a secondary sort. “Client API: Best Practices” has an example.

Example 4-8 tests this filter with both constructors, creating random rows, columns, and values.

Example 4-8. Example using the KeyOnlyFilter with both of its constructors
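  // Note: the opening of the scan() helper used here is not part of this
  // excerpt; the following lines are an assumed reconstruction so that the
  // fragment reads as a complete method applying the given filter to a scan.
  private static void scan(Filter filter) throws IOException {
    Scan scan = new Scan();
    scan.setFilter(filter);
    System.out.println("Results of scan:");
    ResultScanner scanner = table.getScanner(scan);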
    int rowCount = 0;
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " + (
          cell.getValueLength() > 0 ?
            Bytes.toInt(cell.getValueArray(), cell.getValueOffset(),
              cell.getValueLength()) : "n/a" ));
      }
      rowCount++;
    }
    System.out.println("Total num of rows: " + rowCount);
    scanner.close();
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    HBaseHelper helper = HBaseHelper.getHelper(conf);
    helper.dropTable("testtable");
    helper.createTable("testtable", "colfam1");
    System.out.println("Adding rows to table...");
    helper.fillTableRandom("testtable", /* row */ 1, 5, 0,
       /* col */ 1, 30, 0,  /* val */ 0, 10000, 0, true, "colfam1");

    Connection connection = ConnectionFactory.createConnection(conf);
    table = connection.getTable(TableName.valueOf("testtable"));
    System.out.println("Scan #1");
    Filter filter1 = new KeyOnlyFilter();
    scan(filter1);
    Filter filter2 = new KeyOnlyFilter(true);
    scan(filter2);

The abbreviated output will be similar to the following:

Adding rows to table...
Results of scan:
Cell: row-0/colfam1:col-17/6/Put/vlen=0/seqid=0, Value: n/a
Cell: row-0/colfam1:col-27/3/Put/vlen=0/seqid=0, Value: n/a
...
Cell: row-4/colfam1:col-3/2/Put/vlen=0/seqid=0, Value: n/a
Cell: row-4/colfam1:col-5/16/Put/vlen=0/seqid=0, Value: n/a
Total num of rows: 5

Scan #2
Results of scan:
Cell: row-0/colfam1:col-17/6/Put/vlen=4/seqid=0, Value: 8
Cell: row-0/colfam1:col-27/3/Put/vlen=4/seqid=0, Value: 6
...
Cell: row-4/colfam1:col-3/2/Put/vlen=4/seqid=0, Value: 7
Cell: row-4/colfam1:col-5/16/Put/vlen=4/seqid=0, Value: 8
Total num of rows: 5

The highlighted parts show how first the value is simply dropped and the value length is set to zero. The second scan, setting lenAsVal explicitly to true, shows a different result. The value length of 4 is attributed to the length of the payload, an integer of four bytes. The value is the length of the original, randomly generated value, here between 5 and 9 bytes (the fixed prefix val- plus a number between 0 and 10,000).

FirstKeyOnlyFilter

Caution

Even though the name implies KeyValue, or key only, this is a misnomer on both counts. The filter returns the first cell it finds in a row, and does so with all of its details, including the value. It should be named FirstCellFilter, for example.

If you need to access the first column—as sorted implicitly by HBase—in each row, this filter will provide this feature. Typically this is used by row counter type applications that only need to check if a row exists. Recall that in column-oriented databases a row really is composed of columns, and if there are none, the row ceases to exist.

Another possible use case relies on the lexicographical sorting of columns, setting the column qualifier to an epoch value. This sorts the column with the oldest timestamp name as the first to be retrieved. Combined with this filter, it is possible to retrieve the oldest column from every row using a single scan. More interesting, though, is the case where you reverse the timestamp used as the column qualifier, and therefore retrieve the newest entry in a row in a single scan.
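A minimal sketch of that reversed-qualifier idea, with row key, family, and values made up for illustration, might look like this:

    // Write the qualifier as Long.MAX_VALUE minus the event time, so the
    // newest entry sorts first within the row.
    long eventTime = System.currentTimeMillis();
    Put put = new Put(Bytes.toBytes("user-1"));
    put.addColumn(Bytes.toBytes("colfam1"),
      Bytes.toBytes(Long.MAX_VALUE - eventTime), Bytes.toBytes("latest event"));
    table.put(put);

    // With the reversed qualifiers in place, this scan returns exactly the
    // newest entry of every row.
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());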

This class makes use of another optimization feature provided by the filter framework: it indicates to the region server applying the filter that the current row is done and that it should skip to the next one. This improves the overall performance of the scan, compared to a full table scan. The gain is more prominent in schemas with very wide rows, in other words, where you can skip many columns to reach the next row. If you only have one column per row, there will be no gain at all, obviously.

Example 4-9 has a simple example, using random rows, columns, and values, so your output will vary.

Example 4-9. Only returns the first found cell from each row
    Filter filter = new FirstKeyOnlyFilter();

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    int rowCount = 0;
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
      rowCount++;
    }
    System.out.println("Total num of rows: " + rowCount);
    scanner.close();

The abbreviated output, showing that only one cell is returned per row, confirming the filter’s purpose:

Adding rows to table...
Results of scan:
Cell: row-0/colfam1:col-10/19/Put/vlen=6/seqid=0, Value: val-76
Cell: row-1/colfam1:col-0/0/Put/vlen=6/seqid=0, Value: val-19
...
Cell: row-8/colfam1:col-10/4/Put/vlen=6/seqid=0, Value: val-35
Cell: row-9/colfam1:col-1/5/Put/vlen=5/seqid=0, Value: val-0
Total num of rows: 30

FirstKeyValueMatchingQualifiersFilter

This filter is an extension to the FirstKeyOnlyFilter: instead of returning just the first found cell, it returns all the columns of a row, up to a given column qualifier. If the row has no such qualifier, all columns are returned. The filter is mainly used in the rowcounter shell command, to count all rows in HBase using a distributed process.

The constructor of the filter class looks like this:

FirstKeyValueMatchingQualifiersFilter(Set<byte[]> qualifiers)

Example 4-10 sets up a filter with two columns to match. It also loads the test table with random data, so your output will most certainly vary.

Example 4-10. Returns all columns, or up to the first found reference qualifier, for each row
    Set<byte[]> quals = new HashSet<byte[]>();
    quals.add(Bytes.toBytes("col-2"));
    quals.add(Bytes.toBytes("col-4"));
    quals.add(Bytes.toBytes("col-6"));
    quals.add(Bytes.toBytes("col-8"));
    Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    int rowCount = 0;
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
      rowCount++;
    }
    System.out.println("Total num of rows: " + rowCount);
    scanner.close();

Here is the output on the console in an abbreviated form for one execution:

Adding rows to table...
Results of scan:
Cell: row-0/colfam1:col-0/1/Put/vlen=6/seqid=0, Value: val-48
Cell: row-0/colfam1:col-1/4/Put/vlen=6/seqid=0, Value: val-78
Cell: row-0/colfam1:col-5/1/Put/vlen=6/seqid=0, Value: val-62
Cell: row-0/colfam1:col-6/6/Put/vlen=5/seqid=0, Value: val-6
Cell: row-10/colfam1:col-1/3/Put/vlen=6/seqid=0, Value: val-73
Cell: row-10/colfam1:col-6/5/Put/vlen=6/seqid=0, Value: val-11
...
Cell: row-6/colfam1:col-1/0/Put/vlen=6/seqid=0, Value: val-39
Cell: row-7/colfam1:col-9/6/Put/vlen=6/seqid=0, Value: val-57
Cell: row-8/colfam1:col-0/2/Put/vlen=6/seqid=0, Value: val-90
Cell: row-8/colfam1:col-1/4/Put/vlen=6/seqid=0, Value: val-92
Cell: row-8/colfam1:col-6/4/Put/vlen=6/seqid=0, Value: val-12
Cell: row-9/colfam1:col-1/5/Put/vlen=6/seqid=0, Value: val-35
Cell: row-9/colfam1:col-2/2/Put/vlen=6/seqid=0, Value: val-22
Total num of rows: 47

Depending on the random data generated, we see more or fewer cells emitted per row. The filter is instructed to stop emitting cells when encountering one of the columns col-2, col-4, col-6, or col-8. For row-0 this is visible, as it had one more column, named col-7, which is omitted. row-7 has only one cell, and no matching qualifier, hence it is included completely.

InclusiveStopFilter

The row boundaries of a scan are inclusive for the start row, yet exclusive for the stop row. You can overcome the stop row semantics using this filter, which includes the specified stop row. Example 4-11 uses the filter to start at row-3, and stop at row-5 inclusively.

Example 4-11. Example using a filter to include a stop row
    Filter filter = new InclusiveStopFilter(Bytes.toBytes("row-5"));

    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row-3"));
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

The output on the console, when running the example code, confirms that the filter works as advertised:

Adding rows to table...
Results of scan:
keyvalues={row-3/colfam1:col-1/1427282689001/Put/vlen=7/seqid=0}
keyvalues={row-30/colfam1:col-1/1427282689069/Put/vlen=8/seqid=0}
...
keyvalues={row-48/colfam1:col-1/1427282689100/Put/vlen=8/seqid=0}
keyvalues={row-49/colfam1:col-1/1427282689102/Put/vlen=8/seqid=0}
keyvalues={row-5/colfam1:col-1/1427282689004/Put/vlen=7/seqid=0}

FuzzyRowFilter

This filter acts on row keys, but in a fuzzy manner. It needs a list of row keys that should be returned, plus an accompanying byte[] array that signifies the importance of each byte in the row key. The constructor is as follows:

FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData)

The fuzzyKeysData specifies the mentioned significance of a row key byte, by taking one of two values:

0

Indicates that the byte at the same position in the row key must match as-is.

1

Means that the corresponding row key byte does not matter and is always accepted.

An advantage of this filter is that it can likely compute the next matching row key when it comes to an end of a matching one. It implements the getNextCellHint() method to help the servers in fast-forwarding to the next range of rows that might match. This speeds up scanning, especially when the skipped ranges are quite large. Example 4-12 uses the filter to grab specific rows from a test data set.

Example 4-12. Example filtering by fuzzy matching on row keys
    List<Pair<byte[], byte[]>> keys = new ArrayList<Pair<byte[], byte[]>>();
    keys.add(new Pair<byte[], byte[]>(
      Bytes.toBytes("row-?5"), new byte[] { 0, 0, 0, 0, 1, 0 }));
    Filter filter = new FuzzyRowFilter(keys);

    Scan scan = new Scan()
      .addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"))
      .setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

The example code also adds a filtering column to the scan, just to keep the output short:

Adding rows to table...
Results of scan:
keyvalues={row-05/colfam1:col-01/1/Put/vlen=9/seqid=0,
           row-05/colfam1:col-02/2/Put/vlen=9/seqid=0,
           ...
           row-05/colfam1:col-09/9/Put/vlen=9/seqid=0,
           row-05/colfam1:col-10/10/Put/vlen=9/seqid=0}
keyvalues={row-15/colfam1:col-01/1/Put/vlen=9/seqid=0,
           row-15/colfam1:col-02/2/Put/vlen=9/seqid=0,
           ...
           row-15/colfam1:col-09/9/Put/vlen=9/seqid=0,
           row-15/colfam1:col-10/10/Put/vlen=9/seqid=0}

The test code wiring adds 20 rows to the table, named row-01 to row-20. We want to retrieve all the rows that match the pattern row-?5, in other words all rows that end in the number 5. The output above confirms the correct result.

ColumnCountGetFilter

You can use this filter to only retrieve a specific maximum number of columns per row. You can set the number using the constructor of the filter:

ColumnCountGetFilter(final int n)

Since this filter stops the entire scan once a row has been found that matches the maximum number of columns configured, it is not useful for scan operations, and in fact, it was written to test filters in get() calls.
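As no example is given here, the following is a minimal sketch of how the filter might be used with a get() call (the row key and limit are placeholders):

    // Return at most three columns of the requested row.
    Get get = new Get(Bytes.toBytes("row-1"));
    get.setFilter(new ColumnCountGetFilter(3));
    Result result = table.get(get);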

ColumnPaginationFilter

Tip

This filter’s functionality is superseded by the slicing functionality explained in “Slicing Rows”, and provided by the setMaxResultsPerColumnFamily() and setRowOffsetPerColumnFamily() methods of Scan, and Get.
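For comparison, a minimal sketch of the slicing variant mentioned in the tip, using the same offset and limit as Example 4-13 below, could look like this:

    // Skip the first 15 columns per column family, then return at most 5.
    Scan scan = new Scan();
    scan.setRowOffsetPerColumnFamily(15);
    scan.setMaxResultsPerColumnFamily(5);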

Similar to the PageFilter, this one can be used to page through columns in a row. Its constructor has two parameters:

ColumnPaginationFilter(final int limit, final int offset)

It skips all columns up to the number given as offset, and then includes limit columns afterward. Example 4-13 has this applied to a normal scan.

Example 4-13. Example paginating through columns in a row
    Filter filter = new ColumnPaginationFilter(5, 15);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

Running this example should render the following output:

Adding rows to table...
Results of scan:
keyvalues={row-01/colfam1:col-16/16/Put/vlen=9/seqid=0,
           row-01/colfam1:col-17/17/Put/vlen=9/seqid=0,
           row-01/colfam1:col-18/18/Put/vlen=9/seqid=0,
           row-01/colfam1:col-19/19/Put/vlen=9/seqid=0,
           row-01/colfam1:col-20/20/Put/vlen=9/seqid=0}
keyvalues={row-02/colfam1:col-16/16/Put/vlen=9/seqid=0,
           row-02/colfam1:col-17/17/Put/vlen=9/seqid=0,
           row-02/colfam1:col-18/18/Put/vlen=9/seqid=0,
           row-02/colfam1:col-19/19/Put/vlen=9/seqid=0,
           row-02/colfam1:col-20/20/Put/vlen=9/seqid=0}
...
Note

This example slightly changes the way the rows and columns are numbered by adding a padding to the numeric counters. For example, the first row is padded to be row-01. This also shows how padding can be used to get a more human-readable style of sorting, for example—as known from dictionaries or telephone books.

The result includes all 10 rows, starting each row at column 16 (offset = 15) and printing five columns (limit = 5). As a side note, this filter does not suffer from the issues explained in “PageFilter”, in other words, although it is distributed and not synchronized across filter instances, there are no inefficiencies incurred by reading too many columns or rows. This is because a row is contained in a single region, and no overlap to another region is required to complete the filtering task.

ColumnPrefixFilter

Analog to the PrefixFilter, which worked by filtering on row key prefixes, this filter does the same for columns. You specify a prefix when creating the filter:

ColumnPrefixFilter(final byte[] prefix)

All columns that have the given prefix are then included in the result. Example 4-14 selects all columns starting with col-1. Here we drop the padding again, to get binary sorted column names.

Example 4-14. Example filtering by column prefix
    Filter filter = new ColumnPrefixFilter(Bytes.toBytes("col-1"));

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

The result of running this example should show the filter doing its job as advertised:

Adding rows to table...
Results of scan:
keyvalues={row-1/colfam1:col-1/1/Put/vlen=7/seqid=0,
           row-1/colfam1:col-10/10/Put/vlen=8/seqid=0,
           ...
           row-1/colfam1:col-19/19/Put/vlen=8/seqid=0}
...

MultipleColumnPrefixFilter

This filter is a straight extension to the ColumnPrefixFilter, allowing the application to ask for a list of column qualifier prefixes, not just a single one. The constructor and usage are also straightforward:

MultipleColumnPrefixFilter(final byte[][] prefixes)

The code in Example 4-15 adds two column prefixes, and also a row prefix to limit the output.

Example 4-15. Example filtering by multiple column prefixes
    Filter filter = new MultipleColumnPrefixFilter(new byte[][] {
      Bytes.toBytes("col-1"), Bytes.toBytes("col-2")
    });

    Scan scan = new Scan()
      .setRowPrefixFilter(Bytes.toBytes("row-1")) 1
      .setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.print(Bytes.toString(result.getRow()) + ": ");
      for (Cell cell : result.rawCells()) {
        System.out.print(Bytes.toString(cell.getQualifierArray(),
          cell.getQualifierOffset(), cell.getQualifierLength()) + ", ");
      }
      System.out.println();
    }
    scanner.close();
1

Limit to rows starting with a specific prefix.

The following shows what is emitted on the console (abbreviated); note how the code also prints out only the row key and column qualifiers, just to show another way of accessing the data:

Adding rows to table...
Results of scan:
row-1: col-1, col-10, col-11, col-12, col-13, col-14, col-15, col-16,
  col-17, col-18, col-19, col-2, col-20, col-21, col-22, col-23, col-24,
  col-25, col-26, col-27, col-28, col-29,
row-10: col-1, col-10, col-11, col-12, col-13, col-14, col-15, col-16,
  col-17, col-18, col-19, col-2, col-20, col-21, col-22, col-23, col-24,
  col-25, col-26, col-27, col-28, col-29,
row-18: col-1, col-10, col-11, col-12, col-13, col-14, col-15, col-16,
  col-17, col-18, col-19, col-2, col-20, col-21, col-22, col-23, col-24,
  col-25, col-26, col-27, col-28, col-29,
row-19: col-1, col-10, col-11, col-12, col-13, col-14, col-15, col-16,
  col-17, col-18, col-19, col-2, col-20, col-21, col-22, col-23, col-24,
  col-25, col-26, col-27, col-28, col-29,

ColumnRangeFilter

This filter acts like two QualifierFilter instances working together, with one checking the lower boundary, and the other doing the same for the upper. Both would have to use the provided BinaryPrefixComparator with a compare operator of LESS_OR_EQUAL, and GREATER_OR_EQUAL, respectively. Since all of this is error-prone and extra work, you can just use the ColumnRangeFilter and be done. Here is the constructor of the filter:

ColumnRangeFilter(final byte[] minColumn, boolean minColumnInclusive,
  final byte[] maxColumn, boolean maxColumnInclusive)

You can provide an optional minimum and maximum column qualifier, plus accompanying boolean flags stating whether these boundaries are inclusive or exclusive. If you do not specify a minimum column, the start of the table is used. The same applies to the maximum column: if it is not provided, the end of the table is assumed. Example 4-16 shows an example using these parameters.

Example 4-16. Example filtering by columns within a given range
    Filter filter = new ColumnRangeFilter(Bytes.toBytes("col-05"), true,
      Bytes.toBytes("col-11"), false);

    Scan scan = new Scan()
      .setStartRow(Bytes.toBytes("row-03"))
      .setStopRow(Bytes.toBytes("row-05"))
      .setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      System.out.println(result);
    }
    scanner.close();

The output is as follows:

Adding rows to table...
Results of scan:
keyvalues={row-03/colfam1:col-05/5/Put/vlen=9/seqid=0,
           row-03/colfam1:col-06/6/Put/vlen=9/seqid=0,
           row-03/colfam1:col-07/7/Put/vlen=9/seqid=0,
           row-03/colfam1:col-08/8/Put/vlen=9/seqid=0,
           row-03/colfam1:col-09/9/Put/vlen=9/seqid=0,
           row-03/colfam1:col-10/10/Put/vlen=9/seqid=0}
keyvalues={row-04/colfam1:col-05/5/Put/vlen=9/seqid=0,
           row-04/colfam1:col-06/6/Put/vlen=9/seqid=0,
           row-04/colfam1:col-07/7/Put/vlen=9/seqid=0,
           row-04/colfam1:col-08/8/Put/vlen=9/seqid=0,
           row-04/colfam1:col-09/9/Put/vlen=9/seqid=0,
           row-04/colfam1:col-10/10/Put/vlen=9/seqid=0}

In this example you can see the use of the fluent interface again to set up the scan instance. It also limits the number of rows scanned (just because).

SingleColumnValueFilter

You can use this filter when you have exactly one column that decides if an entire row should be returned or not. You need to first specify the column you want to track, and then some value to check against. The constructors offered are:

SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
  final CompareOp compareOp, final byte[] value)
SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
    final CompareOp compareOp, final ByteArrayComparable comparator)
protected SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
  final CompareOp compareOp, ByteArrayComparable comparator,
  final boolean filterIfMissing, final boolean latestVersionOnly)

The first one is a convenience function, as it simply creates a BinaryComparator instance internally on your behalf. The second takes the same parameters we used for the CompareFilter-based classes. Although the SingleColumnValueFilter does not inherit from the CompareFilter directly, it still uses the same parameter types. The third, and final, constructor adds two additional boolean flags, which, alternatively, can be set with getter and setter methods after the filter has been constructed:

boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)

The former controls what happens to rows that do not have the column at all. By default, they are included in the result, but you can use setFilterIfMissing(true) to reverse that behavior, that is, all rows that do not have the reference column are dropped from the result.

Note

You must include the column you want to filter by, in other words, the reference column, into the families you query for—using addColumn(), for example. If you fail to do so, the column is considered missing and the result is either empty, or contains all rows, based on the getFilterIfMissing() result.
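A minimal sketch of that requirement, using the reference column of Example 4-17 and a hypothetical second family as the data actually wanted:

    // The reference column colfam1:col-5 must be part of the query, or it is
    // treated as missing; colfam2 is the data we actually want returned.
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"));
    scan.addFamily(Bytes.toBytes("colfam2"));
    scan.setFilter(filter);  // the SingleColumnValueFilter from Example 4-17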

By using setLatestVersionOnly(false) (the default is true), you can change the default behavior of the filter, which is to check only the newest version of the reference column, so that it instead includes previous versions in the check as well. Example 4-17 combines these features to select a specific set of rows only.

Example 4-17. Example using a filter to return only rows with a given value in a given column
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
      Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"),
      CompareFilter.CompareOp.NOT_EQUAL,
      new SubstringComparator("val-5"));
    filter.setFilterIfMissing(true);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner.close();

    Get get = new Get(Bytes.toBytes("row-6"));
    get.setFilter(filter);
    Result result = table.get(get);
    System.out.println("Result of get: ");
    for (Cell cell : result.rawCells()) {
      System.out.println("Cell: " + cell + ", Value: " +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueLength()));
    }

The output shows how the scan excludes all of row-5, since the value of its reference column starts with val-5: we ask the filter to do a substring match on val-5 combined with the NOT_EQUAL operator, so only rows whose reference column does not match are included:

Adding rows to table...
Results of scan:
Cell: row-1/colfam1:col-1/1427279447557/Put/vlen=7/seqid=0, Value: val-1.1
Cell: row-1/colfam1:col-10/1427279447613/Put/vlen=8/seqid=0, Value: val-1.10
...
Cell: row-4/colfam2:col-8/1427279447667/Put/vlen=7/seqid=0, Value: val-4.8
Cell: row-4/colfam2:col-9/1427279447669/Put/vlen=7/seqid=0, Value: val-4.9
Cell: row-6/colfam1:col-1/1427279447692/Put/vlen=7/seqid=0, Value: val-6.1
Cell: row-6/colfam1:col-10/1427279447709/Put/vlen=8/seqid=0, Value: val-6.10
...
Cell: row-9/colfam2:col-8/1427279447759/Put/vlen=7/seqid=0, Value: val-9.8
Cell: row-9/colfam2:col-9/1427279447761/Put/vlen=7/seqid=0, Value: val-9.9
Result of get:
Cell: row-6/colfam1:col-1/1427279447692/Put/vlen=7/seqid=0, Value: val-6.1
Cell: row-6/colfam1:col-10/1427279447709/Put/vlen=8/seqid=0, Value: val-6.10
...
Cell: row-6/colfam2:col-8/1427279447705/Put/vlen=7/seqid=0, Value: val-6.8
Cell: row-6/colfam2:col-9/1427279447707/Put/vlen=7/seqid=0, Value: val-6.9

SingleColumnValueExcludeFilter

The SingleColumnValueFilter we just discussed is extended in this class to provide slightly different semantics: the reference column, as handed into the constructor, is omitted from the result. In other words, you have the same features, constructors, and methods to control how this filter works. The only difference is that you will never get the column you are checking against as part of the Result instance(s) on the client side.
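
For illustration, here is a minimal sketch that mirrors the setup of Example 4-17, but uses the exclude variant, so the reference column itself never shows up in the returned cells (the table, column, and value names are the same hypothetical ones used earlier):

    SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(
      Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"),
      CompareFilter.CompareOp.NOT_EQUAL,
      new SubstringComparator("val-5"));
    filter.setFilterIfMissing(true);

    Scan scan = new Scan();
    scan.setFilter(filter);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      // colfam1:col-5 is checked on the server, but not part of the result
      System.out.println(result);
    }
    scanner.close();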

TimestampsFilter

When you need fine-grained control over what versions are included in the scan result, this filter provides the means. You have to hand in a List of timestamps:

TimestampsFilter(List<Long> timestamps)
Note

As you have seen throughout the book so far, a version is a specific value of a column at a unique point in time, denoted with a timestamp. When the filter is asking for a list of timestamps, it will attempt to retrieve the column versions with the matching timestamps.

Example 4-18 sets up a filter with three timestamps and adds a time range to the second scan.

Example 4-18. Example filtering data by timestamps
    List<Long> ts = new ArrayList<Long>();
    ts.add(new Long(5));
    ts.add(new Long(10)); 1
    ts.add(new Long(15));
    Filter filter = new TimestampsFilter(ts);

    Scan scan1 = new Scan();
    scan1.setFilter(filter); 2
    ResultScanner scanner1 = table.getScanner(scan1);
    for (Result result : scanner1) {
      System.out.println(result);
    }
    scanner1.close();

    Scan scan2 = new Scan();
    scan2.setFilter(filter);
    scan2.setTimeRange(8, 12); 3
    ResultScanner scanner2 = table.getScanner(scan2);
    for (Result result : scanner2) {
      System.out.println(result);
    }
    scanner2.close();
1

Add timestamps to the list.

2

Add the filter to an otherwise default Scan instance.

3

Also add a time range to verify how it affects the filter.

Here is the output on the console in an abbreviated form:

Adding rows to table...
Results of scan #1:
keyvalues={row-1/colfam1:col-10/10/Put/vlen=8/seqid=0,
           row-1/colfam1:col-15/15/Put/vlen=8/seqid=0,
           row-1/colfam1:col-5/5/Put/vlen=7/seqid=0}
keyvalues={row-100/colfam1:col-10/10/Put/vlen=10/seqid=0,
           row-100/colfam1:col-15/15/Put/vlen=10/seqid=0,
           row-100/colfam1:col-5/5/Put/vlen=9/seqid=0}
...
keyvalues={row-99/colfam1:col-10/10/Put/vlen=9/seqid=0,
           row-99/colfam1:col-15/15/Put/vlen=9/seqid=0,
           row-99/colfam1:col-5/5/Put/vlen=8/seqid=0}

Results of scan #2:
keyvalues={row-1/colfam1:col-10/10/Put/vlen=8/seqid=0}
keyvalues={row-10/colfam1:col-10/10/Put/vlen=9/seqid=0}
...
keyvalues={row-98/colfam1:col-10/10/Put/vlen=9/seqid=0}
keyvalues={row-99/colfam1:col-10/10/Put/vlen=9/seqid=0}

The first scan, only using the filter, is outputting the column values for all three specified timestamps as expected. The second scan only returns the timestamp that fell into the time range specified when the scan was set up. Both time-based restrictions, the filter and the scanner time range, are doing their job and the result is a combination of both.

RandomRowFilter

Finally, there is a filter that shows what is also possible using the API: including random rows into the result. The constructor is given a parameter named chance, which represents a value between 0.0 and 1.0:

RandomRowFilter(float chance)

Internally, this class is using a Java Random.nextFloat() call to randomize the row inclusion, and then compares the value with the chance given. Giving it a negative chance value will make the filter exclude all rows, while a value larger than 1.0 will make it include all rows. Example 4-19 uses a chance of 50%, iterating three times over the scan:

Example 4-19. Example filtering rows randomly
    Filter filter = new RandomRowFilter(0.5f);

    for (int loop = 1; loop <= 3; loop++) {
      Scan scan = new Scan();
      scan.setFilter(filter);
      ResultScanner scanner = table.getScanner(scan);
      for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()));
      }
      scanner.close();
    }

The random results for one execution looked like:

Adding rows to table...
Results of scan for loop: 1
row-1
row-10
row-3
row-9
Results of scan for loop: 2
row-10
row-2
row-3
row-5
row-6
row-8
Results of scan for loop: 3
row-1
row-3
row-4
row-8
row-9

Your results will most certainly vary.

Decorating Filters

While the provided filters are already very powerful, sometimes it can be useful to modify, or extend, the behavior of a filter to gain additional control over the returned data. Some of this additional control is not dependent on the filter itself, but can be applied to any of them. This is what the decorating filter group of classes is about.

Note

Decorating filters implement the same Filter interface, just like any other single-purpose filter. In doing so, they can be used as a drop-in replacement for those filters, while combining their behavior with the wrapped filter instance.

SkipFilter

This filter wraps a given filter and extends it to exclude an entire row when the wrapped filter indicates that a Cell should be skipped. In other words, as soon as a filter indicates that a column in a row is omitted, the entire row is omitted.

Note

The wrapped filter must implement the filterKeyValue() method, or the SkipFilter will not work as expected.1 This is because the SkipFilter only checks the results of that method to decide how to handle the current row. See Table 4-9 for an overview of compatible filters.

Example 4-20 combines the SkipFilter with a ValueFilter: the first scan uses the value filter on its own, dropping only the zero-valued columns and leaving partial rows in the result, while the second, decorated scan drops every row that contains at least one such column.

Example 4-20. Example of using a filter to skip entire rows based on another filter’s results
    Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new BinaryComparator(Bytes.toBytes("val-0")));

    Scan scan = new Scan();
    scan.setFilter(filter1); 1
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner1.close();

    Filter filter2 = new SkipFilter(filter1);

    scan.setFilter(filter2); 2
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner2.close();
1

Only add the ValueFilter to the first scan.

2

Add the decorating skip filter for the second scan.

The example code should print roughly the following results when you execute it—note, though, that the values are randomized, so you should get a slightly different result for every invocation:

Adding rows to table...
Results of scan #1:
Cell: row-01/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-4
Cell: row-01/colfam1:col-02/2/Put/vlen=5/seqid=0, Value: val-4
Cell: row-01/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-1
Cell: row-01/colfam1:col-04/4/Put/vlen=5/seqid=0, Value: val-3
Cell: row-01/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-1
Cell: row-02/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-1
Cell: row-02/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-2
Cell: row-02/colfam1:col-04/4/Put/vlen=5/seqid=0, Value: val-4
Cell: row-02/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-2
...
Cell: row-30/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-2
Cell: row-30/colfam1:col-02/2/Put/vlen=5/seqid=0, Value: val-4
Cell: row-30/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-4
Cell: row-30/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-4
Total cell count for scan #1: 124
Results of scan #2:
Cell: row-01/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-4
Cell: row-01/colfam1:col-02/2/Put/vlen=5/seqid=0, Value: val-4
Cell: row-01/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-1
Cell: row-01/colfam1:col-04/4/Put/vlen=5/seqid=0, Value: val-3
Cell: row-01/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-1
Cell: row-06/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-4
Cell: row-06/colfam1:col-02/2/Put/vlen=5/seqid=0, Value: val-4
Cell: row-06/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-4
Cell: row-06/colfam1:col-04/4/Put/vlen=5/seqid=0, Value: val-3
Cell: row-06/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-2
...
Cell: row-28/colfam1:col-01/1/Put/vlen=5/seqid=0, Value: val-2
Cell: row-28/colfam1:col-02/2/Put/vlen=5/seqid=0, Value: val-1
Cell: row-28/colfam1:col-03/3/Put/vlen=5/seqid=0, Value: val-2
Cell: row-28/colfam1:col-04/4/Put/vlen=5/seqid=0, Value: val-4
Cell: row-28/colfam1:col-05/5/Put/vlen=5/seqid=0, Value: val-2
Total cell count for scan #2: 55

The first scan returns all columns that are not zero valued. Since the value is assigned at random, there is a high probability that you will get at least one or more columns of each possible row. Some rows will miss a column—these are the omitted zero-valued ones.

The second scan, on the other hand, wraps the first filter and forces all partial rows to be dropped. You can see from the console output how only complete rows are emitted, that is, those with all five columns the example code creates initially. The total Cell count for each scan confirms the more restrictive behavior of the SkipFilter variant.

WhileMatchFilter

This second decorating filter type works somewhat similarly to the previous one, but aborts the entire scan once a piece of information is filtered. This works by checking the wrapped filter and seeing if it skips a row by its key, or a column of a row because of a Cell check.2

Example 4-21 is a slight variation of the previous example, using different filters to show how the decorating class works.

Example 4-21. Example of using a filter to abort the entire scan based on another filter’s results
    Filter filter1 = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-05")));

    Scan scan = new Scan();
    scan.setFilter(filter1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner1.close();

    Filter filter2 = new WhileMatchFilter(filter1);

    scan.setFilter(filter2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner2.close();

Once you run the example code, you should get this output on the console:

Adding rows to table...
Results of scan #1:
Cell: row-01/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-01.01
Cell: row-02/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-02.01
Cell: row-03/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-03.01
Cell: row-04/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-04.01
Cell: row-06/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-06.01
Cell: row-07/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-07.01
Cell: row-08/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-08.01
Cell: row-09/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-09.01
Cell: row-10/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-10.01
Total cell count for scan #1: 9

Results of scan #2:
Cell: row-01/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-01.01
Cell: row-02/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-02.01
Cell: row-03/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-03.01
Cell: row-04/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-04.01
Total cell count for scan #2: 4

The first scan used just the RowFilter to skip one out of 10 rows; the rest is returned to the client. Adding the WhileMatchFilter for the second scan shows its behavior of stopping the entire scan operation once the wrapped filter omits a row or column. In the example this is row-05, triggering the end of the scan.

FilterList

So far you have seen how filters—on their own, or decorated—are doing the work of filtering out various dimensions of a table, ranging from rows, to columns, and all the way to versions of values within a column. In practice, though, you may want to have more than one filter being applied to reduce the data returned to your client application. This is what the FilterList is for.

Note

The FilterList class implements the same Filter interface, just like any other single-purpose filter. In doing so, it can be used as a drop-in replacement for those filters, while combining the effects of each included instance.

You can create an instance of FilterList while providing various parameters at instantiation time, using one of these constructors:

FilterList(final List<Filter> rowFilters)
FilterList(final Filter... rowFilters)
FilterList(final Operator operator)
FilterList(final Operator operator, final List<Filter> rowFilters)
FilterList(final Operator operator, final Filter... rowFilters)

The rowFilters parameter specifies the list of filters that are assessed together, using an operator to combine their results. Table 4-3 lists the possible choices of operators. The default is MUST_PASS_ALL, and can therefore be omitted from the constructor when you do not need a different one. Otherwise, there are two variants that take a List of filters, and two more that do the same but use the Java vararg construct (shorthand for manually creating an array).

Table 4-3. Possible values for the FilterList.Operator enumeration
Operator Description

MUST_PASS_ALL

A value is only included in the result when all filters agree to do so, i.e., no filter is omitting the value.

MUST_PASS_ONE

As soon as a value was allowed to pass one of the filters, it is included in the overall result.

Adding filters, after the FilterList instance has been created, can be done with:

void addFilter(Filter filter)

You can only specify one operator per FilterList, but you are free to add other FilterList instances to an existing FilterList, thus creating a hierarchy of filters, combined with the operators you need.
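
As a brief sketch of such a hierarchy (the filters and values are hypothetical), the following requires the row key prefix to match, and, in addition, at least one of two value checks to pass:

    FilterList innerList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    innerList.addFilter(new ValueFilter(CompareFilter.CompareOp.EQUAL,
      new SubstringComparator("val-1")));
    innerList.addFilter(new ValueFilter(CompareFilter.CompareOp.EQUAL,
      new SubstringComparator("val-2")));

    FilterList outerList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    outerList.addFilter(new PrefixFilter(Bytes.toBytes("row-1")));
    outerList.addFilter(innerList); // nested list, combined with MUST_PASS_ALL

    Scan scan = new Scan();
    scan.setFilter(outerList);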

You can further control the execution order of the included filters by carefully choosing the List implementation you require. For example, using ArrayList would guarantee that the filters are applied in the order they were added to the list. This is shown in Example 4-22.

Example 4-22. Example of using a filter list to combine single purpose filters
    List<Filter> filters = new ArrayList<Filter>();

    Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-03")));
    filters.add(filter1);

    Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
      new BinaryComparator(Bytes.toBytes("row-06")));
    filters.add(filter2);

    Filter filter3 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
      new RegexStringComparator("col-0[03]"));
    filters.add(filter3);

    FilterList filterList1 = new FilterList(filters);

    Scan scan = new Scan();
    scan.setFilter(filterList1);
    ResultScanner scanner1 = table.getScanner(scan);
    for (Result result : scanner1) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner1.close();

    FilterList filterList2 = new FilterList(
      FilterList.Operator.MUST_PASS_ONE, filters);

    scan.setFilter(filterList2);
    ResultScanner scanner2 = table.getScanner(scan);
    for (Result result : scanner2) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner2.close();

And the output again:

Adding rows to table...
Results of scan #1 - MUST_PASS_ALL:
Cell: row-03/colfam1:col-03/3/Put/vlen=9/seqid=0, Value: val-03.03
Cell: row-04/colfam1:col-03/3/Put/vlen=9/seqid=0, Value: val-04.03
Cell: row-05/colfam1:col-03/3/Put/vlen=9/seqid=0, Value: val-05.03
Cell: row-06/colfam1:col-03/3/Put/vlen=9/seqid=0, Value: val-06.03
Total cell count for scan #1: 4

Results of scan #2 - MUST_PASS_ONE:
Cell: row-01/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-01.01
Cell: row-01/colfam1:col-02/2/Put/vlen=9/seqid=0, Value: val-01.02
...
Cell: row-10/colfam1:col-04/4/Put/vlen=9/seqid=0, Value: val-10.04
Cell: row-10/colfam1:col-05/5/Put/vlen=9/seqid=0, Value: val-10.05
Total cell count for scan #2: 50

The first scan filters out a lot of details, as at least one of the filters in the list excludes some information. Only where they all let the information pass is it returned to the client.

In contrast, the second scan includes all rows and columns in the result. This is caused by setting the FilterList operator to MUST_PASS_ONE, which includes the information as soon as a single filter lets it pass. In this scenario, every value is passed by at least one of the filters, so the entire table ends up in the result.

Custom Filters

Eventually, you may exhaust the list of supplied filter types and need to implement your own. This can be done by either extending the abstract Filter class, or extending the provided FilterBase class. The latter provides default implementations for all methods of the Filter class. The Filter class has the following structure:

public abstract class Filter {
  public enum ReturnCode {
    INCLUDE, INCLUDE_AND_NEXT_COL, SKIP, NEXT_COL, NEXT_ROW,
    SEEK_NEXT_USING_HINT
  }
  public void reset() throws IOException
  public boolean filterRowKey(byte[] buffer, int offset, int length)
    throws IOException
  public boolean filterAllRemaining() throws IOException
  public ReturnCode filterKeyValue(final Cell v) throws IOException
  public Cell transformCell(final Cell v) throws IOException
  public void filterRowCells(List<Cell> kvs) throws IOException
  public boolean hasFilterRow()
  public boolean filterRow() throws IOException
  public Cell getNextCellHint(final Cell currentKV) throws IOException
  public boolean isFamilyEssential(byte[] name) throws IOException
  public void setReversed(boolean reversed)
  public boolean isReversed()
  public byte[] toByteArray() throws IOException
  public static Filter parseFrom(final byte[] pbBytes)
    throws DeserializationException
}

The class provides a public enumeration type, named ReturnCode, that is used by the filterKeyValue() method to indicate what the execution framework should do next. Instead of blindly iterating over all values, the filter has the ability to skip a value, the remainder of a column, or the rest of the entire row. This helps tremendously in terms of improving performance while retrieving data.

Note

The servers may still need to scan the entire row to find matching data, but the optimizations provided by the filterKeyValue() return code can reduce the work required to do so.

Table 4-4 lists the possible values and their meaning.

Table 4-4. Possible values for the Filter.ReturnCode enumeration
Return code Description

INCLUDE

Include the given Cell instance in the result.

INCLUDE_AND_NEXT_COL

Include the current cell and move to the next column, i.e., skip all further versions of the current column.

SKIP

Skip the current cell and proceed to the next.

NEXT_COL

Skip the remainder of the current column, proceeding to the next. This is used by the TimestampsFilter, for example.

NEXT_ROW

Similar to the previous, but skips the remainder of the current row, moving to the next. The RowFilter makes use of this return code, for example.

SEEK_NEXT_USING_HINT

Some filters want to skip a variable number of cells and use this return code to indicate that the framework should use the getNextCellHint() method to determine where to skip to. The ColumnPrefixFilter, for example, uses this feature.

Most of the provided methods are called at various stages in the process of retrieving a row for a client—for example, during a scan operation. Putting them in call order, you can expect them to be executed in the following sequence:

hasFilterRow()

This is checked first as part of the read path to do two things: first, to decide if the filter is clashing with other read settings, such as scanner batching, and second, to determine whether the filterRow() and filterRowCells() methods are called subsequently. It also enforces loading the entire row before these methods are called.

filterRowKey(byte[] buffer, int offset, int length)

The next check is against the row key, using this method of the Filter implementation. You can use it to skip an entire row from being further processed. The RowFilter uses it to suppress entire rows being returned to the client.

filterKeyValue(final Cell v)

When a row is not filtered (yet), the framework proceeds to invoke this method for every Cell that is part of the current row being materialized for the read. The ReturnCode indicates what should happen with the current cell.

transformCell()

Once the cell has passed the check and is available, the transform call allows the filter to modify the cell, before it is added to the resulting row.

filterRowCells(List<Cell> kvs)

Once all row and cell checks have been performed, this method of the filter is called, giving you access to the list of Cell instances that have not been excluded by the previous filter methods. The DependentColumnFilter uses it to drop those columns that do not match the reference column.

filterRow()

After everything else was checked and invoked, the final inspection is performed using filterRow(). A filter that uses this functionality is the PageFilter, checking if the number of rows to be returned for one iteration in the pagination process is reached, returning true afterward. The default false would include the current row in the result.

reset()

This resets the filter for every new row the scan is iterating over. It is called implicitly by the server after a row has been read. This applies to get and scan operations, although it obviously has no effect for the former, since a get() only reads a single row.

filterAllRemaining()

This method can be used to stop the scan, by returning true. It is used by filters to provide the early out optimization mentioned. If a filter returns false, the scan is continued, and the aforementioned methods are called. Obviously, this also implies that for get() operations this call is not useful.

Figure 4-2 shows the logical flow of the filter methods for a single row. There is a more fine-grained process to apply the filters on a column level, which is not relevant in this context.

filterflow
Figure 4-2. The logical flow through the filter methods for a single row

The Filter interface has a few more methods at its disposal. Table 4-5 lists them for your perusal.

Table 4-5. Additional methods provided by the Filter class
Method Description

getNextCellHint()

This method is invoked when the filter’s filterKeyValue() method returns ReturnCode.SEEK_NEXT_USING_HINT. Use it to skip large ranges of rows—if possible.

isFamilyEssential()

Discussed in “Load Column Families on Demand”, used to avoid unnecessary loading of cells from column families in low-cardinality scans.

setReversed()/isReversed()

Flags the direction the filter instance is observing. A reverse scan must use reverse filters too.

toByteArray()/parseFrom()

Used to de-/serialize the filter’s internal state to ship to the servers for application.

The reverse flag, assigned with setReversed(true), helps the filter to come to the right decision. Here is a snippet from the PrefixFilter.filterRowKey() method, showing how the result of the binary prefix comparison is reversed based on this flag:

...
int cmp = Bytes.compareTo(buffer, offset, this.prefix.length,
  this.prefix, 0, this.prefix.length);
if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) {
  passedPrefix = true;
}
...

Example 4-23 implements a custom filter, using the methods provided by FilterBase, overriding only those methods that need to be changed (or, more specifically, at least implementing those that are marked abstract). The filter first assumes all rows should be filtered, that is, removed from the result. Only when there is a value in any column that matches the given reference does it include the row, so that it is sent back to the client. See “Custom Filter Loading” for how to load the custom filters into the Java server process.

Example 4-23. Implements a filter that lets certain rows pass
public class CustomFilter extends FilterBase {

  private byte[] value = null;
  private boolean filterRow = true;

  public CustomFilter() {
    super();
  }

  public CustomFilter(byte[] value) {
    this.value = value; 1
  }

  @Override
  public void reset() {
    this.filterRow = true; 2
  }

  @Override
  public ReturnCode filterKeyValue(Cell cell) {
    if (CellUtil.matchingValue(cell, value)) {
      filterRow = false; 3
    }
    return ReturnCode.INCLUDE; 4
  }

  @Override
  public boolean filterRow() {
    return filterRow; 5
  }

  @Override
  public byte [] toByteArray() {
    FilterProtos.CustomFilter.Builder builder =
      FilterProtos.CustomFilter.newBuilder();
    if (value != null) builder.setValue(ByteStringer.wrap(value)); 6
    return builder.build().toByteArray();
  }

  //@Override
  public static Filter parseFrom(final byte[] pbBytes)
  throws DeserializationException {
    FilterProtos.CustomFilter proto;
    try {
      proto = FilterProtos.CustomFilter.parseFrom(pbBytes); 7
    } catch (InvalidProtocolBufferException e) {
      throw new DeserializationException(e);
    }
    return new CustomFilter(proto.getValue().toByteArray());
  }
}
1

Set the value to compare against.

2

Reset filter flag for each new row being tested.

3

When there is a matching value, then let the row pass.

4

Always include, since the final decision is made later.

5

Here the actual decision is taking place, based on the flag status.

6

Writes the given value out so it can be sent to the servers.

7

Used by the servers to establish the filter instance with the correct values.

The most interesting part about the custom filter is the serialization using Protocol Buffers (Protobuf, for short).3 The first thing to do is define a message in Protobuf, which is done in a simple text file, here named CustomFilters.proto:

option java_package = "filters.generated";
option java_outer_classname = "FilterProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CustomFilter {
  required bytes value = 1;
}

The file defines the output class name, the package to use during code generation and so on. The next step is to compile the definition file into code. This is done using the Protobuf protoc tool.

Tip

The Protocol Buffer library usually comes as a source package that needs to be compiled and locally installed. There are also pre-built binary packages for many operating systems. On OS X, for example, you can run the following, assuming Homebrew was installed:

$ brew install protobuf

You can verify the installation by running $ protoc --version and checking that it prints a version number:

$ protoc --version
libprotoc 2.6.1

The online code repository of the book has a script bin/doprotoc.sh that runs the code generation. It essentially runs the following command from the repository root directory:

$ protoc -Ich04/src/main/protobuf --java_out=ch04/src/main/java \
  ch04/src/main/protobuf/CustomFilters.proto

This will place the generated class file in the source directory, as specified. After that you will be able to use the generated types in your custom filter as shown in the example. Example 4-24 uses the new custom filter to find rows with specific values in it, also using a FilterList.

Example 4-24. Example using a custom filter
    List<Filter> filters = new ArrayList<Filter>();

    Filter filter1 = new CustomFilter(Bytes.toBytes("val-05.05"));
    filters.add(filter1);

    Filter filter2 = new CustomFilter(Bytes.toBytes("val-02.07"));
    filters.add(filter2);

    Filter filter3 = new CustomFilter(Bytes.toBytes("val-09.01"));
    filters.add(filter3);

    FilterList filterList = new FilterList(
      FilterList.Operator.MUST_PASS_ONE, filters);

    Scan scan = new Scan();
    scan.setFilter(filterList);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: " +
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
            cell.getValueLength()));
      }
    }
    scanner.close();

Just as with the earlier examples, here is what should appear as output on the console when executing this example:

Adding rows to table...
Results of scan:
Cell: row-02/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-02.01
Cell: row-02/colfam1:col-02/2/Put/vlen=9/seqid=0, Value: val-02.02
...
Cell: row-02/colfam1:col-06/6/Put/vlen=9/seqid=0, Value: val-02.06
Cell: row-02/colfam1:col-07/7/Put/vlen=9/seqid=0, Value: val-02.07
Cell: row-02/colfam1:col-08/8/Put/vlen=9/seqid=0, Value: val-02.08
...
Cell: row-05/colfam1:col-04/4/Put/vlen=9/seqid=0, Value: val-05.04
Cell: row-05/colfam1:col-05/5/Put/vlen=9/seqid=0, Value: val-05.05
Cell: row-05/colfam1:col-06/6/Put/vlen=9/seqid=0, Value: val-05.06
...
Cell: row-05/colfam1:col-10/10/Put/vlen=9/seqid=0, Value: val-05.10
Cell: row-09/colfam1:col-01/1/Put/vlen=9/seqid=0, Value: val-09.01
Cell: row-09/colfam1:col-02/2/Put/vlen=9/seqid=0, Value: val-09.02
...
Cell: row-09/colfam1:col-09/9/Put/vlen=9/seqid=0, Value: val-09.09
Cell: row-09/colfam1:col-10/10/Put/vlen=9/seqid=0, Value: val-09.10

As expected, the entire row that has a column with the value matching one of the references is included in the result.

Custom Filter Loading

Once you have written your filter, you need to deploy it to your HBase setup. You need to compile the class, pack it into a Java Archive (JAR) file, and make it available to the region servers. You can use the build system of your choice to prepare the JAR file for deployment, and a configuration management system to actually provision the file to all servers. Once you have uploaded the JAR file, you have two choices for how to load it:

Static Configuration

In this case, you need to add the JAR file to the hbase-env.sh configuration file, for example:

# Extra Java CLASSPATH elements.  Optional.
# export HBASE_CLASSPATH=
export HBASE_CLASSPATH="/hbase-book/ch04/target/hbase-book-ch04-2.0.jar"

This is using the JAR file created by the Maven build as supplied by the source code repository accompanying this book. It uses an absolute, local path since testing is done on a standalone setup, in other words, with the development environment and HBase running on the same physical machine.

Note that you must restart the HBase daemons so that the changes in the configuration file take effect. Once this is done you can proceed to test the new filter.

Dynamic Loading

You still build the JAR file the same way, but instead of hardcoding its path into the configuration files, you can use the cluster-wide, shared JAR file directory in HDFS from which the servers load JAR files. See the following configuration property from the hbase-default.xml file:

<property>
  <name>hbase.dynamic.jars.dir</name>
  <value>${hbase.rootdir}/lib</value>
</property>

The default points to ${hbase.rootdir}/lib, which usually resolves to /hbase/lib/ within HDFS. The full path would be similar to this example path: hdfs://master.foobar.com:9000/hbase/lib. If this directory exists and contains files ending in .jar, then the servers will load those files and make the contained classes available. To do so, the files are copied to a local directory named jars, located in a parent directory set again in the HBase default properties:

<property>
  <name>hbase.local.dir</name>
  <value>${hbase.tmp.dir}/local/</value>
</property>

For example, on a cluster with the temporary directory configured as /data/tmp/, you will see the JAR files being copied to /data/tmp/local/jars. You will see this directory again later on when we talk about dynamic coprocessor loading in “Coprocessor Loading”. The local JAR files are flagged to be deleted when the server process ends normally.

The dynamic loading directory is monitored for changes, and will refresh the JAR files locally if they have been updated in the shared location.
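
Getting the JAR file into the shared location is then simply a matter of copying it there, for example with the HDFS command-line tools (the JAR name below is just the one produced by the book’s example build):

$ hdfs dfs -mkdir -p /hbase/lib
$ hdfs dfs -put /hbase-book/ch04/target/hbase-book-ch04-2.0.jar /hbase/lib/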

Note that no matter how you load the classes and their containing JARs, HBase is currently not able to unload a previously loaded class. This means that once loaded, you cannot replace a class with the same name. The only way, short of restarting the server processes, is to add a version number to the class and JAR name, loading the new implementation under a new name. This leaves the previous classes loaded in memory and might cause memory issues after some time.

Filter Parser Utility

The client-side filter package comes with another helper class, named ParseFilter. It is used in all the places where filters need to be described with text and then, eventually, converted to a Java class. This happens in the gateway servers, such as for REST or Thrift. The HBase Shell also makes use of the class, allowing a shell user to specify a filter on the command line and then execute it as part of a subsequent scan or get operation. The following executes a scan on one of the earlier test tables (so your results may vary), adding a row prefix and qualifier filter, using the shell:

hbase(main):001:0> scan 'testtable', \
  { FILTER => "PrefixFilter('row-2') AND QualifierFilter(<=, 'binary:col-2')" }
ROW              COLUMN+CELL
 row-20           column=colfam1:col-0, timestamp=7, value=val-46
 row-21           column=colfam1:col-0, timestamp=7, value=val-87
 row-21           column=colfam1:col-2, timestamp=5, value=val-26
 ...
 row-28           column=colfam1:col-2, timestamp=3, value=val-74
 row-29           column=colfam1:col-1, timestamp=0, value=val-86
 row-29           column=colfam1:col-2, timestamp=3, value=val-21
10 row(s) in 0.0170 seconds

What seems odd at first is the "binary:col-2" parameter. The second part after the colon is the value handed into the filter. The first part is how the filter parser class allows you to specify a comparator for filters based on CompareFilter (see “Comparators”). Here is a list of supported comparator prefixes:

Table 4-6. String representation of Comparator types
String Type

binary

BinaryComparator

binaryprefix

BinaryPrefixComparator

regexstring

RegexStringComparator

substring

SubstringComparator

Since a comparison filter also requires a comparison operation, there is a way of expressing this in string format. The example above uses "<=" to specify less than or equal. Since there is an enumeration provided by the CompareFilter class, there is a matching pattern between the string representation and the enumeration value, as shown in the next table (also see “Comparison Operators”):

Table 4-7. String representation of compare operation
String Type

<

CompareOp.LESS

<=

CompareOp.LESS_OR_EQUAL

>

CompareOp.GREATER

>=

CompareOp.GREATER_OR_EQUAL

=

CompareOp.EQUAL

!=

CompareOp.NOT_EQUAL

The filter parser supports a few more text based tokens that translate into filter classes. You can combine filters with the AND and OR keywords, which are subsequently translated into FilterList instances that are either set to MUST_PASS_ALL, or MUST_PASS_ONE respectively (“FilterList” describes this in more detail). An example might be:

hbase(main):001:0> scan 'testtable', \
  { FILTER => "(PrefixFilter('row-2') AND ( \
    QualifierFilter(>=, 'binary:col-2'))) AND (TimestampsFilter(1, 5))" }
ROW              COLUMN+CELL
 row-2            column=colfam1:col-9, timestamp=5, value=val-31
 row-21           column=colfam1:col-2, timestamp=5, value=val-26
 row-23           column=colfam1:col-5, timestamp=5, value=val-55
 row-28           column=colfam1:col-5, timestamp=1, value=val-54
4 row(s) in 0.3190 seconds

Finally, there are the keywords SKIP and WHILE, representing the use of a SkipFilter (see “SkipFilter”) and WhileMatchFilter (see “WhileMatchFilter”). Refer to the mentioned sections for details on their features.

hbase(main):001:0> scan 'testtable', \
  { FILTER => "SKIP ValueFilter(>=, 'binary:val-5') " }
ROW              COLUMN+CELL
 row-11           column=colfam1:col-0, timestamp=8, value=val-82
 row-48           column=colfam1:col-3, timestamp=6, value=val-55
 row-48           column=colfam1:col-7, timestamp=3, value=val-80
 row-48           column=colfam1:col-8, timestamp=2, value=val-65
 row-7            column=colfam1:col-9, timestamp=6, value=val-57
3 row(s) in 0.0150 seconds

The precedence of the keywords the parser understands is the following, listed from highest to lowest:

Table 4-8. Precedence of string keywords
Keyword Description

SKIP/WHILE

Wrap filter into SkipFilter, or WhileMatchFilter instance.

AND

Add both filters left and right of keyword to FilterList instance using MUST_PASS_ALL.

OR

Add both filters left and right of keyword to FilterList instance using MUST_PASS_ONE.

From code you can invoke one of the following methods to parse a filter string into class instances:

Filter parseFilterString(String filterString)
  throws CharacterCodingException
Filter parseFilterString (byte[] filterStringAsByteArray)
  throws CharacterCodingException
Filter parseSimpleFilterExpression(byte[] filterStringAsByteArray)
  throws CharacterCodingException

The parseSimpleFilterExpression() parses one specific filter instance, and is used mainly from within the parseFilterString() methods. The latter handles the combination of multiple filters with AND and OR, plus the decorating filter wrapping with SKIP and WHILE. The two parseFilterString() methods are the same; one takes a String, the other a string already converted to a byte[] array.
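
A minimal sketch of invoking the parser from client code, reusing the filter string from the shell example above, could look like this:

    ParseFilter parser = new ParseFilter();
    // parseFilterString() declares CharacterCodingException
    Filter filter = parser.parseFilterString(
      "PrefixFilter('row-2') AND QualifierFilter(<=, 'binary:col-2')");

    Scan scan = new Scan();
    scan.setFilter(filter);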

The ParseFilter class—by default—only supports the filters that are shipped with HBase. Of those, the ones it does not support are FirstKeyValueMatchingQualifiersFilter, FuzzyRowFilter, and RandomRowFilter (as of this writing). In your own code you can register your own filters, and retrieve the list of supported filters, using the following methods of this class:

static Map<String, String> getAllFilters()
Set<String> getSupportedFilters()
static void registerFilter(String name, String filterClass)
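
For example, making the custom filter from Example 4-23 known to the parser could look like the following sketch (the package name filters.CustomFilter is an assumption matching the book’s example code; note that the parser may additionally require the filter class to provide a suitable static factory method before it can be instantiated from string arguments):

    ParseFilter.registerFilter("CustomFilter", "filters.CustomFilter");
    System.out.println("Supported filters: " +
      new ParseFilter().getSupportedFilters());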

Filters Summary

Table 4-9 summarizes some of the features and compatibilities related to the provided filter implementations. The ✓ symbol means the feature is available, while ✗ indicates it is missing.

Table 4-9. Summary of filter features and compatibilities between them
Filter Batch^a Skip^b While-Match^c List^d Early Out^e Gets^f Scans^g

RowFilter

FamilyFilter

QualifierFilter

ValueFilter

DependentColumnFilter

SingleColumnValueFilter

SingleColumnValueExcludeFilter

PrefixFilter

PageFilter

KeyOnlyFilter

FirstKeyOnlyFilter

FirstKeyValueMatchingQualifiersFilter

InclusiveStopFilter

FuzzyRowFilter

ColumnCountGetFilter

ColumnPaginationFilter

ColumnPrefixFilter

MultipleColumnPrefixFilter

ColumnRange

TimestampsFilter

RandomRowFilter

SkipFilter

✓/✗^h

✓/✗^h

WhileMatchFilter

✓/✗^h

✓/✗^h

FilterList

✓/✗^h

✓/✗^h

✓/✗^h

✓/✗^h

a Filter supports Scan.setBatch(), i.e., the scanner batch mode.

b Filter can be used with the decorating SkipFilter class.

c Filter can be used with the decorating WhileMatchFilter class.

d Filter can be used with the combining FilterList class.

e Filter has optimizations to stop a scan early, once there are no more matching rows ahead.

f Filter can be usefully applied to Get instances.

g Filter can be usefully applied to Scan instances.

h Depends on the included filters.

Counters

In addition to the functionality we already discussed, HBase offers another advanced feature: counters. Many applications that collect statistics—such as clicks or views in online advertising—used to collect the data in log files that would subsequently be analyzed. Using counters offers the potential of switching to live accounting, foregoing the delayed batch processing step completely.

Introduction to Counters

In addition to the check-and-modify operations you saw earlier, HBase also has a mechanism to treat columns as counters. Otherwise, you would have to lock a row, read the value, increment it, write it back, and eventually unlock the row for other writers to be able to access it subsequently. This can cause a lot of contention, and in the event of a client process crashing, it could leave the row locked until the lease recovery kicks in—which could be disastrous in a heavily loaded system.

The client API provides specialized methods to do the read-modify-write operation atomically in a single client-side call. Earlier versions of HBase only had calls that would involve an RPC for every counter update, while newer versions started to add the same mechanisms used by the CRUD operations—as explained in “CRUD Operations”--which can bundle multiple counter updates in a single RPC.

Before we discuss each type separately, you need to know a few more details about how counters work at the column level. Here is an example using the shell that creates a table, increments a counter twice, and then queries the current value:

hbase(main):001:0> create 'counters', 'daily', 'weekly', 'monthly'
0 row(s) in 1.1930 seconds

hbase(main):002:0> incr 'counters', '20150101', 'daily:hits', 1
COUNTER VALUE = 1
0 row(s) in 0.0490 seconds

hbase(main):003:0> incr 'counters', '20150101', 'daily:hits', 1
COUNTER VALUE = 2
0 row(s) in 0.0170 seconds

hbase(main):04:0> get_counter 'counters', '20150101', 'daily:hits'
COUNTER VALUE = 2

Every call to incr increases the counter by the given value (here 1). The final check using get_counter shows the current value as expected. The format of the shell’s incr command is as follows:

incr '<table>', '<row>', '<column>', [<increment-value>]

You can also access the counter with a get call, giving you this result:

hbase(main):005:0> get 'counters', '20150101'
COLUMN       CELL
 daily:hits   timestamp=1427485256567, value=\x00\x00\x00\x00\x00\x00\x00\x02
1 row(s) in 0.0280 seconds

This is obviously not very readable, but it shows that a counter is simply a column, like any other. You can also specify a larger increment value:

hbase(main):006:0> incr 'counters', '20150101', 'daily:hits', 20
COUNTER VALUE = 22
0 row(s) in 0.0180 seconds

hbase(main):007:0> get_counter 'counters', '20150101', 'daily:hits'
COUNTER VALUE = 22

hbase(main):008:0> get 'counters', '20150101'
COLUMN       CELL
 daily:hits   timestamp=1427489182419, value=\x00\x00\x00\x00\x00\x00\x00\x16
1 row(s) in 0.0200 seconds

Accessing the counter directly gives you the byte[] array representation, with the shell printing the separate bytes as hexadecimal values. Using the get_counter once again shows the current value in a more human-readable format, and confirms that variable increments are possible and work as expected.
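
Reading such a counter with the regular client API works the same way as for any other column; a minimal sketch that decodes the returned bytes (counters are stored as 8-byte long values) might look like this:

    Get get = new Get(Bytes.toBytes("20150101"));
    get.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"));
    Result result = table.get(get);
    byte[] raw = result.getValue(Bytes.toBytes("daily"), Bytes.toBytes("hits"));
    // the counter value is an 8-byte, big-endian long
    long hits = Bytes.toLong(raw);
    System.out.println("daily:hits = " + hits);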

Finally, you can use the increment value of the incr call to not only increase the counter, but also retrieve the current value, and decrease it as well. In fact, you can omit it completely and the default of 1 is assumed:

hbase(main):009:0> incr 'counters', '20150101', 'daily:hits'
COUNTER VALUE = 23
0 row(s) in 0.1700 seconds

hbase(main):010:0> incr 'counters', '20150101', 'daily:hits'
COUNTER VALUE = 24
0 row(s) in 0.0230 seconds

hbase(main):011:0> incr 'counters', '20150101', 'daily:hits', 0
COUNTER VALUE = 24
0 row(s) in 0.0170 seconds

hbase(main):012:0> incr 'counters', '20150101', 'daily:hits', -1
COUNTER VALUE = 23
0 row(s) in 0.0210 seconds

hbase(main):013:0> incr 'counters', '20150101', 'daily:hits', -1
COUNTER VALUE = 22
0 row(s) in 0.0200 seconds

Using the increment value—the last parameter of the incr command—you can achieve the behavior shown in Table 4-10.

Table 4-10. The increment value and its effect on counter increments
Value Effect

greater than zero

Increase the counter by the given value.

zero

Retrieve the current value of the counter. Same as using the get_counter shell command.

less than zero

Decrease the counter by the given value.

Obviously, using the shell’s incr command only allows you to increase a single counter. You can do the same using the client API, described next.

Single Counters

The first type of increment call is for single counters only: you need to specify the exact column you want to use. The methods, provided by Table, are as such:

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
  long amount) throws IOException;
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
  long amount, Durability durability) throws IOException;

Given the coordinates of a column, and the increment amount, these methods only differ by the optional durability parameter—which works the same way as the Put.setDurability() method (see “Durability, Consistency, and Isolation” for the general discussion of this feature). Omitting durability uses the default value of Durability.SYNC_WAL, meaning the write-ahead log is active. Apart from that, using them is straightforward, as shown in Example 4-25.

Example 4-25. Example using the single counter increment methods
    long cnt1 = table.incrementColumnValue(Bytes.toBytes("20110101"), 1
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
    long cnt2 = table.incrementColumnValue(Bytes.toBytes("20110101"), 2
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);

    long current = table.incrementColumnValue(Bytes.toBytes("20110101"), 3
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 0);

    long cnt3 = table.incrementColumnValue(Bytes.toBytes("20110101"), 4
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), -1);
1

Increase counter by one.

2

Increase counter by one a second time.

3

Get current value of the counter without increasing it.

4

Decrease counter by one.

The output on the console is:

cnt1: 1, cnt2: 2, current: 2, cnt3: 1

Just as with the shell commands used earlier, the API calls have the same effect: they increment the counter when using a positive increment value, retrieve the current value when using zero for the increment, and decrease the counter by using a negative increment value.

Multiple Counters

Another way to increment counters is provided by the increment() call of Table. It works similarly to the CRUD-type operations discussed earlier, using the following method to do the increment:

Result increment(final Increment increment) throws IOException

You must create an instance of the Increment class and fill it with the appropriate details—for example, the counter coordinates. The constructors provided by this class are:

Increment(byte[] row)
Increment(final byte[] row, final int offset, final int length)
Increment(Increment i)

You must provide a row key when instantiating an Increment, which sets the row containing all the counters that the subsequent call to increment() should modify. There is also the variant already known to you that takes a larger array with an offset and length parameter to extract the row key from. Finally, there is also the one you have seen before, which takes an existing instance and copies all state from it.

Once you have decided which row to update and created the Increment instance, you need to add the actual counters—meaning columns—you want to increment, using these methods:

Increment addColumn(byte[] family, byte[] qualifier, long amount)
Increment add(Cell cell) throws IOException

The first variant takes the column coordinates, while the second reuses an existing cell. This is useful if you have just retrieved a counter and now want to increment it. The add() call checks that the given cell matches the row key of the Increment instance.

The difference here, as compared to the Put methods, is that there is no option to specify a version—or timestamp—when dealing with increments: versions are handled implicitly. Furthermore, there is no addFamily() equivalent, because counters are specific columns, and they need to be specified as such. It therefore makes no sense to add a column family alone.

A special feature of the Increment class is the ability to take an optional time range:

Increment setTimeRange(long minStamp, long maxStamp) throws IOException
TimeRange getTimeRange()

Setting a time range for a set of counter increments seems odd in light of the fact that versions are handled implicitly. The time range is actually passed on to the servers to restrict the internal get operation from retrieving the current counter values. You can use it to expire counters, for example, to partition them by time: when you set the time range to be restrictive enough, you can mask out older counters from the internal get, making them look like they are nonexistent. An increment would assume they are unset and start at 1 again. The getTimeRange() returns the currently assigned time range (and might be null if not set at all).
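
As a brief sketch (the window of one day is an arbitrary assumption), restricting an increment to only see counter cells written recently could look like this:

    Increment increment = new Increment(Bytes.toBytes("20150101"));
    increment.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
    // only counter cells within this window are visible to the internal get;
    // anything older is treated as nonexistent and the count starts over at 1
    long now = System.currentTimeMillis();
    increment.setTimeRange(now - 24L * 60 * 60 * 1000, Long.MAX_VALUE);
    Result result = table.increment(increment);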

Similar to the shell example shown earlier, Example 4-26 uses various increment values to increment, retrieve, and decrement the given counters.

Example 4-26. Example incrementing multiple counters in one row
    Increment increment1 = new Increment(Bytes.toBytes("20150101"));

    increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
    increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 1
    increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
    increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);

    Result result1 = table.increment(increment1); 2

    for (Cell cell : result1.rawCells()) {
      System.out.println("Cell: " + cell +
        " Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength())); 3
    }

    Increment increment2 = new Increment(Bytes.toBytes("20150101"));

    increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
    increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 4
    increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
    increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);

    Result result2 = table.increment(increment2);

    for (Cell cell : result2.rawCells()) {
      System.out.println("Cell: " + cell +
        " Value: " + Bytes.toLong(cell.getValueArray(),
          cell.getValueOffset(), cell.getValueLength()));
    }
1

Increment the counters with various values.

2

Call the actual increment method with the above counter updates and receive the results.

3

Print the cell and returned counter value.

4

Use positive, negative, and zero increment values to achieve the wanted counter changes.

When you run the example, the following is output on the console:

Cell: 20150101/daily:clicks/1427651982538/Put/vlen=8/seqid=0 Value: 1
Cell: 20150101/daily:hits/1427651982538/Put/vlen=8/seqid=0 Value: 1
Cell: 20150101/weekly:clicks/1427651982538/Put/vlen=8/seqid=0 Value: 10
Cell: 20150101/weekly:hits/1427651982538/Put/vlen=8/seqid=0 Value: 10

Cell: 20150101/daily:clicks/1427651982543/Put/vlen=8/seqid=0 Value: 6
Cell: 20150101/daily:hits/1427651982543/Put/vlen=8/seqid=0 Value: 2
Cell: 20150101/weekly:clicks/1427651982543/Put/vlen=8/seqid=0 Value: 10
Cell: 20150101/weekly:hits/1427651982543/Put/vlen=8/seqid=0 Value: 5

When you compare the two sets of increment results, you will notice that this works as expected.

The Increment class provides additional methods, which are listed in Table 4-11 for your reference. Once again, many are inherited from the superclasses, such as Mutation (see “Query versus Mutation” again).

Table 4-11. Quick overview of additional methods provided by the Increment class
Method Description

cellScanner()

Provides a scanner over all cells available in this instance.

getACL()/setACL()

The ACLs for this operation (might be null).

getAttribute()/setAttribute()

Set and get arbitrary attributes associated with this instance of Increment.

getAttributesMap()

Returns the entire map of attributes, if any are set.

getCellVisibility()/setCellVisibility()

The cell level visibility for all included cells.

getClusterIds()/setClusterIds()

The cluster IDs as needed for replication purposes.

getDurability()/setDurability()

The durability settings for the mutation.

getFamilyCellMap()/setFamilyCellMap()

The list of all cells of this instance.

getFamilyMapOfLongs()

Returns a map of Long instances, instead of cells (which getFamilyCellMap() does), for what was added to this instance so far. The map is indexed by family, and then by column qualifier.

getFingerprint()

Compiles details about the instance into a map for debugging, or logging.

getId()/setId()

An ID for the operation, useful for identifying the origin of a request later.

getRow()

Returns the row key as specified when creating the Increment instance.

getTimeStamp()

Not useful with Increment. Defaults to HConstants.LATEST_TIMESTAMP.

getTTL()/setTTL()

Sets the cell level TTL value, which is being applied to all included Cell instances before being persisted.

hasFamilies()

Another helper to check if a family—or column—has been added to the current instance of the Increment class.

heapSize()

Computes the heap space required for the current Increment instance. This includes all contained data and space needed for internal structures.

isEmpty()

Checks if the family map contains any Cell instances.

numFamilies()

Convenience method to retrieve the size of the family map, containing all Cell instances.

size()

Returns the number of Cell instances that will be applied with this Increment.

toJSON()/toJSON(int)

Converts the first 5 or N columns into a JSON format.

toMap()/toMap(int)

Converts the first 5 or N columns into a map. This is more detailed than what getFingerprint() returns.

toString()/toString(int)

Converts the first 5 or N columns into a JSON, or map (if JSON fails due to encoding problems).

A non-Mutation method provided by Increment is:

Map<byte[], NavigableMap<byte[], Long>> getFamilyMapOfLongs()

The above Example 4-26 in the online repository shows how this can give you access to the list of increment values of a configured Increment instance. It is omitted above for the sake of brevity, but the online code has this available (around line number 40).
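As a rough sketch only (assuming an already configured Increment instance named increment, as used in the example), the nested map returned by this method could be traversed like so:

Map<byte[], NavigableMap<byte[], Long>> longs = increment.getFamilyMapOfLongs();
for (Map.Entry<byte[], NavigableMap<byte[], Long>> family : longs.entrySet()) {
  for (Map.Entry<byte[], Long> qualifier : family.getValue().entrySet()) {
    // Print the per-column increment value that was added to the instance.
    System.out.println("Family: " + Bytes.toString(family.getKey()) +
      ", Qualifier: " + Bytes.toString(qualifier.getKey()) +
      ", Value: " + qualifier.getValue());
  }
}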

Coprocessors

Earlier we discussed how you can use filters to reduce the amount of data being sent over the network from the servers to the client. With the coprocessor feature in HBase, you can even move part of the computation to where the data lives.

Note

We go on a slight tangent here as far as interface audience is concerned. If you refer back to [Link to Come] you will see how we, up until now, solely covered Public APIs, that is, those that are annotated as being public. For coprocessors we are now looking at an API annotated as @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC), since it is meant for HBase system developers. A normal API user will make use of coprocessors, but most likely not develop them. Coprocessors are very low-level, and are usually for very experienced developers only.

Introduction to Coprocessors

Using the client API, combined with specific selector mechanisms, such as filters, or column family scoping, it is possible to limit what data is transferred to the client. It would be good, though, to take this further and, for example, perform certain operations directly on the server side while only returning a small result set. Think of this as a small MapReduce framework that distributes work across the entire cluster.

A coprocessor enables you to run arbitrary code directly on each region server. More precisely, it executes the code on a per-region basis, giving you trigger-like functionality—similar to stored procedures in the RDBMS world. From the client side, you do not have to take specific actions, as the framework handles the distributed nature transparently.

There is a set of implicit events that you can use to hook into, performing auxiliary tasks. If this is not enough, you can also extend the RPC protocol to introduce your own set of calls, which are invoked from your client and executed on the server on your behalf.

Just as with the custom filters (see “Custom Filters”), you need to create special Java classes that implement specific interfaces. Once they are compiled, you make these classes available to the servers in the form of a JAR file. The region server process can instantiate these classes and execute them in the correct environment. In contrast to the filters, though, coprocessors can be loaded dynamically as well. This allows you to extend the functionality of a running HBase cluster.

Use cases for coprocessors are, for instance, using hooks into row mutation operations to maintain secondary indexes, or implementing some kind of referential integrity. Filters could be enhanced to become stateful, and therefore make decisions across row boundaries. Aggregate functions, such as sum(), or avg(), known from RDBMSes and SQL, could be moved to the servers to scan the data locally and only return the single number result across the network (which is showcased by the supplied AggregateImplementation class).

Note

Another good use case for coprocessors is access control. The authentication, authorization, and auditing features added in HBase version 0.92 are based on coprocessors. They are loaded at system startup and use the provided trigger-like hooks to check if a user is authenticated, and authorized to access specific values stored in tables.

The framework already provides classes, based on the coprocessor framework, which you can use to extend from when implementing your own functionality. They fall into two main groups: endpoint and observer. Here is a brief overview of their purpose:

Endpoint

Next to event handling, there may also be a need to add custom operations to a cluster. User code can be deployed to the servers hosting the data to, for example, perform server-local computations.

Endpoints are dynamic extensions to the RPC protocol, adding callable remote procedures. Think of them as stored procedures, as known from RDBMSes. They may be combined with observer implementations to directly interact with the server-side state.

Observer

This type of coprocessor is comparable to triggers: callback functions (also referred to here as hooks) are executed when certain events occur. This includes user-generated, but also server-internal, automated events.

The interfaces provided by the coprocessor framework are:

MasterObserver

This can be used to react to administrative or DDL-type operations. These are cluster-wide events.

RegionServerObserver

Hooks into commands sent to a region server, and covers region server-wide events.

RegionObserver

Used to handle data manipulation events. They are closely bound to the regions of a table.

WALObserver

This provides hooks into the write-ahead log processing, which is region server-wide.

BulkLoadObserver

Handles events around the bulk loading API. Triggered before and after the loading takes place.

EndpointObserver

Whenever an endpoint is invoked by a client, this observer provides a callback method.

Observers provide you with well-defined event callbacks, for every operation a cluster server may handle.

All of these interfaces are based on the Coprocessor interface to gain common features, but then implement their own specific functionality.

Finally, coprocessors can be chained, very similar to what the Java Servlet API does with request filters. The following section discusses the various types available in the coprocessor framework. Figure 4-3 shows an overview of all the classes we will be looking into.

coprohierarchy
Figure 4-3. The class hierarchy of the coprocessor related classes

The Coprocessor Class Trinity

All user coprocessor classes must be based on the Coprocessor interface. It defines the basic contract of a coprocessor and facilitates the management by the framework itself. The interface provides two sets of types, which are used throughout the framework: the PRIORITY constants, and the State enumeration. Table 4-12 explains the priority values.

Table 4-12. Priorities as defined by the Coprocessor.PRIORITY_<XYZ> constants
Name Value Description

PRIORITY_HIGHEST

0

Highest priority, serves as an upper boundary.

PRIORITY_SYSTEM

536870911

High priority, used for system coprocessors (Integer.MAX_VALUE / 4).

PRIORITY_USER

1073741823

For all user coprocessors, which are executed subsequently (Integer.MAX_VALUE / 2).

PRIORITY_LOWEST

2147483647

Lowest possible priority, serves as a lower boundary (Integer.MAX_VALUE).

The priority of a coprocessor defines in what order the coprocessors are executed: system-level instances are called before the user-level coprocessors are executed.

Note

Within each priority level, there is also the notion of a sequence number, which keeps track of the order in which the coprocessors were loaded. The number starts with zero, and is increased by one thereafter.

The number itself is not very helpful, but you can rely on the framework to order the coprocessors—in each priority group—ascending by sequence number. This defines their execution order.

Coprocessors are managed by the framework in their own life cycle. To that effect, the Coprocessor interface offers two calls:

void start(CoprocessorEnvironment env) throws IOException
void stop(CoprocessorEnvironment env) throws IOException

These two methods are called when the coprocessor class is started, and eventually when it is decommissioned. The provided CoprocessorEnvironment instance is used to retain the state across the lifespan of the coprocessor instance. A coprocessor instance is always contained in a provided environment, which provides the following methods:

String getHBaseVersion()

Returns the HBase version identification string, for example "1.0.0".

int getVersion()

Returns the version of the Coprocessor interface.

Coprocessor getInstance()

Returns the loaded coprocessor instance.

int getPriority()

Provides the priority level of the coprocessor.

int getLoadSequence()

The sequence number of the coprocessor. This is set when the instance is loaded and reflects the execution order.

Configuration getConfiguration()

Provides access to the current, server-wide configuration.

HTableInterface getTable(TableName tableName)
HTableInterface getTable(TableName tableName, ExecutorService service)

Returns a Table implementation for the given table name. This allows the coprocessor to access the actual table data. The second variant does the same, but allows the specification of a custom ExecutorService instance.

Coprocessors should only deal with what they have been given by their environment. There is a good reason for that, mainly to guarantee that there is no back door for malicious code to harm your data.

Note

Coprocessor implementations should be using the getTable() method to access tables. Note that this class adds certain safety measures to the returned Table implementation. While there is currently nothing that can stop you from retrieving your own Table instances inside your coprocessor code, this is likely to be checked against in the future and possibly denied.
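As an illustration only, the following sketch shows a coprocessor acquiring a table reference through its environment in start() and releasing it again in stop(); the table name "audit" is made up for this example:

private HTableInterface auditTable;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
  // Acquire the table through the provided environment, not a new connection.
  this.auditTable = env.getTable(TableName.valueOf("audit"));
}

@Override
public void stop(CoprocessorEnvironment env) throws IOException {
  // Release the resource acquired in start().
  if (auditTable != null) auditTable.close();
}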

The start() and stop() methods of the Coprocessor interface are invoked implicitly by the framework as the instance is going through its life cycle. Each step in the process has a well-known state. Table 4-13 lists the life-cycle state values as provided by the coprocessor interface.

Table 4-13. The states as defined by the Coprocessor.State enumeration
Value Description

UNINSTALLED

The coprocessor is in its initial state. It has no environment yet, nor is it initialized.

INSTALLED

The instance is installed into its environment.

STARTING

This state indicates that the coprocessor is about to be started, that is, its start() method is about to be invoked.

ACTIVE

Once the start() call returns, the state is set to active.

STOPPING

The state set just before the stop() method is called.

STOPPED

Once stop() returns control to the framework, the state of the coprocessor is set to stopped.

The final piece of the puzzle is the CoprocessorHost class that maintains all the coprocessor instances and their dedicated environments. There are specific subclasses, depending on where the host is used, in other words, on the master, region server, and so on.

The trinity of Coprocessor, CoprocessorEnvironment, and CoprocessorHost forms the basis for the classes that implement the advanced functionality of HBase, depending on where they are used. They provide the life-cycle support for the coprocessors, manage their state, and offer the environment for them to execute as expected. In addition, these classes provide an abstraction layer that developers can use to easily build their own custom implementation.

Figure 4-4 shows how the calls from a client flow through the list of coprocessors. Note how the order is the same on the incoming and outgoing sides: first are the system-level ones, and then the user ones in the order in which they were loaded.

hbas 0403
Figure 4-4. Coprocessors executed sequentially, in their environment, and per region

Coprocessor Loading

Coprocessors are loaded in a variety of ways. Before we discuss the actual coprocessor types and how to implement your own, we will talk about how to deploy them so that you can try the provided examples. You can either configure coprocessors to be loaded in a static way, or load them dynamically while the cluster is running. The static method uses the configuration files and table schemas, while dynamic loading only uses the table schemas.

There is also a cluster-wide switch that allows you to disable all coprocessor loading, controlled by the following two configuration properties:

hbase.coprocessor.enabled

The default is true and means coprocessor classes for system and user tables are loaded. Setting this property to false stops the servers from loading any of them. You could use this during testing, or during cluster emergencies.

hbase.coprocessor.user.enabled

Again, the default is true, that is, all user table coprocessors are loaded when the server starts, or a region opens, etc. Setting this property to false suppresses the loading of user table coprocessors only.

Caution

Disabling coprocessors, using the cluster-wide configuration properties, means that whatever additional processing they add, your cluster will not have this functionality available. This includes, for example, security checks, or maintenance of referential integrity. Be very careful!

Loading from Configuration

You can configure globally which coprocessors are loaded when HBase starts. This is done by adding one, or more, of the following to the hbase-site.xml configuration file (but please, replace the example class names with your own ones!):

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.regionserver.classes</name>
  <value>coprocessor.RegionServerObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.system.RegionObserverExample,
         coprocessor.AnotherCoprocessor</value>
</property>
<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>coprocessor.user.RegionObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.wal.classes</name>
  <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>

The order of the classes in each configuration property is important, as it defines the execution order. All of these coprocessors are loaded with the system priority. You should configure all globally active classes here so that they are executed first and have a chance to take authoritative actions. Security coprocessors are loaded this way, for example.

Note

The configuration file is the first to be examined as HBase starts. Although you can define additional system-level coprocessors in other places, the ones here are executed first. They are also sometimes referred to as default coprocessors.

Only one of the five possible configuration keys is read by the matching CoprocessorHost implementation. For example, the coprocessors defined in hbase.coprocessor.master.classes are loaded by the MasterCoprocessorHost class.

Table 4-14 shows where each configuration property is used.

Table 4-14. Possible configuration properties and where they are used
Property Coprocessor Host Server Type

hbase.coprocessor.master.classes

MasterCoprocessorHost

Master Server

hbase.coprocessor.regionserver.classes

RegionServerCoprocessorHost

Region Server

hbase.coprocessor.region.classes

RegionCoprocessorHost

Region Server

hbase.coprocessor.user.region.classes

RegionCoprocessorHost

Region Server

hbase.coprocessor.wal.classes

WALCoprocessorHost

Region Server

There are two separate properties provided for classes loaded into regions, and the reason is this:

hbase.coprocessor.region.classes

All listed coprocessors are loaded at system priority for every table in HBase, including the special catalog tables.

hbase.coprocessor.user.region.classes

The coprocessor classes listed here are also loaded at system priority, but only for user tables, not the special catalog tables.

Apart from that, the coprocessors defined with either property are loaded when a region is opened for a table. Note that you cannot specify for which user and/or system table, or region, they are loaded, or in other words, they are loaded for every table and region. You need to keep this in mind when designing your own coprocessors.

Be careful what you do as lifecycle events are triggered and your coprocessor code sets up resources. Since instantiating your coprocessor is part of opening regions, any longer delay might be noticeable. In other words, be diligent and do as little work as possible during open and close events.

What is also important to consider is that when a coprocessor loaded from the configuration fails to start, in other words it throws an exception, it will cause the entire server process to be aborted. When this happens, the process will log the error and a list of loaded (or rather, configured) coprocessors, which might help identify the culprit.

Loading from Table Descriptor

The other option to define which coprocessors to load is the table descriptor. As this is per table, the coprocessors defined here are only loaded for regions of that table—and only by the region servers hosting these regions. In other words, you can only use this approach for region-related coprocessors, not for master, or WAL-related ones. On the other hand, since they are loaded in the context of a table, they are more targeted compared to the configuration loaded ones, which apply to all tables. You need to add their definition to the table descriptor using one of two methods:

  1. Using the generic HTableDescriptor.setValue() with a specific key, or

  2. use the newer HTableDescriptor.addCoprocessor() method.

If you use the first method, you need to create a key that must start with COPROCESSOR, and the value has to conform to the following format:

[<path-to-jar>]|<classname>|[<priority>][|key1=value1,key2=value2,...]

Here is an example that defines a few coprocessors, the first with system-level priority, the others with user-level priorities:

'COPROCESSOR$1' => \
  'hdfs://localhost:8020/users/leon/test.jar|coprocessor.Test|2147483647'
'COPROCESSOR$2' => \
  '/Users/laura/test2.jar|coprocessor.AnotherTest|1073741822'
'COPROCESSOR$3' => \
  '/home/kg/advacl.jar|coprocessor.AdvancedAcl|1073741823|keytab=/etc/keytab'
'COPROCESSOR$99' => '|com.foo.BarCoprocessor|'

The key is a combination of the prefix COPROCESSOR, a dollar sign as a divider, and an ordering number, for example: COPROCESSOR$1. Using the $<number> postfix for the key enforces the order in which the definitions, and therefore the coprocessors, are loaded. This is especially interesting and important when loading multiple coprocessors with the same priority value. When you use the addCoprocessor() method to add a coprocessor to a table descriptor, the method will look for the highest assigned number and use the next free one after that. It starts out at 1, and increments by one from there.

The value is composed of three to four parts, serving the following purpose:

path-to-jar

Optional — The path can either be a fully qualified HDFS location, or any other path supported by the Hadoop FileSystem class. The second (and third) coprocessor definition, for example, uses a local path instead. If left empty, the coprocessor class must be accessible through the already configured class path.

If you specify a path in HDFS (or any other non-local file system URI), the coprocessor class loader support will first copy the JAR file to a local location, similar to what was explained in “Custom Filters”. The difference is that the file is located in a further subdirectory named tmp, for example /data/tmp/hbase-hadoop/local/jars/tmp/. The name of the JAR is also changed to a unique internal name, using the following pattern:

.<path-prefix>.<jar-filename>.<current-timestamp>.jar

The path prefix is usually a random UUID. Here is a complete example:

$ ls -A /data/tmp/hbase-hadoop/local/jars/tmp/
.c20a1e31-7715-4016-8fa7-b69f636cb07c.hbase-book-ch04.jar.1434434412813.jar

The local file is deleted upon normal server process termination.

classname

Required — This defines the actual implementation class. While the JAR may contain many coprocessor classes, only one can be specified per table attribute. Use the standard Java package name conventions to specify the class.

priority

Optional — The priority must be a number between the boundaries explained in Table 4-12. If not specified, it defaults to Coprocessor.PRIORITY_USER, in other words 1073741823. You can set any priority to indicate the proper execution order of the coprocessors. In the above example you can see that coprocessor #2 has a one-lower priority compared to #3. Since lower values indicate higher priority, this would cause #2 to be called before #3 in the chain of events.

key=value

Optional — These are key/value parameters that are added to the configuration handed into the coprocessor, and retrievable by calling CoprocessorEnvironment.getConfiguration() from, for example, the start() method. For example:

private String keytab;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
  this.keytab = env.getConfiguration().get("keytab");
}

The above getConfiguration() call returns the current server configuration, merged with any optional parameters specified in the coprocessor declaration. The former is the hbase-site.xml, merged with the provided hbase-default.xml, and all changes made through previous dynamic configuration updates. Since this is then merged with the per-coprocessor parameters (if there are any), it is advisable to use a specific, unique prefix for the keys so as not to accidentally override any of the HBase settings. For example, a key with a prefix made from the coprocessor class, plus its assigned value, could look like this: com.foobar.copro.ReferentialIntegrity.table.main=production:users.

Note

It is advised to avoid using extra whitespace characters in the coprocessor definition. The parsing should take care of all leading or trailing spaces, but if in doubt try removing them to eliminate any possible parsing quirks.

The last coprocessor definition in the example is the shortest possible, omitting all optional parts. All that is needed is the class name, as shown, while retaining the dividing pipe symbols. Example 4-27 shows how this can be done using the administrative API for HBase.

Example 4-27. Load a coprocessor using the table descriptor
public class LoadWithTableDescriptorExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("testtable");

    HTableDescriptor htd = new HTableDescriptor(tableName); 1
    htd.addFamily(new HColumnDescriptor("colfam1"));
    htd.setValue("COPROCESSOR$1", "|" + 2
      RegionObserverExample.class.getCanonicalName() +
      "|" + Coprocessor.PRIORITY_USER);

    Admin admin = connection.getAdmin(); 3
    admin.createTable(htd);

    System.out.println(admin.getTableDescriptor(tableName)); 4
    admin.close();
    connection.close();
  }
}
1

Define a table descriptor.

2

Add the coprocessor definition to the descriptor, while omitting the path to the JAR file.

3

Acquire an administrative API to the cluster and add the table.

4

Verify if the definition has been applied as expected.

The second approach, using the addCoprocessor() method provided by the descriptor class, simplifies all of this, as shown in Example 4-28. It computes the next free coprocessor key using the above rules, and assigns the value in the proper format.

Example 4-28. Load a coprocessor using the table descriptor using provided method
    HTableDescriptor htd = new HTableDescriptor(tableName) 1
      .addFamily(new HColumnDescriptor("colfam1"))
      .addCoprocessor(RegionObserverExample.class.getCanonicalName(),
        null, Coprocessor.PRIORITY_USER, null); 2

    Admin admin = connection.getAdmin();
    admin.createTable(htd);
1

Use fluent interface to create and configure the instance.

2

Use the provided method to add the coprocessor.

The examples omit setting the JAR file name since we assume the same test setup as before, and earlier we have added the JAR file to the hbase-env.sh file. With that, the coprocessor class is part of the server class path and we can skip setting it again. Running the examples against the assumed local, standalone HBase setup should emit the following:

'testtable', {TABLE_ATTRIBUTES => {METADATA => { \
  'COPROCESSOR$1' => '|coprocessor.RegionObserverExample|1073741823'}}, \
  {NAME => 'colfam1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', \
   REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', \
   MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', \
   BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}

The coprocessor definition has been successfully applied to the table schema. Once the table is enabled and the regions are opened, the framework will first load the configuration coprocessors and then the ones defined in the table descriptor. The same considerations as mentioned before apply here as well: be careful not to slow down the region deployment process with long-running or resource-intensive operations in your lifecycle callbacks, and avoid throwing any exceptions, or the server process might be aborted.

The difference here is that for table coprocessors there is a configuration property named hbase.coprocessor.abortonerror, which you can set to true or false, indicating what you want to happen if an error occurs during the initialization of a coprocessor class. The default is true, matching the behavior of the configuration-loaded coprocessors. Setting it to false will simply log the error that was encountered, but move on with business as usual. Of course, the erroneous coprocessor will neither be loaded nor be active.

Loading from HBase Shell

If you want to load coprocessors while HBase is running, there is an option to dynamically load the necessary classes and containing JAR files. This is accomplished using the table descriptor and the alter call, provided by the administrative API (see “Table Operations”) and exposed through the HBase Shell. The process is to update the table schema and then reload the table regions. The shell does this in one call, as shown in the following example:

hbase(main):001:0> alter 'testqauat:usertable', \
  'coprocessor' => 'file:///opt/hbase-book/hbase-book-ch05-2.0.jar| \
  coprocessor.SequentialIdGeneratorObserver|'
Updating all regions with the new schema...
1/11 regions updated.
6/11 regions updated.
11/11 regions updated.
Done.
0 row(s) in 5.0540 seconds

hbase(main):002:0> describe 'testqauat:usertable'
Table testqauat:usertable is ENABLED
testqauat:usertable, {TABLE_ATTRIBUTES => {coprocessor$1 => \
 'file:///opt/hbase-book/hbase-book-ch05-2.0.jar|coprocessor \
  .SequentialIdGeneratorObserver|'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', \
  REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', \
  TTL => 'FOREVER', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'FALSE', \
  BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0220 seconds

The second command uses describe to verify the coprocessor was set, and what the assigned key for it is, here coprocessor$1. As for the path used for the JAR file, keep in mind that it is considered the source for the JAR file, and that it is copied into the local temporary location before being loaded into the Java process as explained above. You can use the region server UI to verify that the class has been loaded successfully, by checking the Software Attributes section at the end of the status page. In this table there is a line listing the loaded coprocessor classes, as shown in Figure 4-5.

uirsswattrcp
Figure 4-5. The Region Server status page lists the loaded coprocessors
Tip

While you will learn more about the HBase Shell in “Namespace and Data Definition Commands”, a quick tip about using the alter command to add a table attribute: You can omit the METHOD => 'table_att' parameter as shown above, because adding/setting a parameter is the assumed default operation. Only for removing an attribute do you have to explicitly specify the method, as shown next when removing the previously set coprocessor.

Once a coprocessor is loaded, you can also remove them in the same dynamic fashion, that is, using the HBase Shell to update the schema and reload the affected table regions on all region servers in one single command:

hbase(main):003:0> alter 'testqauat:usertable', METHOD => 'table_att_unset', \
  NAME => 'coprocessor$1'
Updating all regions with the new schema...
2/11 regions updated.
8/11 regions updated.
11/11 regions updated.
Done.
0 row(s) in 4.2160 seconds

hbase(main):004:0> describe 'testqauat:usertable'
Table testqauat:usertable is ENABLED
testqauat:usertable
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', \
  REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', \
  TTL => 'FOREVER', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'FALSE',
  BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0180 seconds

Removing a coprocessor requires you to know its key in the table schema. We have already retrieved that key earlier with the describe command shown in the example. The unset operation (which removes the table schema attribute) removes the key named coprocessor$1, which is the key we determined earlier. After all regions are reloaded, we can use the describe command again to check if the coprocessor reference has indeed been removed, which is the case here.

Loading coprocessors using the dynamic table schema approach bears the same burden as mentioned before: you cannot unload classes or JAR files, therefore you may have to restart the region server process for an update of the classes. You could work around this for a limited time by versioning the class and JAR file names, but the loaded classes may cause memory pressure eventually and force you to cycle the processes.

Endpoints

The first of the two major features provided by the coprocessor framework that we are going to look at are endpoints. They solve the problem of moving data for analytical queries that would benefit from pre-calculating intermediate results where the data resides, and then just shipping the results back to the client. Sounds familiar? Yes, this is what MapReduce does in Hadoop, that is, ship the code to the data, do the computation, and persist the results.

An inherent feature of MapReduce is that it has intrinsic knowledge of which datanode is holding which block of information. When you execute a job, the scheduler uses the block location information provided by the NameNode to ship the code to all nodes that contain data belonging to the job's input. With HBase, we could run a client-side scan that ships all the data to the client to do the computation. But at scale, this will not be efficient, because the cost of moving the data exceeds the cost of the processing performed. In other words, all the time is spent in moving the data, that is, in the I/O.

What we need instead is the ability, just as with MapReduce, to ship the processing to the servers, do the aggregation or any other computation on the server-side, and only return the much smaller results back to the client. And that, in a nutshell, is what Endpoints are all about. You instruct the servers to load code with every region of a given table, and when you need to scan the table, partially or completely, it will call the server-side code, which then can scan the necessary data where it resides: on the data servers.

Once the computation is completed, the results are shipped back to the client, one result per region, and aggregated there for the final result. For example, if you were to have 1,000 regions and 1 million columns, and you want to summarize the stored data, you would receive 1,000 decimal numbers on the client side—one for each region. This is fast to aggregate for the final result. If you were to scan the entire table using a purely client API approach, in a worst-case scenario you would transfer all 1 million numbers to build the sum.

The Service Interface

Endpoints are implemented as an extension to the RPC protocol between the client and server. In the past (before HBase 0.96) this was done by literally extending the protocol classes. After the move to the Protocol Buffer (Protobuf for short) based RPC, adding custom services on the server side was greatly simplified. The payload is serialized as a Protobuf message and sent from client to server (and back again) using the provided coprocessor services API.

In order to provide an endpoint to clients, a coprocessor generates a Protobuf implementation that extends the Service class. This service can define any methods that the coprocessor wishes to expose. Using the generated classes, you can communicate with the coprocessor instances via the following calls, provided by Table:

CoprocessorRpcChannel coprocessorService(byte[] row)

<T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
  byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
  throws ServiceException, Throwable
<T extends Service, R> void coprocessorService(final Class<T> service,
  byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
  final Batch.Callback<R> callback) throws ServiceException, Throwable

<R extends Message> Map<byte[], R> batchCoprocessorService(
  Descriptors.MethodDescriptor methodDescriptor, Message request,
  byte[] startKey, byte[] endKey, R responsePrototype)
  throws ServiceException, Throwable
<R extends Message> void batchCoprocessorService(
  Descriptors.MethodDescriptor methodDescriptor,
  Message request, byte[] startKey, byte[] endKey, R responsePrototype,
  Batch.Callback<R> callback) throws ServiceException, Throwable

Since Service instances are associated with individual regions within a table, the client RPC calls must ultimately identify which regions should be used in the service’s method invocations. Though regions are seldom handled directly in client code and the region names may change over time, the coprocessor RPC calls use row keys to identify which regions should be used for the method invocations. Clients can call Service methods against one of the following:

Single Region

This is done by calling coprocessorService() with a single row key. This returns an instance of the CoprocessorRpcChannel class, which implements the Protobuf RPC channel interfaces. It can be used to invoke any endpoint call linked to the region containing the specified row. Note that the row does not need to exist: the region selected is the one that does or would contain the given key.

Ranges of Regions

You can call coprocessorService() with a start row key and an end row key. All regions in the table, from the one containing the start row key to the one containing the end row key (inclusive), will be used as the endpoint targets. The invocations are executed in parallel, up to the number of threads configured in the executor pool instance in use.

Batched Regions

If you call batchCoprocessorService() instead, you still parallelize the execution across all regions, but calls to the same region server are sent together in a single invocation. This cuts down the number of network roundtrips, and is especially useful when the expected result of each endpoint invocation is very small.

Note

The row keys passed as parameters to the Table methods are not passed to the Service implementations. They are only used to identify the regions for endpoints of the remote calls. As mentioned, they do not have to actually exist, they merely identify the matching regions by start and end key boundaries.

Some of the table methods used to invoke endpoints rely on the Batch class, which you have seen in action in “Batch Operations” before. The abstract class defines two interfaces used for Service invocations against multiple regions: clients implement Batch.Call to call methods of the actual Service implementation instance. The interface’s call() method will be called once per selected region, passing the Service implementation instance for the region as a parameter.

Clients can optionally implement Batch.Callback to be notified of the results from each region invocation as they complete. The instance’s

void update(byte[] region, byte[] row, R result)

method will be called with the value returned by

R call(T instance)

from each region. You can see how the actual service type "T", and return type "R" are specified as Java generics: they depend on the concrete implementation of an endpoint, that is, the generated Java classes based on the Protobuf message declaring the service, methods, and their types.
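To illustrate the callback variant, here is a hedged sketch that reuses the row-count service generated later in this section (see Example 4-29 and Example 4-30 for the full context) and assumes a Table instance named table; the update() method is invoked once per region as soon as its result arrives:

final RowCounterProtos.CountRequest request =
  RowCounterProtos.CountRequest.getDefaultInstance();
try {
  table.coprocessorService(RowCounterProtos.RowCountService.class,
    null, null,
    new Batch.Call<RowCounterProtos.RowCountService, Long>() {
      public Long call(RowCounterProtos.RowCountService counter)
      throws IOException {
        BlockingRpcCallback<RowCounterProtos.CountResponse> rpcCallback =
          new BlockingRpcCallback<RowCounterProtos.CountResponse>();
        counter.getRowCount(null, request, rpcCallback);
        RowCounterProtos.CountResponse response = rpcCallback.get();
        return response.hasCount() ? response.getCount() : 0;
      }
    },
    new Batch.Callback<Long>() {
      public void update(byte[] region, byte[] row, Long result) {
        // Called once per region, as soon as its result is available.
        System.out.println("Region: " + Bytes.toString(region) +
          ", Count: " + result);
      }
    });
} catch (Throwable throwable) {
  throwable.printStackTrace();
}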

Implementing Endpoints

Implementing an endpoint involves the following two steps:

  1. Define the Protobuf service and generate classes

    This specifies the communication details for the endpoint: it defines the RPC service, its methods, and messages used between the client and the servers. With the help of the Protobuf compiler the service definition is compiled into custom Java classes.

  2. Extend the generated, custom Service subclass

    You need to provide the actual implementation of the endpoint by extending the generated, abstract class derived from the Service superclass.

The following defines a Protobuf service, named RowCountService, with methods that a client can invoke to retrieve the number of rows and Cells in each region where it is running. Following Maven project layout rules, they go into ${PROJECT_HOME}/src/main/protobuf, here with the name RowCountService.proto:

option java_package = "coprocessor.generated";
option java_outer_classname = "RowCounterProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getCellCount(CountRequest)
    returns (CountResponse);
}

The file defines the output class name, the package to use during code generation and so on. The last thing in step #1 is to compile the definition file into code, which is accomplished by using the Protobuf protoc tool.

Tip

The Protocol Buffer library usually comes as a source package that needs to be compiled and locally installed. There are also pre-built binary packages for many operating systems. On OS X, for example, you can run the following, assuming Homebrew was installed:

$ brew install protobuf

You can verify the installation by running $ protoc --version and check it prints a version number:

$ protoc --version
libprotoc 2.6.1

The online code repository of the book has a script bin/doprotoc.sh that runs the code generation. It essentially runs the following command from the repository root directory:

$ protoc -Ich04/src/main/protobuf --java_out=ch04/src/main/java \
  ch04/src/main/protobuf/RowCountService.proto

This will place the generated class file in the source directory, as specified. After that you will be able to use the generated types. Step #2 is to flesh out the generated code, since it creates an abstract class for you. All the declared RPC methods need to be implemented with the user code. This is done by extending the generated class, plus merging in the Coprocessor and CoprocessorService interface functionality. The latter two define the lifecycle callbacks, plus flagging the class as a service. Example 4-29 shows this for the above row-counter service, using the coprocessor environment provided to access the region, and eventually the data with an InternalScanner instance.

Example 4-29. Example endpoint implementation, adding a row and cell count method.
public class RowCountEndpoint extends RowCounterProtos.RowCountService
  implements Coprocessor, CoprocessorService {

  private RegionCoprocessorEnvironment env;

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do when coprocessor is shutting down
  }

  @Override
  public Service getService() {
    return this;
  }

  @Override
  public void getRowCount(RpcController controller,
    RowCounterProtos.CountRequest request,
    RpcCallback<RowCounterProtos.CountResponse> done) {
    RowCounterProtos.CountResponse response = null;
    try {
      long count = getCount(new FirstKeyOnlyFilter(), false);
      response = RowCounterProtos.CountResponse.newBuilder()
        .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    }
    done.run(response);
  }

  @Override
  public void getCellCount(RpcController controller,
    RowCounterProtos.CountRequest request,
    RpcCallback<RowCounterProtos.CountResponse> done) {
    RowCounterProtos.CountResponse response = null;
    try {
      long count = getCount(null, true);
      response = RowCounterProtos.CountResponse.newBuilder()
        .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    }
    done.run(response);
  }

  /**
   * Helper method to count rows or cells.
   *
   * @param filter The optional filter instance.
   * @param countCells Hand in <code>true</code> for cell counting.
   * @return The count as per the flags.
   * @throws IOException When something fails with the scan.
   */
  private long getCount(Filter filter, boolean countCells)
  throws IOException {
    long count = 0;
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
      scan.setFilter(filter);
    }
    try (
      InternalScanner scanner = env.getRegion().getScanner(scan);
    ) {
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      byte[] lastRow = null;
      do {
        hasMore = scanner.next(results);
        for (Cell cell : results) {
          if (!countCells) {
            if (lastRow == null || !CellUtil.matchingRow(cell, lastRow)) {
              lastRow = CellUtil.cloneRow(cell);
              count++;
            }
          } else count++;
        }
        results.clear();
      } while (hasMore);
    }
    return count;
  }
}

Note how the FirstKeyOnlyFilter is used to reduce the number of columns being scanned when performing a row count operation. For small rows, this will not yield much of an improvement, but for tables with very wide rows, skipping all remaining columns (and even more so cells, if you have enabled multi-versioning) of a row can speed up the row count tremendously.

Note

You need to add (or amend from the previous examples) the following to the hbase-site.xml file for the endpoint coprocessor to be loaded by the region server process:

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>coprocessor.RowCountEndpoint</value>
</property>

Just as before, restart HBase after making these adjustments.

Example 4-30 showcases how a client can use the provided calls of Table to execute the deployed coprocessor endpoint functions. Since the calls are sent to each region separately, there is a need to summarize the total number at the end.

Example 4-30. Example using the custom row-count endpoint
public class EndpointExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("testtable");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    try {
      final RowCounterProtos.CountRequest request =
        RowCounterProtos.CountRequest.getDefaultInstance();
      Map<byte[], Long> results = table.coprocessorService(
        RowCounterProtos.RowCountService.class, 1
        null, null, 2
        new Batch.Call<RowCounterProtos.RowCountService, Long>() { 3
          public Long call(RowCounterProtos.RowCountService counter)
          throws IOException {
            BlockingRpcCallback<RowCounterProtos.CountResponse> rpcCallback =
              new BlockingRpcCallback<RowCounterProtos.CountResponse>();
            counter.getRowCount(null, request, rpcCallback); 4
            RowCounterProtos.CountResponse response = rpcCallback.get();
            return response.hasCount() ? response.getCount() : 0;
          }
        }
      );

      long total = 0;
      for (Map.Entry<byte[], Long> entry : results.entrySet()) { 5
        total += entry.getValue().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Count: " + total);
    } catch (Throwable throwable) {
      throwable.printStackTrace();
    }
  }
}
1

Define the protocol interface being invoked.

2

Set start and end row key to “null” to count all rows.

3

Create an anonymous class to be sent to all region servers.

4

The call() method is executing the endpoint functions.

5

Iterate over the returned map, containing the result for each region separately.

The code emits the region names, the count for each of them, and eventually the grand total:

Before endpoint call...
Cell: row1/colfam1:qual1/2/Put/vlen=4/seqid=0, Value: val2
Cell: row1/colfam2:qual1/2/Put/vlen=4/seqid=0, Value: val2
...
Cell: row5/colfam1:qual1/2/Put/vlen=4/seqid=0, Value: val2
Cell: row5/colfam2:qual1/2/Put/vlen=4/seqid=0, Value: val2
Region: testtable,,1427209872848.6eab8b854b5868ec...a66e83ea822c., Count: 2
Region: testtable,row3,1427209872848.3afd10e33044...8e071ce165ce., Count: 3
Total Count: 5

Example 4-31 slightly modifies the example to use the batch calls, that is, where all calls to a region server are grouped and sent together, for all hosted regions of that server.

Example 4-31. Example using the custom row-count endpoint in batch mode
      final CountRequest request = CountRequest.getDefaultInstance();
      Map<byte[], CountResponse> results = table.batchCoprocessorService(
        RowCountService.getDescriptor().findMethodByName("getRowCount"),
        request, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
        CountResponse.getDefaultInstance());

      long total = 0;
      for (Map.Entry<byte[], CountResponse> entry : results.entrySet()) {
        CountResponse response = entry.getValue();
        total += response.hasCount() ? response.getCount() : 0;
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Count: " + total);

The output is the same (the region name will vary for every execution of the example, as it contains the time a region was created), so we refrain from showing it again. Also, for such a small example, and especially when running on a local test rig, there is no visible difference between the two calls. The benefit really shows when you have many regions per server and the returned data is very small: only then is the cost of the RPC roundtrips noticeable.

Note

Example 4-31 does not use null for the start and end keys, but rather HConstants.EMPTY_START_ROW and HConstants.EMPTY_END_ROW, as provided by the API classes. This is synonymous with not specifying the keys at all.

If you want to perform additional processing on the results, you can further extend the Batch.Call code. This can be seen in Example 4-32, which combines the row and cell count for each region.

Example 4-32. Example extending the batch call to execute multiple endpoint calls
      final RowCounterProtos.CountRequest request =
        RowCounterProtos.CountRequest.getDefaultInstance();
      Map<byte[], Pair<Long, Long>> results = table.coprocessorService(
        RowCounterProtos.RowCountService.class,
        null, null,
        new Batch.Call<RowCounterProtos.RowCountService, Pair<Long, Long>>() {
          public Pair<Long, Long> call(RowCounterProtos.RowCountService counter)
          throws IOException {
            BlockingRpcCallback<RowCounterProtos.CountResponse> rowCallback =
              new BlockingRpcCallback<RowCounterProtos.CountResponse>();
            counter.getRowCount(null, request, rowCallback);

            BlockingRpcCallback<RowCounterProtos.CountResponse> cellCallback =
              new BlockingRpcCallback<RowCounterProtos.CountResponse>();
            counter.getCellCount(null, request, cellCallback);

            RowCounterProtos.CountResponse rowResponse = rowCallback.get();
            Long rowCount = rowResponse.hasCount() ?
              rowResponse.getCount() : 0;

            RowCounterProtos.CountResponse cellResponse = cellCallback.get();
            Long cellCount = cellResponse.hasCount() ?
              cellResponse.getCount() : 0;

            return new Pair<Long, Long>(rowCount, cellCount);
          }
        }
      );

      long totalRows = 0;
      long totalKeyValues = 0;
      for (Map.Entry<byte[], Pair<Long, Long>> entry : results.entrySet()) {
        totalRows += entry.getValue().getFirst().longValue();
        totalKeyValues += entry.getValue().getSecond().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Row Count: " + totalRows);
      System.out.println("Total Cell Count: " + totalKeyValues);

Running the code will yield the following output:

Region: testtable,,1428306403441.94e36bc7ab66c0e535dc3c21d9755ad6., Count: {2,4}
Region: testtable,row3,1428306403441.720b383e551e96cd290bd4b74b472e11., Count: {3,6}
Total Row Count: 5
Total Cell Count: 10

The examples so far all used the coprocessorService() calls to batch the requests across all regions, matching the given start and end row keys. Example 4-33 uses the single-row coprocessorService() call to get a local, client-side proxy of the endpoint. Since a row key is specified, the client API will route the proxy calls to the region—and to the server currently hosting it—that contains the given key (again, regardless of whether it actually exists or not: regions are specified with a start and end key only, so the match is done by range only).

Example 4-33. Example using the proxy call of HTable to invoke an endpoint on a single region
      HRegionInfo hri = admin.getTableRegions(tableName).get(0);
      Scan scan = new Scan(hri.getStartKey(), hri.getEndKey())
        .setMaxVersions();
      ResultScanner scanner = table.getScanner(scan);
      for (Result result : scanner) {
        System.out.println("Result: " + result);
      }

      CoprocessorRpcChannel channel = table.coprocessorService(
        Bytes.toBytes("row1"));
      RowCountService.BlockingInterface service =
        RowCountService.newBlockingStub(channel);
      CountRequest request = CountRequest.newBuilder().build();
      CountResponse response = service.getCellCount(null, request);
      long cellsInRegion = response.hasCount() ? response.getCount() : -1;
      System.out.println("Region Cell Count: " + cellsInRegion);

      request = CountRequest.newBuilder().build();
      response = service.getRowCount(null, request);
      long rowsInRegion = response.hasCount() ? response.getCount() : -1;
      System.out.println("Region Row Count: " + rowsInRegion);

The output will be:

Result: keyvalues={row1/colfam1:qual1/2/Put/vlen=4/seqid=0,
                   row1/colfam1:qual1/1/Put/vlen=4/seqid=0,
                   row1/colfam2:qual1/2/Put/vlen=4/seqid=0,
                   row1/colfam2:qual1/1/Put/vlen=4/seqid=0}
Result: keyvalues={row2/colfam1:qual1/2/Put/vlen=4/seqid=0,
                   row2/colfam1:qual1/1/Put/vlen=4/seqid=0,
                   row2/colfam2:qual1/2/Put/vlen=4/seqid=0,
                   row2/colfam2:qual1/1/Put/vlen=4/seqid=0}
Region Cell Count: 4
Region Row Count: 2

The counts from the local scan differ from the numbers returned by the endpoint, which is caused by the coprocessor code setting setMaxVersions(1), while the local scan omits the limit and returns all versions of every cell in that same region. It shows once more how careful you should be to set these parameters to match what the clients expect. If in doubt, you could make the maximum number of versions a parameter that is passed to the endpoint through the Request implementation.

With the proxy reference, you can invoke any remote function defined in your derived Service implementation from within client code, and it returns the result for the region that served the request. Figure 4-6 shows the difference between the two approaches offered by coprocessorService(): single and multi region coverage.

coprorpc
Figure 4-6. Coprocessor calls batched and executed in parallel, and addressing a single region only

Observers

While endpoints somewhat reflect the functionality of database stored procedures, observers are akin to triggers. The difference from endpoints is that observers are not restricted to running in the context of a region: they can run in many different parts of the system and react to events that are triggered by clients, but also implicitly by the servers themselves, for example, when one of the servers is recovering a region after another server has failed, or when the master is taking actions on the cluster state.

Another difference is that observers are using pre-defined hooks into the server processes, that is, you cannot add your own custom ones. They also act on the server side only, with no connection to the client. What you can do though is combine an endpoint with an observer for region-related functionality, exposing observer state through a custom RPC API (see Example 4-34).

Since you can load many observers into the same set of contexts, that is, region, region server, master server, WAL, bulk loading, and endpoints, it is crucial to set the order of their invocation chain appropriately. We discussed that in “Coprocessor Loading”, looking into the priority and ordering dependent on how they are declared. Once loaded, the observers are chained together and executed in that order.

The ObserverContext Class

So far we have talked about the general architecture of coprocessors, their super class, how they are loaded into the server process, and how to implement endpoints. Before we can move on into the actual observers, we need to introduce one more basic class. For the callbacks provided by the Observer classes, there is a special context handed in as the first parameter to all calls: an instance of the ObserverContext class. It provides access to the current environment, but also adds the interesting ability to indicate to the coprocessor framework what it should do after a callback is completed.

Note

The observer context instance is the same for all coprocessors in the execution chain, but with the environment swapped out for each coprocessor.

Here are the methods as provided by the context class:

E getEnvironment()

Returns the reference to the current coprocessor environment. It is parameterized to return the matching environment for a specific coprocessor implementation. A RegionObserver, for example, would be presented with an implementation instance of the RegionCoprocessorEnvironment interface.

void prepare(E env)

Prepares the context with the specified environment. This is used internally only by the static createAndPrepare() method.

void bypass()

When your code invokes this method, the framework will use the value your coprocessor provides, as opposed to what the calling method would usually return, bypassing the default server-side processing.

void complete()

Indicates to the framework that any further processing can be skipped, skipping the remaining coprocessors in the execution chain. It implies that this coprocessor’s response is definitive.

boolean shouldBypass()

Used internally by the framework to check on the bypass flag.

boolean shouldComplete()

Used internally by the framework to check on the complete flag.

static <T extends CoprocessorEnvironment> ObserverContext<T> createAndPrepare(T env, ObserverContext<T> context)

Static function to initialize a context. When the provided context is null, it will create a new instance.

The important context functions are bypass() and complete(). These functions give your coprocessor implementation the option to control the subsequent behavior of the framework. The complete() call influences the execution chain of the coprocessors, while the bypass() call stops any further default processing on the server within the current observer. For example, you could avoid automated region splits like so:

@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
  e.bypass();
  e.complete();
}

There is a subtle difference between bypass and complete that needs to be clarified: they are serving different purposes, with different effects dependent on their usage. The following table lists the usual effects of either flag on the current and subsequent coprocessors, and when used in the pre or post hooks.

Table 4-15. Overview of bypass and complete, and their effects on coprocessors

Bypass  Complete  Current - Pre            Subsequent - Pre  Current - Post  Subsequent - Post
✗       ✗         no effect                no effect         no effect       no effect
✓       ✗         skip further processing  no effect         no effect       no effect
✗       ✓         no effect                skip              no effect       skip
✓       ✓         skip further processing  skip              no effect       skip

Note that there are exceptions to the rule; for example, some pre hooks cannot honor the bypass flag. Setting bypass in post hooks usually makes no sense, since there is little to nothing left to bypass. Consult the JavaDoc for each callback to learn if (and how) it honors the bypass flag.

The RegionObserver Class

The first observer subclass of Coprocessor we will look into is the one used at the region level: the RegionObserver class. For the sake of brevity, all parameters and exceptions are omitted when referring to the observer calls. Please read the online documentation for the full specification.7 Note that all calls of this observer class have the same first parameter (denoted as part of the “…” in the calls below), ObserverContext<RegionCoprocessorEnvironment> ctx8, providing access to the context instance. The context is explained in “The ObserverContext Class”, while the special environment class is explained in “The RegionCoprocessorEnvironment Class”.

The operations can be divided into two groups: region life-cycle changes and client API calls. We will look into both in that order, but before we do, there is a generic callback for many operations of both kinds:

enum Operation {
  ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION,
  MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION
}

postStartRegionOperation(..., Operation operation)
postCloseRegionOperation(..., Operation operation)

These methods of a RegionObserver are invoked when any of the possible Operations listed begins or ends, respectively. They give the coprocessor the ability to take invasive, or more likely evasive, actions, such as throwing an exception to stop the operation from taking place altogether.
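As a quick illustration, the following minimal sketch rejects region split requests by throwing an exception from postStartRegionOperation(). The class name is made up, imports are omitted as in the other listings, and the Operation enum is assumed to be the one shown above, nested in the server's region class:

public class NoSplitObserver extends BaseRegionObserver {

  @Override
  public void postStartRegionOperation(
    ObserverContext<RegionCoprocessorEnvironment> ctx,
    Operation operation) throws IOException {
    // Evasive action: refuse splits for any region hosting this observer.
    if (operation == Operation.SPLIT_REGION) {
      throw new IOException("Splits are disabled for this table.");
    }
  }
}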

Handling Region Life-Cycle Events

While [Link to Come] explains the region life-cycle, Figure 4-7 shows a simplified form.

hbas 0404
Figure 4-7. The coprocessor reacting to life-cycle state changes of a region

The observers have the opportunity to hook into the pending open, open, and pending close state changes. For each of them there is a set of hooks that are called implicitly by the framework.

State: pending open

A region is in this state when it is about to be opened. Observing coprocessors can either piggyback on or fail this process. To do so, the following callbacks are available, listed in order of their invocation:

postLogReplay(...)

preOpen(...)
preStoreFileReaderOpen(...)
postStoreFileReaderOpen(...)
preWALRestore(...) / postWALRestore(...)
postOpen(...)

These methods are called just before the region is opened, before and after the store files are opened in due course, before and after the WAL is replayed, and just after the region was opened. Your coprocessor implementation can use them, for instance, to indicate to the framework, in the preOpen() call, that it should abort the opening process, or to hook into the postOpen() call to trigger a cache warm-up, and so on.
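A minimal sketch of hooking into the two bracketing calls could look like the following (a hypothetical class, imports omitted as in the other listings); throwing an exception from preOpen() would abort the open, while postOpen() is a good place to start a warm-up:

public class OpenObserverExample extends BaseRegionObserver {

  @Override
  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> ctx)
  throws IOException {
    // Throwing an IOException here would abort the region open.
    System.out.println("About to open region " +
      ctx.getEnvironment().getRegionInfo().getRegionNameAsString());
  }

  @Override
  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> ctx) {
    // The region is fully open: trigger a cache warm-up, and so on.
    System.out.println("Opened region " +
      ctx.getEnvironment().getRegionInfo().getRegionNameAsString());
  }
}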

The first event, postLogReplay(), is triggered depending on which WAL recovery mode is configured: distributed log splitting or log replay (see [Link to Come] and the hbase.master.distributed.log.replay configuration property). The former runs before a region is opened, and therefore triggers the callback first. The latter opens the region and then replays the edits, triggering the callback after the region open event.

In both recovery modes, though again depending on which one is active, the region server may have to apply records from the write-ahead log (WAL). This, in turn, invokes the pre/postWALRestore() methods of the observer. When distributed log splitting is used, this takes place after the pending open, but just before the open state. Otherwise, it happens after the open event, as the edits are replayed. Hooking into these WAL calls gives you fine-grained control over which mutations are applied during the log replay process. You get access to the edit record, which you can use to inspect what is being applied.

State: open

A region is considered open when it is deployed to a region server and fully operational. At this point, all the operations discussed throughout the book can take place; for example, the region’s in-memory store could be flushed to disk, or the region could be split when it has grown too large. The possible hooks are:

preFlushScannerOpen(...)
preFlush(...) / postFlush(...)

preCompactSelection(...) / postCompactSelection(...)
preCompactScannerOpen(...)
preCompact(...) / postCompact(...)

preSplit(...)
preSplitBeforePONR(...)
preSplitAfterPONR(...)
postSplit(...)
postCompleteSplit(...) / preRollBackSplit(...) / postRollBackSplit(...)

This should be quite intuitive by now: the pre calls are executed before, while the post calls are executed after the respective operation. For example, using the preSplit() hook, you could effectively disable the built-in region splitting process and perform these operations manually. Some calls are only available as pre-hooks, some only as post-hooks.

The hooks for flush, compact, and split are directly linked to the matching region housekeeping functions. There are also some more specialized hooks that happen as part of those three functions. For example, preFlushScannerOpen() is called when the scanner for the memstore (bear with me here, [Link to Come] will explain the inner workings later) is set up, which is just before the actual flush takes place.

Similarly, for compactions, first the server selects the files included, which is wrapped in coprocessor callbacks (postfixed CompactSelection). After that the store scanners are opened and, finally, the actual compaction happens.

For splits, there are callbacks reflecting the current stage, with a particular point-of-no-return (PONR) in between. It occurs after the split process has started, but before any definitive actions have taken place. Splits are handled like a transaction internally, and when this transaction is about to be committed, preSplitBeforePONR() is invoked, with preSplitAfterPONR() following right after. There is also a final completed or rollback call, informing you of the outcome of the split transaction.

State: pending close

The last group of hooks for the observers is for regions that go into the pending close state. This occurs when the region transitions from open to closed. Just before, and after, the region is closed the following hooks are executed:

preClose(...,  boolean abortRequested)
postClose(..., boolean abortRequested)

The abortRequested parameter indicates why a region was closed. Usually regions are closed during normal operation, when, for example, the region is moved to a different region server for load-balancing reasons. But there is also the possibility that a region server has gone rogue and is aborted to avoid any side effects. When this happens, all hosted regions are closed as well, and you can tell from the given parameter whether that was the case.
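A short sketch of evaluating that flag could look like the following (a hypothetical class, imports omitted as in the other listings):

public class CloseObserverExample extends BaseRegionObserver {

  @Override
  public void postClose(ObserverContext<RegionCoprocessorEnvironment> ctx,
    boolean abortRequested) {
    if (abortRequested) {
      // The hosting region server is aborting: skip expensive cleanup work.
      System.out.println("Region closed because the server aborted.");
    } else {
      // Normal close, for example caused by a balancer-initiated move.
      System.out.println("Region closed during normal operation.");
    }
  }
}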

On top of that, this class also inherits the start() and stop() methods, allowing the allocation, and release, of lifetime resources.

Handling Client API Events

As opposed to the life-cycle events, all client API calls are explicitly sent from a client application to the region server. You have the opportunity to hook into these calls just before they are applied, and just thereafter. Here is the list of the available calls:

Table 4-16. Callbacks for client API functions
API Call Pre-Hook Post-Hook

Table.put()

prePut(...)

void postPut(...)

Table.checkAndPut()

preCheckAndPut(...), preCheckAndPutAfterRowLock(...), prePut(...)

postPut(...), postCheckAndPut(...)

Table.get()

preGetOp(...)

void postGetOp(...)

Table.delete(), Table.batch()

preDelete(...), prePrepareTimeStampForDeleteVersion(...)

void postDelete(...)

Table.checkAndDelete()

preCheckAndDelete(...), preCheckAndDeleteAfterRowLock(...), preDelete(...)

postDelete(...), postCheckAndDelete(...)

Table.mutateRow()

preBatchMutate(...), prePut(...)/preGetOp(...)

postBatchMutate(...), postPut(...)/postGetOp(...), postBatchMutateIndispensably()

Table.append()

preAppend(...), preAppendAfterRowLock()

postMutationBeforeWAL(...), postAppend(...)

Table.batch()

preBatchMutate(...), prePut(...)/preGetOp(...)/preDelete(...), prePrepareTimeStampForDeleteVersion(...)

postPut(...)/postGetOp(...), postBatchMutate(...)

Table.checkAndMutate()

preBatchMutate(...)

postBatchMutate(...)

Table.getScanner()

preScannerOpen(...), preStoreScannerOpen(...)

postInstantiateDeleteTracker(...), postScannerOpen(...)

ResultScanner.next()

preScannerNext(...)

postScannerFilterRow(...), postScannerNext(...)

ResultScanner.close()

preScannerClose(...)

postScannerClose(...)

Table.increment(), Table.batch()

preIncrement(...), preIncrementAfterRowLock(...)

postMutationBeforeWAL(...), postIncrement(...)

Table.incrementColumnValue()

preIncrementColumnValue(...)

postIncrementColumnValue(...)

Table.getClosestRowBefore()a

preGetClosestRowBefore(...)

postGetClosestRowBefore(...)

Table.exists()

preExists(...)

postExists(...)

completebulkload (tool)

preBulkLoadHFile(...)

postBulkLoadHFile(...)

a This API call has been removed in HBase 1.0. It will be removed in the coprocessor API soon as well.

The table lists the events in calling order, separated by commas. When you see a slash (“/”) instead, the invoked callback depends on the contained operations. For example, when you batch a put and a delete in one batch() call, you receive the pre/postPut() and pre/postDelete() callbacks for each contained instance. There are many low-level methods that allow you to hook into very essential processes of HBase’s inner workings. Usually the method name explains the nature of the invocation, and with the parameters provided in the online API documentation you can determine what your options are. If all else fails, consult the source code; if you are asking for this level of detail, you are an expert at this point anyway.

Example 4-34 shows another (albeit somewhat advanced) way of figuring out the call order of coprocessor methods. The example code combines a RegionObserver with a custom Endpoint, and uses an internal list to track all invocations of any callback.

Example 4-34. Observer collecting invocation statistics.
@SuppressWarnings("deprecation") // because of API usage
public class ObserverStatisticsEndpoint
  extends ObserverStatisticsProtos.ObserverStatisticsService
  implements Coprocessor, CoprocessorService, RegionObserver {

  private RegionCoprocessorEnvironment env;
  private Map<String, Integer> stats = new LinkedHashMap<>();

  // Lifecycle methods

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  ...
  // Endpoint methods

  @Override
  public void getStatistics(RpcController controller,
    ObserverStatisticsProtos.StatisticsRequest request,
    RpcCallback<ObserverStatisticsProtos.StatisticsResponse> done) {
    ObserverStatisticsProtos.StatisticsResponse response = null;
    try {
      ObserverStatisticsProtos.StatisticsResponse.Builder builder =
        ObserverStatisticsProtos.StatisticsResponse.newBuilder();
      ObserverStatisticsProtos.NameInt32Pair.Builder pair =
        ObserverStatisticsProtos.NameInt32Pair.newBuilder();
      for (Map.Entry<String, Integer> entry : stats.entrySet()) {
        pair.setName(entry.getKey());
        pair.setValue(entry.getValue().intValue());
        builder.addAttribute(pair.build());
      }
      response = builder.build();
      // optionally clear out stats
      if (request.hasClear() && request.getClear()) {
        synchronized (stats) {
          stats.clear();
        }
      }
    } catch (Exception e) {
      ResponseConverter.setControllerException(controller,
        new IOException(e));
    }
    done.run(response);
  }

  /**
   * Internal helper to keep track of call counts.
   *
   * @param call The name of the call.
   */
  private void addCallCount(String call) {
    synchronized (stats) {
      Integer count = stats.get(call);
      if (count == null) count = new Integer(1);
      else count = new Integer(count + 1);
      stats.put(call, count);
    }
  }

  // All Observer callbacks follow here

  @Override
  public void preOpen(
    ObserverContext<RegionCoprocessorEnvironment> observerContext)
    throws IOException {
    addCallCount("preOpen");
  }

  @Override
  public void postOpen(
    ObserverContext<RegionCoprocessorEnvironment> observerContext) {
    addCallCount("postOpen");
  }

  ...
}

This is combined with the code in Example 4-35, which then executes every API call, followed by calling on the custom endpoint getStatistics(), which returns (and optionally clears) the collected invocation list.

Example 4-35. Use an endpoint to query observer statistics
  private static Table table = null;

  private static void printStatistics(boolean print, boolean clear)
  throws Throwable {
    final StatisticsRequest request = StatisticsRequest
      .newBuilder().setClear(clear).build();
    Map<byte[], Map<String, Integer>> results = table.coprocessorService(
      ObserverStatisticsService.class,
      null, null,
      new Batch.Call<ObserverStatisticsProtos.ObserverStatisticsService,
                     Map<String, Integer>>() {
        public Map<String, Integer> call(
          ObserverStatisticsService statistics)
        throws IOException {
          BlockingRpcCallback<StatisticsResponse> rpcCallback =
            new BlockingRpcCallback<StatisticsResponse>();
          statistics.getStatistics(null, request, rpcCallback);
          StatisticsResponse response = rpcCallback.get();
          Map<String, Integer> stats = new LinkedHashMap<String, Integer>();
          for (NameInt32Pair pair : response.getAttributeList()) {
            stats.put(pair.getName(), pair.getValue());
          }
          return stats;
        }
      }
    );
    if (print) {
      for (Map.Entry<byte[], Map<String, Integer>> entry : results.entrySet()) {
        System.out.println("Region: " + Bytes.toString(entry.getKey()));
        for (Map.Entry<String, Integer> call : entry.getValue().entrySet()) {
          System.out.println("  " + call.getKey() + ": " + call.getValue());
        }
      }
      System.out.println();
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    HBaseHelper helper = HBaseHelper.getHelper(conf);
    helper.dropTable("testtable");
    helper.createTable("testtable", 3, "colfam1", "colfam2");
    helper.put("testtable",
      new String[]{"row1", "row2", "row3", "row4", "row5"},
      new String[]{"colfam1", "colfam2"}, new String[]{"qual1", "qual1"},
      new long[]{1, 2}, new String[]{"val1", "val2"});
    System.out.println("Before endpoint call...");
    helper.dump("testtable",
      new String[]{"row1", "row2", "row3", "row4", "row5"},
      null, null);
    try {
      TableName tableName = TableName.valueOf("testtable");
      table = connection.getTable(tableName);

      System.out.println("Apply single put...");
      Put put = new Put(Bytes.toBytes("row10"));
      put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual10"),
        Bytes.toBytes("val10"));
      table.put(put);
      printStatistics(true, true);

      System.out.println("Do single get...");
      Get get = new Get(Bytes.toBytes("row10"));
      get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual10"));
      table.get(get);
      printStatistics(true, true);
      ...
    } catch (Throwable throwable) {
      throwable.printStackTrace();
    }
  }

The output then reveals how each API call triggers a multitude of callbacks, at different points in time:

Apply single put...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  postStartRegionOperation: 1
  - postStartRegionOperation-BATCH_MUTATE: 1
  prePut: 1
  preBatchMutate: 1
  postBatchMutate: 1
  postPut: 1
  postBatchMutateIndispensably: 1
  postCloseRegionOperation: 1
  - postCloseRegionOperation-BATCH_MUTATE: 1

Do single get...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preGetOp: 1
  postStartRegionOperation: 2
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  postCloseRegionOperation: 2
  - postCloseRegionOperation-SCAN: 2
  postGetOp: 1

Send batch with put and get...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preGetOp: 1
  postStartRegionOperation: 3
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  postCloseRegionOperation: 3
  - postCloseRegionOperation-SCAN: 2
  postGetOp: 1
  - postStartRegionOperation-BATCH_MUTATE: 1
  prePut: 1
  preBatchMutate: 1
  postBatchMutate: 1
  postPut: 1
  postBatchMutateIndispensably: 1
  - postCloseRegionOperation-BATCH_MUTATE: 1

Scan single row...
  -> after getScanner()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preScannerOpen: 1
  postStartRegionOperation: 1
  - postStartRegionOperation-SCAN: 1
  preStoreScannerOpen: 2
  postInstantiateDeleteTracker: 2
  postCloseRegionOperation: 1
  - postCloseRegionOperation-SCAN: 1
  postScannerOpen: 1

  -> after next()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preScannerNext: 1
  postStartRegionOperation: 1
  - postStartRegionOperation-SCAN: 1
  postCloseRegionOperation: 1
  - postCloseRegionOperation-ANY: 1
  postScannerNext: 1
  preScannerClose: 1
  postScannerClose: 1

  -> after close()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.

Scan multiple rows...
  -> after getScanner()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preScannerOpen: 1
  postStartRegionOperation: 1
  - postStartRegionOperation-SCAN: 1
  preStoreScannerOpen: 2
  postInstantiateDeleteTracker: 2
  postCloseRegionOperation: 1
  - postCloseRegionOperation-SCAN: 1
  postScannerOpen: 1

  -> after next()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preScannerNext: 1
  postStartRegionOperation: 1
  - postStartRegionOperation-SCAN: 1
  postCloseRegionOperation: 1
  - postCloseRegionOperation-ANY: 1
  postScannerNext: 1
  preScannerClose: 1
  postScannerClose: 1

  -> after close()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.

Apply single put with mutateRow()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  postStartRegionOperation: 2
  - postStartRegionOperation-ANY: 2
  prePut: 1
  postCloseRegionOperation: 2
  - postCloseRegionOperation-ANY: 2
  preBatchMutate: 1
  postBatchMutate: 1
  postPut: 1
  postBatchMutateIndispensably: 1

Apply single column increment...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preIncrement: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-INCREMENT: 1
  - postStartRegionOperation-ANY: 1
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 1
  preIncrementAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postScannerFilterRow: 1
  postMutationBeforeWAL: 1
  - postMutationBeforeWAL-INCREMENT: 1
  - postCloseRegionOperation-INCREMENT: 1
  postIncrement: 1

Apply multi column increment...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preIncrement: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-INCREMENT: 1
  - postStartRegionOperation-ANY: 1
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 1
  preIncrementAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postScannerFilterRow: 1
  postMutationBeforeWAL: 2
  - postMutationBeforeWAL-INCREMENT: 2
  - postCloseRegionOperation-INCREMENT: 1
  postIncrement: 1

Apply single incrementColumnValue...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preIncrement: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-INCREMENT: 1
  - postStartRegionOperation-ANY: 1
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 1
  preIncrementAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postMutationBeforeWAL: 1
  - postMutationBeforeWAL-INCREMENT: 1
  - postCloseRegionOperation-INCREMENT: 1
  postIncrement: 1

Call single exists()...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preExists: 1
  preGetOp: 1
  postStartRegionOperation: 2
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  postCloseRegionOperation: 2
  - postCloseRegionOperation-SCAN: 2
  postGetOp: 1
  postExists: 1

Apply single delete...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  postStartRegionOperation: 4
  - postStartRegionOperation-DELETE: 1
  - postStartRegionOperation-BATCH_MUTATE: 1
  preDelete: 1
  prePrepareTimeStampForDeleteVersion: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  postCloseRegionOperation: 4
  - postCloseRegionOperation-SCAN: 2
  preBatchMutate: 1
  postBatchMutate: 1
  postDelete: 1
  postBatchMutateIndispensably: 1
  - postCloseRegionOperation-BATCH_MUTATE: 1
  - postCloseRegionOperation-DELETE: 1

Apply single append...
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preAppend: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-APPEND: 1
  - postStartRegionOperation-ANY: 1
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 1
  preAppendAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postScannerFilterRow: 1
  postMutationBeforeWAL: 1
  - postMutationBeforeWAL-APPEND: 1
  - postCloseRegionOperation-APPEND: 1
  postAppend: 1

Apply checkAndPut (failing)...
  -> success: false
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preCheckAndPut: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-ANY: 2
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 2
  preCheckAndPutAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postCheckAndPut: 1

Apply checkAndPut (succeeding)...
  -> success: true
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preCheckAndPut: 1
  postStartRegionOperation: 5
  - postStartRegionOperation-ANY: 2
  postCloseRegionOperation: 5
  - postCloseRegionOperation-ANY: 2
  preCheckAndPutAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postScannerFilterRow: 1
  - postStartRegionOperation-BATCH_MUTATE: 1
  prePut: 1
  preBatchMutate: 1
  postBatchMutate: 1
  postPut: 1
  postBatchMutateIndispensably: 1
  - postCloseRegionOperation-BATCH_MUTATE: 1
  postCheckAndPut: 1

Apply checkAndDelete (failing)...
  -> success: false
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preCheckAndDelete: 1
  postStartRegionOperation: 4
  - postStartRegionOperation-ANY: 2
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 2
  preCheckAndDeleteAfterRowLock: 1
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2
  postCheckAndDelete: 1

Apply checkAndDelete (succeeding)...
  -> success: true
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  preCheckAndDelete: 1
  postStartRegionOperation: 7
  - postStartRegionOperation-ANY: 2
  postCloseRegionOperation: 7
  - postCloseRegionOperation-ANY: 2
  preCheckAndDeleteAfterRowLock: 1
  - postStartRegionOperation-SCAN: 4
  preStoreScannerOpen: 2
  postInstantiateDeleteTracker: 2
  - postCloseRegionOperation-SCAN: 4
  postScannerFilterRow: 1
  - postStartRegionOperation-BATCH_MUTATE: 1
  preDelete: 1
  prePrepareTimeStampForDeleteVersion: 1
  preBatchMutate: 1
  postBatchMutate: 1
  postDelete: 1
  postBatchMutateIndispensably: 1
  - postCloseRegionOperation-BATCH_MUTATE: 1
  postCheckAndDelete: 1

Apply checkAndMutate (failing)...
  -> success: false
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  postStartRegionOperation: 4
  - postStartRegionOperation-ANY: 2
  postCloseRegionOperation: 4
  - postCloseRegionOperation-ANY: 2
  - postStartRegionOperation-SCAN: 2
  preStoreScannerOpen: 1
  postInstantiateDeleteTracker: 1
  - postCloseRegionOperation-SCAN: 2

Apply checkAndMutate (succeeding)...
  -> success: true
Region: testtable,,1428081747767.4fe07b3f06d5a2ed0ceb686aa0920b0b.
  postStartRegionOperation: 8
  - postStartRegionOperation-ANY: 4
  postCloseRegionOperation: 8
  - postCloseRegionOperation-ANY: 4
  - postStartRegionOperation-SCAN: 4
  preStoreScannerOpen: 2
  postInstantiateDeleteTracker: 2
  - postCloseRegionOperation-SCAN: 4
  prePut: 1
  preDelete: 1
  prePrepareTimeStampForDeleteVersion: 1
  postScannerFilterRow: 1
  preBatchMutate: 1
  postBatchMutate: 1
  postPut: 1
  postDelete: 1
  postBatchMutateIndispensably: 1

Refer to the code for details, but the console output above is complete and should give you guidance to identify the various callbacks, and when they are invoked.

The RegionCoprocessorEnvironment Class

The environment instances provided to a coprocessor that is implementing the RegionObserver interface are based on the RegionCoprocessorEnvironment class—which in turn is implementing the CoprocessorEnvironment interface. The latter was discussed in “The Coprocessor Class Trinity”.

On top of the provided methods, the more specific, region-oriented subclass is adding the methods described in Table 4-17.

Table 4-17. Specific methods provided by the RegionCoprocessorEnvironment class
Method Description

getRegion()

Returns a reference to the region the current observer is associated with.

getRegionInfo()

Get information about the region associated with the current coprocessor instance.

getRegionServerServices()

Provides access to the shared RegionServerServices instance.

getSharedData()

All the shared data between the instances of this coprocessor.

The getRegion() call can be used to get a reference to the hosting HRegion instance, and to invoke the calls this class provides. If you are in need of general information about the region, call getRegionInfo() to retrieve an HRegionInfo instance. This class has useful functions that allow you to get the range of contained keys, the name of the region, and flags about its state. Some of the methods are listed here, followed by a short usage sketch:

byte[] getStartKey()
byte[] getEndKey()
byte[] getRegionName()
boolean isSystemTable()
int getReplicaId()
...
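For example, a callback could report the key range served by its region, along the lines of this sketch (a hypothetical method body, assumed to live in a class extending BaseRegionObserver, imports omitted):

  @Override
  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> ctx) {
    HRegionInfo info = ctx.getEnvironment().getRegionInfo();
    // Print the name and key range of the region hosting this observer.
    System.out.println("Region " + info.getRegionNameAsString() +
      " serves keys [" + Bytes.toString(info.getStartKey()) +
      ", " + Bytes.toString(info.getEndKey()) + ")");
  }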

Consult the online documentation to study the available list of calls. In addition, your code can access the shared region server services instance, using the getRegionServerServices() method, which returns an instance of RegionServerServices. It provides many, very advanced methods, and Table 4-18 lists them for your perusal. We will not be discussing all the details of the provided functionality, and instead refer you again to the Java API documentation.9

Table 4-18. Methods provided by the RegionServerServices class
Method Description

abort()

Allows aborting the entire server process, shutting down the instance with the given reason.

addToOnlineRegions()

Adds a given region to the list of online regions. This is used for internal bookkeeping.

getCompactionRequester()

Provides access to the shared CompactionRequestor instance. This can be used to initiate compactions from within the coprocessor.

getConfiguration()

Returns the current server configuration.

getConnection()

Provides access to the shared connection instance.

getCoordinatedStateManager()

Access to the shared state manager, gives access to the TableStateManager, which in turn can be used to check on the state of a table.

getExecutorService()

Used by the master to schedule system-wide events.

getFileSystem()

Returns the Hadoop FileSystem instance, allowing access to the underlying file system.

getFlushRequester()

Provides access to the shared FlushRequester instance. This can be used to initiate memstore flushes.

getFromOnlineRegions()

Returns an HRegion instance for a given region, which must be hosted by the same server.

getHeapMemoryManager()

Provides access to the heap memory manager instance, which exposes heap-related information, such as occupancy.

getLeases()

Returns the list of leases, as acquired for example by client side scanners.

getMetaTableLocator()

The method returns a class providing system table related functionality.

getNonceManager()

Gives access to the nonce manager, which is used to generate unique IDs.

getOnlineRegions()

Lists all online regions on the current server for a given table.

getRecoveringRegions()

Lists all regions that are currently in the process of replaying WAL entries.

getRegionServerAccounting()

Provides access to the shared RegionServerAccounting instance. It allows you to check on what the server currently has allocated—for example, the global memstore size.

getRegionsInTransitionInRS()

List of regions that are currently in-transition.

getRpcServer()

Returns a reference to the low-level RPC implementation instance.

getServerName()

The server name, which is unique for every region server process.

getTableLockManager()

Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.

getWAL()

Provides access to the write-ahead log instance.

getZooKeeper()

Returns a reference to the ZooKeeper watcher instance.

isAborted()

Flag is true when abort() was called previously.

isStopped()

Returns true when stop() (inherited from Stoppable) was called beforehand.

isStopping()

Returns true when the region server is stopping.

postOpenDeployTasks()

Called by the region server after opening a region, does internal housekeeping work.

registerService()

Registers a new custom service. Called when server starts and coprocessors are loaded.

removeFromOnlineRegions()

Removes a given region from the internal list of online regions.

reportRegionStateTransition()

Triggers a report chain when a state change is needed for a region. Sent to the Master.

stop()

Stops the server gracefully.

There is no need to implement your own RegionObserver class based on the interface directly; instead, you can extend the BaseRegionObserver class and implement only what is needed.

The BaseRegionObserver Class

This class can be used as the basis for all your observer-type coprocessors. It has placeholders for all methods required by the RegionObserver interface. They are all left blank, so by default nothing is done when extending this class. You must override all the callbacks that you are interested in to add the required functionality.

Example 4-36 is an observer that handles specific row key requests.

Example 4-36. Example region observer checking for special get requests
public class RegionObserverExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
    Get get, List<Cell> results) throws IOException {
    if (Bytes.equals(get.getRow(), FIXED_ROW)) { 1
      Put put = new Put(get.getRow());
      put.addColumn(FIXED_ROW, FIXED_ROW, 2
        Bytes.toBytes(System.currentTimeMillis()));
      CellScanner scanner = put.cellScanner();
      scanner.advance();
      Cell cell = scanner.current(); 3
      results.add(cell); 4
    }
  }
}
1

Check if the request row key matches a well known one.

2

Create cell indirectly using a Put instance.

3

Get first cell from Put using the CellScanner instance.

4

Add the cell, containing just the current time on the server, to the list of results returned for the get request.

Note

The following was added to the hbase-site.xml file to enable the coprocessor:

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>coprocessor.RegionObserverExample</value>
</property>

The class is available to the region server’s Java Runtime Environment because we have already added the JAR of the compiled repository to the HBASE_CLASSPATH variable in hbase-env.sh—see “Coprocessor Loading” for reference.

Do not forget to restart HBase, though, to make the changes to the static configuration files active.

The row key @@@GETTIME@@@ is handled by the observer’s preGetOp() hook, inserting the current time of the server. Using the HBase Shell—after deploying the code to servers—you can see this in action:

hbase(main):001:0> get 'testtable', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=9223372036854775807, \
                                value=\x00\x00\x01L\x857\x9D\x0C
1 row(s) in 0.2810 seconds

hbase(main):002:0> Time.at(Bytes.toLong( \
  "\x00\x00\x01L\x857\x9D\x0C".to_java_bytes) / 1000)
=> Sat Apr 04 18:15:56 +0200 2015

This requires an existing table, because trying to issue a get call to a nonexistent table will raise an error before the actual get operation is executed. Also, the example does not set the bypass flag, in which case something like the following could happen:

hbase(main):003:0> create 'testtable2', 'colfam1'
0 row(s) in 0.6630 seconds

=> Hbase::Table - testtable2
hbase(main):004:0> put 'testtable2', '@@@GETTIME@@@', \
  'colfam1:qual1', 'Hello there!'
0 row(s) in 0.0930 seconds

hbase(main):005:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=9223372036854775807, \
                                value=\x00\x00\x01L\x85M\xEC{
 colfam1:qual1                  timestamp=1428165601622, value=Hello there!
2 row(s) in 0.0220 seconds

A new table is created and a row with the special row key is inserted. Subsequently, the row is retrieved. You can see how the artificial column is mixed with the actual one stored earlier. To avoid this issue, Example 4-37 adds the necessary e.bypass() call.

Example 4-37. Example region observer checking for special get requests and bypassing further processing
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      long time = System.currentTimeMillis();
      Cell cell = CellUtil.createCell(get.getRow(), FIXED_ROW, FIXED_ROW, 1
        time, KeyValue.Type.Put.getCode(), Bytes.toBytes(time));
      results.add(cell);
      e.bypass(); 2
    }
1

Create cell directly using the supplied utility.

2

Once the special cell is inserted all subsequent coprocessors are skipped.

Note

You need to adjust the hbase-site.xml file to point to the new example:

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>coprocessor.RegionObserverWithBypassExample</value>
</property>

Just as before, please restart HBase after making these adjustments.

As expected, and using the shell once more, the result is now different:

hbase(main):006:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                          CELL
 @@@GETTIME@@@:@@@GETTIME@@@    timestamp=1428166075865, \
                                value=\x00\x00\x01L\x85T\xE5\xD9
1 row(s) in 0.2840 seconds

Only the artificial column is returned, and since the default get operation is bypassed, it is the only column retrieved. Also note how the timestamp of this column is 9223372036854775807, which is Long.MAX_VALUE, for the first example, and 1428166075865 for the second. The former does not set the timestamp explicitly when it creates the Cell instance, causing it to be set to HConstants.LATEST_TIMESTAMP (by default), which is, in turn, set to Long.MAX_VALUE. The second example uses the CellUtil class to create a cell instance, which requires a timestamp to be specified (for the particular method used; there are others that allow omitting it), and we set it to the same server time as the value.

Using e.complete() instead of the shown e.bypass() makes little difference here, since no other coprocessor is in the chain. The online code repository has an example that you can use to experiment with either flag, and both together.

The MasterObserver Class

The second observer subclass of Coprocessor we discuss handles all possible callbacks the master server may initiate. The operations and API calls are explained in Chapter 5; they can be classified as data definition operations, similar to the DDL statements used in relational database systems. For that reason, the MasterObserver class provides the following hooks:

Table 4-19. Callbacks for master API functions
API Call Shell Call Pre-Hook Post-Hook

createTable()

create

preCreateTable(...), preCreateTableHandler(...)

postCreateTable(...)

deleteTable(), deleteTables()

drop

preDeleteTable(...), preDeleteTableHandler(...)

postDeleteTableHandler(...), postDeleteTable(...)

modifyTable()

alter

preModifyTable(...), preModifyTableHandler(...)

postModifyTableHandler(...), postModifyTable(...)

modifyTable()

alter

preAddColumn(...), preAddColumnHandler(...)

postAddColumnHandler(...), postAddColumn(...)

modifyTable()

alter

preDeleteColumn(...), preDeleteColumnHandler(...)

postDeleteColumnHandler(...), postDeleteColumn(...)

modifyTable()

alter

preModifyColumn(...), preModifyColumnHandler(...)

postModifyColumnHandler(...), postModifyColumn(...)

enableTable(), enableTables()

enable

preEnableTable(...), preEnableTableHandler(...)

postEnableTableHandler(...), postEnableTable(...)

disableTable(), disableTables()

disable

preDisableTable(...), preDisableTableHandler(...)

postDisableTableHandler(...), postDisableTable(...)

flush()

flush

preTableFlush(...)

postTableFlush(...)

truncateTable()

truncate

preTruncateTable(...), preTruncateTableHandler(...)

postTruncateTableHandler(...), postTruncateTable(...)

move()

move

preMove(...)

postMove(...)

assign()

assign

preAssign(...)

postAssign(...)

unassign()

unassign

preUnassign(...)

postUnassign(...)

offline()

n/a

preRegionOffline(...)

postRegionOffline(...)

balancer()

balancer

preBalance(...)

postBalance(...)

setBalancerRunning()

balance_switch

preBalanceSwitch(...)

postBalanceSwitch(...)

listTableNames()

list

preGetTableNames(...)

postGetTableNames(...)

getTableDescriptors(), listTables()

list

preGetTableDescriptors(...)

postGetTableDescriptors(...)

createNamespace()

create_namespace

preCreateNamespace(...)

postCreateNamespace(...)

deleteNamespace()

drop_namespace

preDeleteNamespace(...)

postDeleteNamespace(...)

getNamespaceDescriptor()

describe_namespace

preGetNamespaceDescriptor(...)

postGetNamespaceDescriptor(...)

listNamespaceDescriptors()

list_namespace

preListNamespaceDescriptors(...)

postListNamespaceDescriptors(...)

modifyNamespace()

alter_namespace

preModifyNamespace(...)

postModifyNamespace(...)

cloneSnapshot()

clone_snapshot

preCloneSnapshot(...)

postCloneSnapshot(...)

deleteSnapshot(), deleteSnapshots()

delete_snapshot, delete_all_snapshot

preDeleteSnapshot(...)

postDeleteSnapshot(...)

restoreSnapshot()

restore_snapshot

preRestoreSnapshot(...)

postRestoreSnapshot(...)

snapshot()

snapshot

preSnapshot(...)

postSnapshot(...)

shutdown()

n/a

preShutdown(...)

n/aa

stopMaster()

n/a

preStopMaster(...)

n/ab

n/a

n/a

preMasterInitialization(...)

postStartMaster(...)

a There is no post hook, because after the shutdown, there is no longer a cluster to invoke the callback.

b There is no post hook, because after the master has stopped, there is no longer a process to invoke the callback.

Most of these methods are self-explanatory, since their names match the admin API functions. They are grouped roughly into table and region, namespace, snapshot, and server related calls. You will note that some API calls trigger more than one callback. There are special pre/postXYZHandler hooks that indicate the asynchronous nature of the call; the Handler instance is needed to hand off the work to an executor thread pool. And, as before, some pre hooks cannot honor the bypass flag, so please read the online API reference carefully!
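For example, a pre hook can veto an operation by throwing an exception. The following minimal sketch (a hypothetical class using the BaseMasterObserver helper discussed shortly; the table name is made up and imports are omitted) refuses to drop one specific table:

public class ProtectTableObserver extends BaseMasterObserver {

  // Hypothetical table that must never be dropped.
  private static final TableName PROTECTED_TABLE =
    TableName.valueOf("importanttable");

  @Override
  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableName tableName) throws IOException {
    if (PROTECTED_TABLE.equals(tableName)) {
      // Throwing here aborts the delete before the asynchronous handler runs.
      throw new IOException("Table " + tableName + " is protected.");
    }
  }
}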

The MasterCoprocessorEnvironment Class

Similar to how the RegionCoprocessorEnvironment is enclosing a single RegionObserver coprocessor, the MasterCoprocessorEnvironment is wrapping MasterObserver instances. It also implements the CoprocessorEnvironment interface, thus giving you, for instance, access to the getTable() call to access data from within your own implementation.

On top of the provided methods, the more specific, master-oriented subclass adds the one method described in Table 4-20.

Table 4-20. Specific method provided by the MasterCoprocessorEnvironment class
Method Description

getMasterServices()

Provides access to the shared MasterServices instance.

Your code can access the shared master services instance, which exposes many functions of the Master admin API, as described in Chapter 5. For the sake of not duplicating the description of each, I have grouped them here by purpose, but refrain from explaining them. First are the table related calls:

createTable(HTableDescriptor, byte[][])
deleteTable(TableName)
modifyTable(TableName, HTableDescriptor)
enableTable(TableName)
disableTable(TableName)
getTableDescriptors()
truncateTable(TableName, boolean)

addColumn(TableName, HColumnDescriptor)
deleteColumn(TableName, byte[])
modifyColumn(TableName, HColumnDescriptor)

This is continued by namespace related methods:

createNamespace(NamespaceDescriptor)
deleteNamespace(String)
modifyNamespace(NamespaceDescriptor)
getNamespaceDescriptor(String)
listNamespaceDescriptors()
listTableDescriptorsByNamespace(String)
listTableNamesByNamespace(String)

Finally, Table 4-21 lists the more specific calls with a short description.

Table 4-21. Methods provided by the MasterServices class
Method Description

abort()

Allows aborting the entire server process, shutting down the instance with the given reason.

checkTableModifiable()

Convenient to check if a table exists and is offline so that it can be altered.

dispatchMergingRegions()

Flags two regions to be merged, which is performed on the region servers.

getAssignmentManager()

Gives you access to the assignment manager instance. It is responsible for all region assignment operations, such as assign, unassign, balance, and so on.

getConfiguration()

Returns the current server configuration.

getConnection()

Provides access to the shared connection instance.

getCoordinatedStateManager()

Access to the shared state manager, gives access to the TableStateManager, which in turn can be used to check on the state of a table.

getExecutorService()

Used by the master to schedule system-wide events.

getMasterCoprocessorHost()

Returns the enclosing host instance.

getMasterFileSystem()

Provides you with an abstraction layer for all filesystem-related operations the master is involved in—for example, creating directories for table files and log files.

getMetaTableLocator()

The method returns a class providing system table related functionality.

getServerManager()

Returns the server manager instance. With it you have access to the list of servers, live or considered dead, and more.

getServerName()

The server name, which is unique for every region server process.

getTableLockManager()

Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.

getZooKeeper()

Returns a reference to the ZooKeeper watcher instance.

isAborted()

Flag is true when abort() was called previously.

isInitialized()

After the server process is operational, this call will return true.

isServerShutdownHandlerEnabled()

When an optional shutdown handler was set, this check returns true.

isStopped()

Returns true when stop() (inherited from Stoppable) was called beforehand.

registerService()

Registers a new custom service. Called when server starts and coprocessors are loaded.

stop()

Stops the server gracefully.

Even though I am listing all the master services methods, I will not be discussing all the details on the provided functionality, and instead refer you to the Java API documentation once more.10

The BaseMasterObserver Class

You can either base your MasterObserver implementation on the interface directly, or extend the BaseMasterObserver class instead. It implements the interface while leaving all callback functions empty. If you were to use this class unchanged, it would not yield any kind of reaction.

Adding functionality is achieved by overriding the appropriate event methods. You have the choice of hooking your code into the pre and/or post calls. Example 4-38 uses the post hook after a table was created to perform additional tasks.

Example 4-38. Example master observer that creates a separate directory on the file system when a table is created.
public class MasterObserverExample extends BaseMasterObserver {

  @Override
  public void postCreateTable(
    ObserverContext<MasterCoprocessorEnvironment> ctx,
    HTableDescriptor desc, HRegionInfo[] regions)
    throws IOException {
    TableName tableName = desc.getTableName(); 1

    MasterServices services = ctx.getEnvironment().getMasterServices();
    MasterFileSystem masterFileSystem = services.getMasterFileSystem(); 2
    FileSystem fileSystem = masterFileSystem.getFileSystem();

    Path blobPath = new Path(tableName.getQualifierAsString() + "-blobs"); 3
    fileSystem.mkdirs(blobPath);

  }
}
1

Get the new table’s name from the table descriptor.

2

Get the available services and retrieve a reference to the actual file system.

3

Create a new directory that will store binary data from the client application.

Note

You need to add the following to the hbase-site.xml file for the coprocessor to be loaded by the master process:

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>

Just as before, restart HBase after making these adjustments.

Once you have activated the coprocessor, it listens for the said events and will trigger your code automatically. The example uses the supplied services to create a directory on the filesystem. A fictitious application, for instance, could use it to store very large binary objects (known as blobs) outside of HBase.

To trigger the event, you can use the shell like so:

hbase(main):001:0> create 'testtable3', 'colfam1'
0 row(s) in 0.6740 seconds

This creates the table and afterward calls the coprocessor’s postCreateTable() method. The Hadoop command-line tool can be used to verify the results:

$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - larsgeorge supergroup          0 ... testtable3-blobs

There are many things you can implement with the MasterObserver coprocessor. Since you have access to most of the shared master resources through the MasterServices instance, you should be careful what you do, as it can potentially wreak havoc.

Finally, because the environment is wrapped in an ObserverContext, you have the same extra flow controls, exposed by the bypass() and complete() methods. You can use them to explicitly disable certain operations or skip subsequent coprocessor execution, respectively.

The BaseMasterAndRegionObserver Class

There is another, related base class provided by HBase, the BaseMasterAndRegionObserver. It is a combination of two things: the BaseRegionObserver, as described in “The BaseRegionObserver Class”, and the MasterObserver interface:

public abstract class BaseMasterAndRegionObserver
  extends BaseRegionObserver implements MasterObserver {
  ...
}

In effect, this is like combining the previous BaseMasterObserver and BaseRegionObserver classes into one. This class is only useful to run on the HBase Master, since it provides both a region server and a master implementation. It is used to host the system tables directly on the master.11 Otherwise, the functionality of both has been described above, so we can move on to the next coprocessor subclass.

The RegionServerObserver Class

You have seen how to run code next to regions and within the master process. The same is possible within the region servers, using the RegionServerObserver class. It exposes well-defined hooks that pertain to server-wide functionality, that is, operations spanning many regions and tables. For that reason, the following hooks are provided:

postCreateReplicationEndPoint(...)

Invoked after the server has created a replication endpoint (not to be confused with coprocessor endpoints).

preMerge(...), postMerge(...)

Called when two regions are merged.

preMergeCommit(...), postMergeCommit(...)

Same as above, but with narrower scope. Called after preMerge() and before postMerge().

preRollBackMerge(...), postRollBackMerge(...)

These are invoked when a region merge fails, and the merge transaction has to be rolled back.

preReplicateLogEntries(...), postReplicateLogEntries(...)

Tied into the WAL entry replay process, allows special treatment of each log entry.

preRollWALWriterRequest(...), postRollWALWriterRequest(...)

Wrap the rolling of WAL files, which will happen based on size, time, or manual request.

preStopRegionServer(...)

This pre-only hook is called when the stop() method, inherited from Stoppable, is invoked. The environment allows access to that method on a region server.

The RegionServerCoprocessorEnvironment Class

Similar to how the MasterCoprocessorEnvironment is enclosing a single MasterObserver coprocessor, the RegionServerCoprocessorEnvironment is wrapping RegionServerObserver instances. It also implements the CoprocessorEnvironment interface, thus giving you, for instance, access to the getTable() call to access data from within your own implementation.

On top of the provided methods, the specific, region server-oriented subclass adds the one method described in Table 4-22.

Table 4-22. Specific method provided by the RegionServerCoprocessorEnvironment class
Method Description

getRegionServerServices()

Provides access to the shared RegionServerServices instance.

We have discussed this class in “The RegionCoprocessorEnvironment Class” before, and refer you to Table 4-18, which lists the available methods.

The BaseRegionServerObserver Class

Just as with the other base observer classes you have seen, the BaseRegionServerObserver is an empty implementation of the RegionServerObserver interface, saving you the time and effort of otherwise implementing the many callback methods. Here you can focus on what you really need, and override only the necessary methods. The available callbacks are very advanced, and we refrain from constructing a simple example at this point. Please refer to the source code if you need to implement at this low level.

The WALObserver Class

The next observer class we are going to address is related to the write-ahead log, or WAL for short. It offers a manageable list of callbacks, namely the following two:

preWALWrite(...), postWALWrite(...)

Wrap the writing of log entries to the WAL, allowing access to the full edit record.

Since you receive the entire record in these methods, you can influence what is written to the log. For example, an advanced use case might be to add extra cells to the edit, so that during a potential log replay the cells could help fine-tune the reconstruction process. You could add information that triggers external message queueing, so that other systems could react appropriately to the replay. Or you could use this information to create auxiliary data upon seeing the special cells later on.
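As a starting point, the following sketch simply reports on every edit record after it was written. It is a hypothetical class, assuming the HBase 1.0 callback signature and the BaseWALObserver helper described below, with imports omitted as in the other listings:

public class WALEditInspector extends BaseWALObserver {

  @Override
  public void postWALWrite(
    ObserverContext<? extends WALCoprocessorEnvironment> ctx,
    HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
    // Report how many cells the edit record carries for this region.
    System.out.println("WAL edit for region " + info.getRegionNameAsString() +
      " contains " + logEdit.getCells().size() + " cell(s)");
  }
}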

The WALCoprocessorEnvironment Class

Once again, there is a specialized environment that is provided as part of the callbacks. Here it is an instance of the WALCoprocessorEnvironment class. It also extends the CoprocessorEnvironment interface, thus giving you, for instance, access to the getTable() call to access data from within your own implementation.

On top of the provided methods, the specific, WAL-oriented subclass adds the one method described in Table 4-23.

Table 4-23. Specific method provided by the WALCoprocessorEnvironment class
Method Description

getWAL()

Provides access to the shared WAL instance.

With the reference to the WAL you can roll the current writer, in other words, close the current log file and create a new one. You could also call the sync() method to force the edit records into the persistence layer. Here are the methods available from the WAL interface, followed by a short usage sketch:

void registerWALActionsListener(final WALActionsListener listener)
boolean unregisterWALActionsListener(final WALActionsListener listener)
byte[][] rollWriter() throws FailedLogCloseException, IOException
byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException
void shutdown() throws IOException
void close() throws IOException
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
  AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
  throws IOException
void sync() throws IOException
void sync(long txid) throws IOException
boolean startCacheFlush(final byte[] encodedRegionName)
void completeCacheFlush(final byte[] encodedRegionName)
void abortCacheFlush(byte[] encodedRegionName)
WALCoprocessorHost getCoprocessorHost()
long getEarliestMemstoreSeqNum(byte[] encodedRegionName)
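For illustration, inside any WALObserver callback you could obtain the WAL from the environment and force a sync, or roll the writer, roughly like this (a fragment sketch; ctx stands for the callback's ObserverContext parameter, and the surrounding method must be able to throw IOException):

    WALCoprocessorEnvironment env = ctx.getEnvironment();
    WAL wal = env.getWAL();
    // Force pending edit records into the persistence layer...
    wal.sync();
    // ...or close the current log file and start a new one.
    wal.rollWriter();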

Once again, this is very low-level functionality, and by the time you need it you most likely have read large parts of the code already. We defer the explanation of each method to the online Java documentation.

The BaseWALObserver Class

The BaseWALObserver class implements the WALObserver interface. This is mainly done to help along with a pending (as of this writing, for HBase 1.0) deprecation process of other variants of the same callback methods. You can use this class to implement your own, or implement the interface directly.

The BulkLoadObserver Class

This observer class is used during bulk loading operations, as triggered by the HBase supplied completebulkload tool, contained in the server JAR file. Using the Hadoop JAR support, you can see the list of tools like so:

$ bin/hadoop jar /usr/local/hbase-1.0.0-bin/lib/hbase-server-1.0.0.jar
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters.
    WARNING: It doesn't work for incrementColumnValues'd cells since the
    timestamp is changed after being appended to the log.

Once the completebulkload tool is run, it will attempt to move all staged bulk load files into place (more on this in Chapter 11, so for now please bear with me). During that operation the available callbacks are triggered:

prePrepareBulkLoad(...)

Invoked before the bulk load operation takes place.

preCleanupBulkLoad(...)

Called when the bulk load is complete and clean up tasks are performed.

Neither callback can skip the default processing using the bypass flag. They are merely invoked, but their actions have no effect on the further bulk loading process. The observer does not have its own environment; instead it uses the RegionCoprocessorEnvironment explained in “The RegionCoprocessorEnvironment Class”.

The EndPointObserver Class

The final observer is equally manageable, since it does not employ its own environment, but also shares the RegionCoprocessorEnvironment (see “The RegionCoprocessorEnvironment Class”). This makes sense, because endpoints run in the context of a region. The available callback methods are:

preEndpointInvocation(...), postEndpointInvocation(...)

Whenever an endpoint method is called upon from a client, these callbacks wrap the server side execution.

The coprocessor can replace (for the pre hook) or modify (for the post hook, using the provided Message.Builder instance) the given Message instance to change the outcome of the endpoint method. If an exception is thrown during the pre hook, the server-side call is aborted completely.

1 The various filter methods are discussed in “Custom Filters”.

2 See Table 4-9 for an overview of compatible filters.

3 For users of older, pre-Protocol Buffer based HBase, please see “Migrate Custom Filters to post HBase 0.96” for a migration guide.

4 This was changed in the final 0.92 release (after the book went into print) from enums to constants in HBASE-4048.

5 The use of HTableInterface is an API remnant from before HBase 1.0. For HBase 2.0 and later this is changed to the proper Table in HBASE-12586.

6 As of this writing, there is an error thrown when using null keys. See HBASE-13417 for details.

7 See the RegionServer documentation.

8 Sometimes inconsistently named "c" instead.

9 The Java HBase classes are documented online.

10 The Java HBase classes are documented online.

11 As of this writing, there are discussions to remove—or at least disable—this functionality in future releases. See HBASE-11165 for details.
