Use coprocessors in HBase

Coprocessors overview

The HBase coprocessor framework is an extension mechanism that lets you perform distributed computations by executing user code directly where the data is stored.

Observer coprocessor

An observer coprocessor (or observer) provides hooks that let you monitor and react to table operations requested by clients. These operations include get, put, scan, delete, and other requests. HBase provides three kinds of observers:

  • RegionObserver — provides hooks for data manipulation requests like get, put, scan, delete, and others. An instance of RegionObserver is allocated for every table region. The scope of each instance is restricted to its region.

  • WALObserver — provides hooks for the operations of the write ahead log (WAL). WAL writing and reconstruction events can be observed or intercepted by the WALObserver. There is one WAL processing context per region server, with one WALObserver residing in it.

  • MasterObserver — provides hooks for the operations related to data definition like create table, delete table, modify table, and others. A MasterObserver resides in the HBase master context.

An observer can observe and intervene in the actions that a client performs on a region. The following hook functions are available to a region observer.

Observer functions

Function | Description
preGet, postGet | Called before and after the client makes a get request, respectively
preExists, postExists | Called before and after the client checks the existence of the data by using a get request, respectively
prePut, postPut | Called before and after the client writes the data by using a put request, respectively
preDelete, postDelete | Called before and after the client deletes the data by using a delete request, respectively
preOpen, postOpen | Called before and after the region is reported as online to the HBase master, respectively
preFlush, postFlush | Called before and after the memstore is flushed to a new store file, respectively
preScannerOpen, postScannerOpen | Called before and after the client opens a new scanner by using a scan request, respectively
preScannerNext, postScannerNext | Called before and after the client requests the next row in an opened scanner, respectively
preScannerClose, postScannerClose | Called before and after the client closes a scanner, respectively
preCheckAndPut, postCheckAndPut | Called before and after the client calls the checkAndPut() function, respectively
preCheckAndDelete, postCheckAndDelete | Called before and after the client calls the checkAndDelete() function, respectively

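Below is the code for a simple observer, written as a sketch against the HBase 2.x coprocessor API, where the hook that runs before a get request is named preGetOp. It checks the user information for each get request in that hook and throws an AccessDeniedException if the user is not allowed to access the resource, which in turn denies the client request. The isAccessAllowed() method is a placeholder for a real permission check.

package org.myname.hbase.coprocessor;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.security.AccessDeniedException;

public class AccessControlCoprocessor implements RegionCoprocessor, RegionObserver {
  // Expose this class to the region as its RegionObserver.
  @Override
  public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); }

  // Called on the region server before each get request is processed.
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
      Get get, List<Cell> result) throws IOException {
    if (!isAccessAllowed(get)) {
      throw new AccessDeniedException("User access denied");
    }
  }

  // Placeholder (hypothetical): replace with a real permission check.
  private boolean isAccessAllowed(Get get) { return true; }
}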
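
The control flow behind a pre/post hook pair can also be illustrated without an HBase cluster. The following is a toy model in plain Java, not the HBase API: ToyRegion, GetObserver, and AllowOnly are hypothetical names introduced only for this sketch. It shows that pre-hooks run before the read, post-hooks run after it, and an exception thrown from a pre-hook aborts the request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a region that calls observer hooks around a get; not the HBase API. */
final class ToyRegion {
  /** Hypothetical hook interface, loosely modelled on the preGet/postGet pair. */
  interface GetObserver {
    void preGet(String user, String rowKey);                 // may throw to deny the request
    void postGet(String user, String rowKey, String value);  // sees the result of the read
  }

  private final Map<String, String> rows = new TreeMap<>();
  private final List<GetObserver> observers = new ArrayList<>();

  void put(String rowKey, String value) { rows.put(rowKey, value); }

  void addObserver(GetObserver observer) { observers.add(observer); }

  /** Runs every pre-hook, then the read itself, then every post-hook. */
  String get(String user, String rowKey) {
    for (GetObserver o : observers) {
      o.preGet(user, rowKey);
    }
    String value = rows.get(rowKey);
    for (GetObserver o : observers) {
      o.postGet(user, rowKey, value);
    }
    return value;
  }
}

/** Observer that rejects every user except one and records the reads that completed. */
final class AllowOnly implements ToyRegion.GetObserver {
  private final String allowedUser;
  final List<String> audit = new ArrayList<>();

  AllowOnly(String allowedUser) { this.allowedUser = allowedUser; }

  @Override
  public void preGet(String user, String rowKey) {
    if (!allowedUser.equals(user)) {
      throw new SecurityException("User access denied");
    }
  }

  @Override
  public void postGet(String user, String rowKey, String value) {
    audit.add(user + ":" + rowKey);
  }
}
```

With an AllowOnly("alice") observer registered, a get issued by alice returns the stored value and is recorded in the audit list, while a get issued by any other user fails in preGet before the row is read.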

Endpoint coprocessor

An endpoint coprocessor (or endpoint) lets you run custom code on a region when a client triggers it. For example, you can perform column aggregation on any given region server.

An endpoint allows you to define a custom dynamic RPC protocol for communication between the client and the region server. For example, you can create a method with custom request parameters and return types. A client triggers the RPC methods exposed by an endpoint by calling client-side dynamic RPC functions.
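
The column aggregation use case follows a scatter/gather pattern: each region computes a partial result next to its data, and the client merges the partial results. The sketch below models that pattern in plain Java only. It does not use the HBase endpoint or RPC API, and SumEndpointSketch and Region are hypothetical names introduced for illustration.

```java
import java.util.List;

/** Toy model of endpoint-style aggregation; plain Java, not the HBase API. */
final class SumEndpointSketch {
  /** Hypothetical stand-in for one region holding the values of a single column. */
  static final class Region {
    private final List<Long> columnValues;

    Region(List<Long> columnValues) { this.columnValues = columnValues; }

    /** "Server-side" step: runs next to the data and returns one partial sum. */
    long sum() {
      long total = 0;
      for (long value : columnValues) {
        total += value;
      }
      return total;
    }
  }

  /** "Client-side" step: calls every region and merges the partial sums. */
  static long sumAcrossRegions(List<Region> regions) {
    long total = 0;
    for (Region region : regions) {
      total += region.sum();
    }
    return total;
  }
}
```

Only one number per region crosses the boundary between the two steps, which is the reason to aggregate on the server side instead of scanning all rows from the client.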

Coprocessor loading and unloading

A coprocessor must be loaded before HBase can use it. Static loading is done through the HBase configuration, and dynamic loading is done using the HBase shell or the Java API.

Static coprocessor loading

To statically load a coprocessor, do the following:

  1. Place the JAR file with the coprocessor code into the lib/ directory of the HBase installation path on all cluster hosts where the coprocessor is required.

  2. Go to ADCM UI and select your ADH cluster.

  3. Navigate to Services → HBase → Primary configuration and toggle Show advanced.

  4. Open the Custom hbase-site.xml section and click Add property.

  5. Specify one of the following names:

    • hbase.coprocessor.region.classes — for a RegionObserver or an endpoint;

    • hbase.coprocessor.wal.classes — for a WALObserver;

    • hbase.coprocessor.master.classes — for a MasterObserver.

  6. Specify the fully qualified class name of the coprocessor implementation class as the property value. Separate multiple class names with commas.

  7. Save the configuration by clicking Save → Create, then restart the service by clicking Actions → Reconfig and graceful restart.

Statically loaded coprocessors are active on all regions of all tables. The first coprocessor in the list is assigned the priority Coprocessor.Priority.SYSTEM, and each subsequent one gets a priority value incremented by one, in the order of declaration. To override the priority value, append the new value to the class name, separated by the pipe character. Example: org.myname.hbase.coprocessor.endpoint.SumEndPoint|21.
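
The property value format described above (comma-separated class names, each optionally followed by a pipe character and a priority) can be illustrated with a small parser. This is a minimal sketch in plain Java for checking a value before you enter it in the configuration. It is not HBase's own parsing code, and CoprocessorListParser and Entry are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

/** Parses a value such as "a.b.First,a.b.Second|21" into class names and optional priorities. */
final class CoprocessorListParser {
  /** One list entry; priority is empty when the entry carries no override. */
  static final class Entry {
    final String className;
    final OptionalInt priority;

    Entry(String className, OptionalInt priority) {
      this.className = className;
      this.priority = priority;
    }
  }

  static List<Entry> parse(String propertyValue) {
    List<Entry> entries = new ArrayList<>();
    for (String item : propertyValue.split(",")) {
      String trimmed = item.trim();
      if (trimmed.isEmpty()) {
        continue; // tolerate stray commas
      }
      int pipe = trimmed.indexOf('|');
      if (pipe < 0) {
        // No override: the priority is derived from the position in the list.
        entries.add(new Entry(trimmed, OptionalInt.empty()));
      } else {
        String name = trimmed.substring(0, pipe).trim();
        // Throws NumberFormatException if the override is not an integer.
        int priority = Integer.parseInt(trimmed.substring(pipe + 1).trim());
        entries.add(new Entry(name, OptionalInt.of(priority)));
      }
    }
    return entries;
  }
}
```

For the value org.myname.hbase.coprocessor.endpoint.SumEndPoint|21 the parser yields one entry with the class name and an explicit priority of 21. An entry without the pipe character keeps an empty priority.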

Static coprocessor unloading

To statically unload a coprocessor, do the following:

  1. In the Custom hbase-site.xml section, delete the property that was created for the coprocessor.

  2. Save the configuration by clicking Save → Create, then restart the service by clicking Actions → Reconfig and graceful restart.

  3. (Optional) Delete the JAR file from the lib/ directory of the HBase installation.

Dynamic coprocessor loading

To dynamically load a coprocessor, open the HBase shell and run a command of the following form:

alter '<table_name>', METHOD => 'table_att', 'coprocessor'=>'hdfs://<namenode>:<port>/<coprocessor.jar>|<class_name>|<priority>|<args>'

where:

  • <table_name> — the name of the table for which the coprocessor is being loaded;

  • <namenode>:<port> — the address of the HDFS NameNode for the filesystem that stores the coprocessor JAR file. To use the default filesystem from the cluster configuration, you can omit this part, keeping the triple slash: hdfs:///;

  • <coprocessor.jar> — full path to the JAR file on the filesystem;

  • <class_name> — full class name of the coprocessor;

  • <priority> — an integer representing the coprocessor priority;

  • <args> — arguments taken by the coprocessor implementation.

Example:

alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1037|arg1=1,arg2=2'

To verify that the coprocessor has loaded, use the describe command. The coprocessor info should appear in the output right after the table name.
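
The value of the coprocessor attribute is a single string with four fields separated by pipe characters. If you generate alter commands from a script or tool, a small helper such as the one below can assemble that string. It is a sketch in plain Java that only formats text and does not call HBase. CoprocessorSpec is a hypothetical name, and the JAR path used in the usage note is an example, not a path from your cluster.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Builds the "<jar path>|<class name>|<priority>|<args>" string used by the alter command. */
final class CoprocessorSpec {
  /** Arguments are written as comma-separated key=value pairs in map iteration order. */
  static String build(String jarPath, String className, int priority, Map<String, String> args) {
    StringJoiner argList = new StringJoiner(",");
    for (Map.Entry<String, String> arg : args.entrySet()) {
      argList.add(arg.getKey() + "=" + arg.getValue());
    }
    return jarPath + "|" + className + "|" + priority + "|" + argList;
  }
}
```

For example, calling build with the path hdfs:///user/hbase/coprocessor.jar, the class org.myname.hbase.Coprocessor.RegionObserverExample, the priority 1037, and the arguments arg1=1 and arg2=2 (in an insertion-ordered map such as LinkedHashMap) produces a string in the same shape as the value in the alter example.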

Dynamic coprocessor unloading

To dynamically unload a coprocessor, open the HBase shell and run a command of the following form:

alter '<table_name>', METHOD => 'table_att_unset', NAME => 'coprocessor$1'