HBase: How to scan salted tables with region specific key ranges in MapReduce

Author: Pengyu Wang


Salted HBase table with pre-split is a proven effective HBase solution to provide uniform workload distribution across region servers and prevent hot spots during bulk writes. In this design, a row key is made with a logical key plus salt at the beginning. One way of generating salt is by calculating n (number of regions) modulo on the hash code of the logical row key (date, etc).

Salting Row Keys

For example, a table accepting data load on a daily basis might use logical row keys starting with a date, and we want to pre-split this table into 1000 regions. In this case, we expect to generate 1000 different salts. The salt can be generated, for example, as:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

The output from hashCode() with modulo provides randomness for salt value from “000” to “999”. With this key transform, the table is pre-split on the salt boundaries as it’s created. This will make row volumes uniformly distributed while loading the HFiles with MapReduce bulkload. It guarantees that row keys with same salt fall into the same region.

In many use cases, like data archiving, you need to scan or copy the data over a particular logical key range (date range) using MapReduce job. Standard table MapReduce jobs are setup by providing the Scan instance with key range attributes.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job,
true,
TableInputFormat.class
);

However, setup of such a job becomes challenging for salted pre-splitted tables. Start and stop row keys will be different for each region because each has a unique salt. And we can’t specify multiple ranges to one Scan instance.

To solve this problem, we need to look into how Table MapReduce works. Generally, the MapReduce framework creates one map task to read and process each input split. Each split is generated in InputFormat class base, by method getSplits().

In HBase table MapReduce job, TableInputFormat is used as InputFormat. Inside the implementation, the getSplits() method is overridden to retrieve the start and stop row keys from the Scan instance. As the start and stop row keys span across multiple regions, the range is divided by region boundaries and returns the list of TableSplit objects which covers the scan key range. Instead of being based by HDFS block, TableSplits are based on region. By overwriting the getSplits() method, we are able to control the TableSplit.

Building Custom TableInputFormat

To change the behavior of the getSplits() method, a custom class extending TableInputFormat is required. The purpose of getSplits() here is to cover the logical key range in each region, construct their row key range with their unique salt. The HTable class provides method getStartEndKeys() which returns start and end row keys for each region. From each start key, parse the corresponding salt for the region.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}

}

Job Configuration Passes Logical Key Range

TableInputFormat retrieves the start and stop key from Scan instance. Since we cannot use Scan in our MapReduce job, we could use Configuration instead to pass these two variables and only logical start and stop key is good enough (a variable could be a date or other business information). The getSplits() method has JobContext argument, The configuration instance can be read as context.getConfiguration().

In MapReduce driver:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

In Custom TableInputFormat:

@Override
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");

}

Reconstruct the Salted Key Range by Region

Now that we have the salt and logical start/stop key for each region, we can rebuild the actual row key range.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Creating a TableSplit for Each Region

With row key range, we can now initialize TableSplit instance for the region.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

One more thing to look at is data locality. The framework uses location information in each input split to assign a map task in its local host. For our TableInputFormat, we use the method getTableRegionLocation() to retrieve the region location serving the row key.

This location is then passed to the TableSplit constructor. This will assure that the mapper processing the table split is on the same region server. One method, called DNS.reverseDns(), requires the address for the HBase name server. This attribute is stored in configuration "hbase.nameserver.address".

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);


public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

A complete code of getSplits will look like this:

@Override
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Use the Custom TableInputFormat in the MapReduce Driver

Now we need to replace the TableInputFormat class with the custom build we used for table MapReduce job setup.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job,
true,
MultiRangeTableInputFormat.class
);

The approach of custom TableInputFormat provides an efficient and scalable scan capability for HBase tables which are designed to use salt for a balanced data load. Since the scan can bypass any unrelated row keys, regardless of how big the table is, the scan’s complexity is limited only to the size of the target data. In most use cases, this can guarantee relatively consistent processing time as the table grows.