You’ve covered a lot of ground in understanding HBase and how to build applications effectively. We also looked at how to deploy HBase clusters in a fully distributed fashion, what kind of hardware to choose, the various distribution options, and how to configure the cluster. All this information is useful in enabling you to take your application and HBase into production. But there is one last piece of the puzzle left to be covered—operations. As a developer of the application, you wouldn’t be expected to operate the underlying HBase cluster when everything is in production and the machines are churning full speed. But in the initial part of your project’s HBase adoption, chances are that you’ll be playing an integral role in the operations and helping the ops team get up to speed with all the aspects of operating an HBase cluster in production successfully.
Operations is a broad topic. Our goal for this chapter is to touch on the basic operational concepts pertaining to HBase. This will enable you to successfully operate your cluster and have your application serve the end users that it was built to serve. To do this, we’ll start with covering the concepts of monitoring and metrics as they pertain to HBase. This will consist of the different ways you can monitor your HBase deployment and the metrics you need to monitor.
Monitoring is an important step, and once you have that in place, you’ll be in a good place to start thinking about performance testing your HBase cluster and your application. There’s no point making all that effort and taking a system to production if it can’t sustain the load of all the users who want to use it!
We’ll then cover common management and operations tasks that you’ll need during the course of operating a cluster. These include things like starting and stopping services, upgrades, and detecting and fixing inconsistencies. The last topic in the chapter pertains to backup and replication of HBase clusters. This is important for business-continuity purposes when disaster strikes.
This chapter covers topics that are relevant to the 0.92 release. Some of the recommendations may change with future releases, and we encourage you to look into those if you’re using a later release.
Without further ado, let’s jump right in.
A critical aspect of any production system is the ability of its operators to monitor its state and behavior. When issues happen, the last thing an operator wants to do is to sift through GBs and TBs of logs to make sense of the state of the system and the root cause of the issue. Not many people are champions at reading thousands of log lines across multiple servers to make sense of what’s going on. That’s where recording detailed metrics comes into play. Many things are happening in a production-quality database like HBase, and each of them can be measured in different ways. These measurements are exposed by the system and can be captured by external frameworks that are designed to record them and make them available to operators in a consumable fashion.
Operations is particularly hard in distributed systems because many more components are involved, in terms of both the different pieces that make up the system and the scale at which they operate.
Collecting and graphing metrics isn’t unique to HBase and can be found in any successful system, large or small scale. The way different systems implement this may differ, though. In this section, we’ll talk about how HBase exposes metrics and the frameworks that are available to you to capture these metrics and use them to make sense of how your cluster is performing. We’ll also talk about the metrics HBase exposes, what they mean, and how you can use them to alert you about issues when they happen.
We recommend that you set up your full metrics collection, graphing, and monitoring stack even in the prototyping stage of your HBase adoption. This will enable you to become familiar with the various aspects of operating HBase and will make the transition to production much smoother. Plus, it’s fun to see pretty graphs showing requests hitting the system when they do. It will also help you in the process of building your application because you’ll know more about what’s going on in the underlying system when your application interacts with it.
The metrics framework is another of the many ways that HBase depends on Hadoop. HBase is tightly integrated with Hadoop and uses Hadoop’s underlying metrics framework to expose its metrics. At the time of writing this manuscript, HBase was still using the metrics framework v1.[1] Efforts are underway to have HBase use the latest and greatest,[2] but that hasn’t been implemented yet.
1 Hadoop metrics framework v1, Apache Software Foundation, http://mng.bz/J92f.
2 Hadoop metrics framework v2, Apache Software Foundation, http://mng.bz/aOEI.
It isn’t necessary to delve deeply into how the metrics frameworks are implemented unless you want to get involved in the development of these frameworks. If that’s your intention, by all means dive right into the code. If you’re just interested in getting metrics out of HBase that you can use for your application, all you need to know is how to configure the framework and the ways it will expose the metrics, which we’ll talk about next.
The metrics framework works by outputting metrics based on a context implementation that implements the MetricsContext interface. A couple of implementations come out of the box that you can use: Ganglia context and File context. In addition to these contexts, HBase also exposes metrics using Java Management Extensions (JMX).[3]
3 Qusay H. Mahmoud, “Getting Started with Java Management Extensions (JMX): Developing Management and Monitoring Solutions,” Oracle Sun Developer Network, January 6, 2004, http://mng.bz/619L.
Metrics solutions involve two aspects: collection and graphing. Typically these are both built into the same framework, but that’s not a requirement. Collection frameworks collect the metrics being generated by the system that is being monitored and store them efficiently so they can be used later. These frameworks also do things like roll-ups on a daily, monthly, or yearly basis. For the most part, granular metrics that are a year old aren’t as useful as a yearly summary of the same metrics.
Graphing tools use the data captured and stored by collection frameworks and make it easily consumable for the end user in the form of graphs and pretty pictures. These graphs are what the operator looks at to quickly get insight into the status of the system. Add to these graphs things like thresholds, and you can easily find out if the system isn’t performing in the expected range of operation. And based on these, you can take actions to prevent the end application from being impacted when Murphy strikes.[4]
4 You’ve certainly heard of Murphy’s law: http://en.wikipedia.org/wiki/Murphy’s_law.
Numerous collection and graphing tools are available. But not all of them are tightly integrated with how Hadoop and HBase expose metrics. You’re limited to Ganglia (which has native support from the Hadoop metrics framework) or to frameworks that can collect metrics via JMX.
Ganglia (http://ganglia.sourceforge.net/)[5] is a distributed monitoring framework designed to monitor clusters. It was developed at UC Berkeley and open-sourced. The Hadoop and HBase communities have been using it as the de facto solution to monitor clusters.
5 Monitoring with Ganglia, by Matt Massie et al., is expected to release in November 2012 and will be a handy resource for all things monitoring and Ganglia. See http://mng.bz/Pzw8.
To configure HBase to output metrics to Ganglia, you have to set the parameters in the hadoop-metrics.properties file, which resides in the $HBASE_HOME/conf/ directory. The context you’ll configure depends on the version of Ganglia you choose to use. For versions older than 3.1, the GangliaContext should be used. For 3.1 and newer, GangliaContext31 should be used. The hadoop-metrics.properties file configured for Ganglia 3.1 or later looks like the following:
hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
hbase.period=10
hbase.servers=GMETADHOST_IP:PORT
jvm.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
jvm.period=10
jvm.servers=GMETADHOST_IP:PORT
rpc.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
rpc.period=10
rpc.servers=GMETADHOST_IP:PORT
Once you have Ganglia set up and the HBase daemons started with these configuration properties, the metrics list in Ganglia will show metrics being spewed out by HBase, as shown in figure 10.1.
Apart from exposing metrics using the Hadoop metrics framework, HBase also exposes metrics via JMX. Several open source tools such as Cacti and OpenTSDB can be used to collect metrics via JMX. JMX metrics can also be viewed as JSON from the Master and RegionServer web UI:
The default port for the Master is 60010 and for the RegionServer is 60030.
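As a minimal sketch of collecting these metrics programmatically, the following Python snippet reads the JSON served by the web UI and indexes the beans by name. The /jmx path, the host name in the usage note, and the bean and attribute names in the sample payload are assumptions made for illustration; check what your release actually serves before relying on them.

```python
import json
from urllib.request import urlopen


def parse_jmx(payload):
    """Index the beans in a JMX JSON document by bean name."""
    doc = json.loads(payload)
    return {bean["name"]: bean for bean in doc.get("beans", [])}


def fetch_jmx(host, port, path="/jmx", timeout=5):
    """Fetch and parse JMX JSON from a Master or RegionServer web UI.

    The "/jmx" path is an assumption; adjust it to match your release.
    """
    with urlopen(f"http://{host}:{port}{path}", timeout=timeout) as resp:
        return parse_jmx(resp.read().decode("utf-8"))


# Hypothetical payload: a list of beans, each with flat attributes.
SAMPLE = """
{"beans": [
  {"name": "example:service=RegionServer,name=Statistics",
   "requests": 125.0, "regions": 42}
]}
"""

beans = parse_jmx(SAMPLE)
stats = beans["example:service=RegionServer,name=Statistics"]
print(stats["regions"])
```

Against a live RegionServer on the default port, the call would look like fetch_jmx("regionserver-host", 60030), where the host name is a placeholder.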
HBase can also be configured to output metrics into a flat file. Every time a metric is to be output, it’s appended to that file. This can be done with or without timestamps, depending on the context. File-based metrics aren’t a useful way of recording metrics because they’re hard to consume thereafter. Although we haven’t come across any production system where metrics are recorded into files for active monitoring purposes, it’s still an option for recording metrics for later analysis.
To enable metrics logging to files, the hadoop-metrics.properties file looks like this:
hbase.class=org.apache.hadoop.hbase.metrics.file.TimeStampingFileContext
hbase.period=10
hbase.fileName=/tmp/metrics_hbase.log
jvm.class=org.apache.hadoop.hbase.metrics.file.TimeStampingFileContext
jvm.period=10
jvm.fileName=/tmp/metrics_jvm.log
rpc.class=org.apache.hadoop.hbase.metrics.file.TimeStampingFileContext
rpc.period=10
rpc.fileName=/tmp/metrics_rpc.log
Let’s look at the metrics that HBase exposes that you can use to get insights into the health and performance of your cluster.
The Master and RegionServers expose metrics. You don’t need to look at the HBase code to understand these, but if you’re curious and want to learn about how they’re reported and the inner workings of the metrics framework, we encourage you to browse through the code. Getting your hands dirty with the code never hurts.
The metrics of interest depend on the workload the cluster is sustaining, and we’ll categorize them accordingly. First we’ll cover the general metrics that are relevant regardless of the workload, and then we’ll look at metrics that are relevant to writes and reads independently.
Metrics related to the system load, network statistics, RPCs, alive regions, JVM heap, and JVM threads are of interest regardless of the kind of workload being run; they can be used to explain the system’s behavior. The Master UI shows the heap usage and the requests per second being served by the RegionServers (figure 10.2).
HBase metrics are important, but so are the metrics from dependency systems—HDFS, underlying OS, hardware, and the network. Often the root cause for behavior that is out of the normal range lies in the way the underlying systems are functioning. Issues there typically result in a cascading effect on the rest of the stack and end up impacting the client. The client either doesn’t perform properly or fails due to unexpected behavior. This is even more pronounced in distributed systems that have more components that can fail and more dependencies. Covering detailed metrics and monitoring for all dependencies is beyond the scope of this book, but plenty of resources exist that you can use to study those.[6]
6 Hadoop Operations, by Eric Sammer, is a good resource for all things related to operating Hadoop in production. It’s expected to release in fall 2012. See http://mng.bz/iO24.
The important bits that you absolutely need to monitor are as follows:
System- and network-level information can be seen from Ganglia (figure 10.3) and from several Linux tools such as lsof, top, iostat, netstat, and so on. These are handy tools to learn if you’re administering HBase.
One interesting metric to keep an eye on is the CPU I/O wait percentage. This indicates the amount of time the CPU spends waiting for disk I/O and is a good indicator of whether your system is I/O bound. If it is I/O bound, you need more disks in almost all cases. Ganglia graphs for CPU I/O wait percentage from a cluster running a heavy write workload are shown in figure 10.4. This metric is useful when the read I/O is high as well.
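To make the I/O wait percentage concrete, here is a minimal sketch of how it can be derived on Linux from two samples of the aggregate cpu line in /proc/stat. The sample values are made up, and the function names are hypothetical; tools such as top and iostat report the same figure without any scripting.

```python
def parse_cpu_line(line):
    """Return the numeric fields of the aggregate 'cpu' line in /proc/stat."""
    parts = line.split()
    if not parts or parts[0] != "cpu":
        raise ValueError("expected the aggregate 'cpu' line")
    return [int(value) for value in parts[1:]]


def iowait_percent(before, after):
    """Percentage of CPU time spent waiting on I/O between two samples.

    Field order on Linux: user, nice, system, idle, iowait, irq, softirq,
    steal. Guest fields are skipped because they are already part of user.
    """
    first, second = parse_cpu_line(before), parse_cpu_line(after)
    deltas = [b - a for a, b in zip(first[:8], second[:8])]
    total = sum(deltas)
    return 100.0 * deltas[4] / total if total else 0.0


# Made-up samples taken some seconds apart.
before = "cpu 1000 0 500 8000 400 0 100 0 0 0"
after = "cpu 1100 0 550 8500 700 0 150 0 0 0"
pct = iowait_percent(before, after)
print(f"iowait: {pct:.1f}%")
```

A sustained high value from a calculation like this, on a node running a RegionServer and a DataNode, is the signal the paragraph above describes: the CPUs are idle because the disks can’t keep up.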
We’ve talked about some of the generic metrics that are of interest in a running cluster. We’ll now go into write- and read-specific metrics.
To understand the system state during writes, the metrics of interest are the ones that are collected as data is written into the system. This translates into metrics related to MemStore, flushes, compactions, garbage collection, and HDFS I/O.
During writes, the ideal MemStore metrics graph should look like saw teeth. That indicates smooth flushing of the MemStore and predictable garbage collection overhead. Figure 10.5 shows the MemStore size metrics from Ganglia during heavy writes.
To understand HDFS write latencies, the fsWriteLatency and fsSyncLatency metrics are useful. The write-latency metric includes the latency while writing HFiles as well as the WAL. The sync-latency metrics are only for the WALs.
Rising write latencies typically also cause the compaction queues to grow (figure 10.6).
Garbage-collection metrics are exposed by the JVM through the metric context; you can find them in Ganglia as jvm.metrics.gc*. Another useful way of finding out what’s going on with garbage collection is to enable garbage-collection logging by putting the -Xloggc:/my/logs/directory/hbase-regionserver-gc.log flag in the Java options (in hbase-env.sh) for the RegionServers. This is useful information when dealing with unresponsive RegionServer processes during heavy writes. A common cause for that is long garbage-collection pauses, which typically means garbage collection isn’t tuned properly.
Reads are different from writes, and so are the metrics you should monitor to understand them. During reads, the metrics of interest relate primarily to the block cache, apart from the general metrics that we covered initially. The block-cache metrics for cache hits, evictions, and cache size are useful in understanding the read performance; you can tune your cache and table properties accordingly. Figure 10.7 shows cache-size metrics during a read-heavy workload.
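One way to turn cache-hit counters into something actionable is to compute the hit ratio over a recent interval rather than since startup. The sketch below assumes that hits and misses are exposed as cumulative counters and that you sample them periodically; the dictionary keys are hypothetical names, not the names of the metrics HBase reports.

```python
def interval_hit_ratio(prev, curr):
    """Block-cache hit ratio over the interval between two counter samples.

    Each sample is a dict with cumulative 'hits' and 'misses' counters.
    Returns None when no lookups happened during the interval.
    """
    hits = curr["hits"] - prev["hits"]
    misses = curr["misses"] - prev["misses"]
    lookups = hits + misses
    return hits / lookups if lookups else None


# Made-up samples taken one collection period apart.
earlier = {"hits": 1000, "misses": 500}
later = {"hits": 1900, "misses": 600}
ratio = interval_hit_ratio(earlier, later)
print(f"hit ratio over the interval: {ratio:.2f}")
```

A ratio computed since startup moves slowly and can hide a recent change in access pattern, which is why the sketch works on deltas.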
Tools monitoring HBase may be giving you great-looking graphs, and everything at the system level may be running stably. But that doesn’t mean your entire application stack is running well. In a production environment, we recommend that you add to the system-level monitoring that Ganglia and other tools provide and also monitor how HBase looks from your application’s perspective. This is likely to be a custom implementation based on how your application is using HBase. The HBase community has not yet come up with templates for doing this, but that may change over time. You could well contribute to that initiative.
The following can be useful while monitoring HBase as seen by the application:
Checks like these enable you to keep track of your application’s view of HBase, and you can correlate that with HBase-level metrics to better understand the application’s behavior. This is a solution for which you’ll have to work with your system administrators and operations team. Invest time and effort in it. It will benefit you in the long run when operating your application in production.
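As one possible starting point for such application-side checks, the following sketch wraps client calls, records how long each takes, and reports latency percentiles. It is an illustration under stated assumptions: LatencyRecorder and fake_get are hypothetical names, and fake_get stands in for a real call such as a Get issued through your HBase client.

```python
import math
import time


class LatencyRecorder:
    """Collects per-operation latencies as seen by the application."""

    def __init__(self):
        self.samples_ms = {}

    def timed(self, op_name, fn, *args, **kwargs):
        """Run fn and record how long it took, even if it raises."""
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            self.samples_ms.setdefault(op_name, []).append(elapsed_ms)

    def percentile(self, op_name, pct):
        """Nearest-rank percentile of the recorded latencies, in ms."""
        data = sorted(self.samples_ms[op_name])
        rank = max(1, math.ceil(pct / 100.0 * len(data)))
        return data[rank - 1]


def fake_get(row_key):
    """Stand-in for a real client call such as a Get against HBase."""
    return {"row": row_key}


recorder = LatencyRecorder()
for i in range(100):
    recorder.timed("get", fake_get, f"row-{i}")
p99 = recorder.percentile("get", 99)
print(f"p99 get latency: {p99:.3f} ms")
```

Publishing numbers like these to the same graphing system that holds the HBase metrics makes it easier to line up an application-level latency spike with, say, a compaction or a long garbage-collection pause.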
Performance of any database is measured in terms of the response times of the operations that it supports. This is important to measure in the context of your application so you can set the right expectations for users. For instance, a user of an application backed by an HBase cluster shouldn’t have to wait for tens of seconds to get a response when they click a button. Ideally, it should happen in milliseconds. Of course, this isn’t a general rule and will depend a lot on the type of interaction the user is engaged in.
To make sure your HBase cluster is performing within the expected SLAs, you must test performance thoroughly and tune the cluster to extract the maximum performance you can get out of it. This section will cover the various ways you can test the performance of your cluster and then will look at what impacts the performance. From there, we’ll cover the various knobs that are available to you to tune the system.
There are different ways you can test the performance of your HBase cluster. The best way is to put it under a real workload that emulates what your application is likely to see in production. But it’s not always possible to test with real workloads without launching a beta version of the application where a select few users interact with it. Ideally, you’ll want to do some level of testing before that so you can be confident of the performance to some degree. You can use a couple of options to achieve that.
Having a monitoring framework in place before testing the performance of your cluster is useful. Install it! You’ll be able to get much more insight into the system’s behavior with it than without it.
HBase ships with a tool called PerformanceEvaluation, which you can use to evaluate the performance of your HBase cluster in terms of various operations. It’s based on the performance-evaluation tool described in the original Bigtable paper by Google. To get its usage details, you can run the tool without any arguments:
$ $HBASE_HOME/bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation
Usage: java org.apache.hadoop.hbase.PerformanceEvaluation [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>

Options:
 miniCluster      Run the test on an HBaseMiniCluster
 nomapred         Run multiple clients using threads (rather than use mapreduce)
 rows             Rows each client runs. Default: One million
 flushCommits     Used to determine if the test should flush the table. Default: false
 writeToWAL       Set writeToWAL on puts. Default: True

Command:
 filterScan       Run scan test using a filter to find a specific row based on its value (make sure to use --rows=20)
 randomRead       Run random read test
 randomSeekScan   Run random seek and scan 100 test
 randomWrite      Run random write test
 scan             Run scan test (read every row)
 scanRange10      Run random seek scan with both start and stop row (max 10 rows)
 scanRange100     Run random seek scan with both start and stop row (max 100 rows)
 scanRange1000    Run random seek scan with both start and stop row (max 1000 rows)
 scanRange10000   Run random seek scan with both start and stop row (max 10000 rows)
 sequentialRead   Run sequential read test
 sequentialWrite  Run sequential write test

Args:
 nclients         Integer. Required. Total number of clients (and HRegionServers) running: 1 <= value <= 500

Examples:
 To run a single evaluation client:
 $ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1
As you can see from the usage details, you can run all kinds of tests using this tool. They all run as MapReduce jobs unless you set the number of clients to 1, in which case they run as a single-threaded client. You can configure the number of rows to be written/read per client and the number of clients. Run the sequentialWrite or the randomWrite commands first so they create a table and put some data in it. That table and data can thereafter be used for read tests like randomRead, scan, and sequentialRead. The tool doesn’t need you to create a table manually; it does that on its own when you run the commands to write data into HBase.
If you care about random reads and writes only, you can run this tool from anywhere outside the cluster as long as the HBase JARs and configs are deployed there. MapReduce jobs will run from wherever the MapReduce framework is installed, which ideally shouldn’t be collocated with the HBase cluster (as we talked about previously).
A sample run of this tool looks like this:
$ hbase org.apache.hadoop.hbase.PerformanceEvaluation --rows=10 sequentialWrite 1
12/06/18 15:59:29 WARN conf.Configuration: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
12/06/18 15:59:29 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.3-cdh4.0.0--1, built on 06/04/2012 23:16 GMT
...
...
...
12/06/18 15:59:29 INFO hbase.PerformanceEvaluation: 0/9/10
12/06/18 15:59:29 INFO hbase.PerformanceEvaluation: Finished class org.apache.hadoop.hbase.PerformanceEvaluation$SequentialWriteTest in 14ms at offset 0 for 10 rows
This run wrote 10 rows sequentially from a single thread and took 14 ms to do so.
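When you run many of these tests, it helps to pull the numbers out of the log automatically. The following sketch parses a summary line of the form shown in the sample run and derives a rows-per-second figure from it; the regular expression is written against that one line and is an assumption about the format, not a stable contract.

```python
import re

# Matches the summary line printed at the end of the sample run.
FINISHED = re.compile(
    r"Finished class \S+ in (\d+)ms at offset \d+ for (\d+) rows"
)


def summarize(line):
    """Return (rows, elapsed_ms, rows_per_second) from a summary log line."""
    match = FINISHED.search(line)
    if match is None:
        raise ValueError("not a PerformanceEvaluation summary line")
    elapsed_ms, rows = int(match.group(1)), int(match.group(2))
    rate = rows / (elapsed_ms / 1000.0) if elapsed_ms else float("inf")
    return rows, elapsed_ms, rate


line = (
    "12/06/18 15:59:29 INFO hbase.PerformanceEvaluation: Finished class "
    "org.apache.hadoop.hbase.PerformanceEvaluation$SequentialWriteTest "
    "in 14ms at offset 0 for 10 rows"
)
rows, elapsed_ms, rate = summarize(line)
print(f"{rows} rows in {elapsed_ms} ms, about {rate:.0f} rows/s")
```

For the sample run this works out to roughly 714 rows per second, or 1.4 ms per row. Ten rows is far too small a sample to draw conclusions from; use row counts large enough that startup overhead stops dominating the result.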
The limitation of this testing utility is that you can’t run mixed workloads without coding it up yourself. The test has to be one of the bundled ones, and they have to be run individually as separate runs. If your workload consists of Scans and Gets and Puts happening at the same time, this tool doesn’t give you the ability to truly test your cluster by mixing it all up. That brings us to our next testing utility.
In chapter 1, we talked about NoSQL systems that were developed at various companies to solve their data-management problems. This led to flame wars and bake-offs about who was better than whom. Although it was fun to watch, it also made things unclear when it came to comparing the performance of the different systems. That’s a hard task to do in general because these systems are designed for different use cases and with different trade-offs. But we need a standardized way of comparing them, and the industry still lacks that.
Yahoo! funded research to come up with a standard performance-testing tool that could be used to compare different databases. The company called it Yahoo! Cloud Serving Benchmark (YCSB). YCSB is the closest we have come to having a standard benchmarking tool that can be used to measure and compare the performance of different distributed databases. Although YCSB is built for comparing systems, you can use it to test the performance of any of the databases it supports, including HBase. YCSB consists of the YCSB client, which is an extensible workload generator, and the core workloads, which are a set of workloads that comes prepackaged and can be generated by the YCSB client.
YCSB is available from the project’s GitHub repository (http://github.com/brianfrankcooper/YCSB/). You have to compile it using Maven.
To start, clone the Git repository:
$ git clone git://github.com/brianfrankcooper/YCSB.git
Cloning into YCSB...
...
...
Resolving deltas: 100% (906/906), done.
Once cloned, compile the code:
$ cd YCSB
$ mvn -DskipTests package
Once YCSB is compiled, put your HBase cluster’s configuration in hbase/src/main/conf/hbase-site.xml. You only need to put the hbase.zookeeper.quorum property in the config file so YCSB can use it as the entry point for the cluster. Now you’re ready to run workloads to test your cluster. YCSB comes with a few sample workloads that you can find in the workloads directory. We’ll use one of those for this example, but you can create your own workloads based on what you want to test from your cluster.
Before running the workload, you need to create the HBase table YCSB will write to. You can do that from the shell:
hbase(main):002:0> create 'mytable', 'myfamily'
After that, you’re ready to test your cluster:
$ bin/ycsb load hbase -P workloads/workloada -p columnfamily=myfamily -p table=mytable
You can do all sorts of fancy stuff with YCSB workloads, including configuring multiple clients, configuring multiple threads, and running mixed workloads with different statistical distributions of the data.
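To give a feel for what a mixed workload definition involves, here is a small sketch that renders a workload file with a chosen mix of reads, updates, scans, and inserts. The property names and the workload class are written as we recall them from the bundled sample workloads and should be treated as assumptions; compare the output with the files in the workloads directory of your checkout before using it. The function name and the mix itself are hypothetical.

```python
def render_workload(recordcount, operationcount, read, update, scan, insert,
                    distribution="zipfian"):
    """Render a YCSB core-workload properties file as text.

    The four proportions describe the operation mix and must sum to 1.
    """
    mix = {
        "readproportion": read,
        "updateproportion": update,
        "scanproportion": scan,
        "insertproportion": insert,
    }
    if abs(sum(mix.values()) - 1.0) > 1e-9:
        raise ValueError("operation proportions must sum to 1")
    lines = [
        f"recordcount={recordcount}",
        f"operationcount={operationcount}",
        # Class name as recalled from the sample workloads (assumption).
        "workload=com.yahoo.ycsb.workloads.CoreWorkload",
    ]
    lines += [f"{key}={value}" for key, value in mix.items()]
    lines.append(f"requestdistribution={distribution}")
    return "\n".join(lines) + "\n"


# Hypothetical mix: 60% reads, 30% updates, 10% scans over 1 million records.
text = render_workload(1000000, 500000,
                       read=0.6, update=0.3, scan=0.1, insert=0.0)
print(text)
```

Saved to a file, a definition like this would be passed to the client with the -P option, in the same way as workloads/workloada in the command above.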
You now know a couple of ways to test the performance of your HBase cluster; you’ll likely do this testing before taking the cluster to production. There may be areas in which you can improve the performance of the cluster. To understand that, it’s important to be familiar with all the factors that impact HBase’s performance.[8]
8 Yahoo! Cloud Serving Benchmark: http://research.yahoo.com/Web_Information_Management/YCSB.
HBase is a distributed database and is tightly coupled with Hadoop. That makes it susceptible to the entire stack under it (figure 10.8) when it comes to performance.
Performance is affected by everything from the underlying hardware that makes up the boxes in the cluster to the network connecting them to the OS (specifically the file system) to the JVM to HDFS. The state of the HBase system matters too. For instance, performance is different during a compaction or during MemStore flushes compared to when nothing is going on in the cluster. Your application’s performance depends on how it interacts with HBase, and your schema design plays an integral role as much as anything else.
When looking at HBase performance, all of these factors matter; and when you tune your cluster, you need to look into all of them. Going into tuning each of those layers is beyond the scope of this text. We covered JVM tuning (garbage collection specifically) in chapter 9. We’ll discuss some key aspects of tuning your HBase cluster next.
Tuning an HBase cluster to extract maximum performance involves tuning all dependencies. There’s not a lot you need to do with things like the hardware and OS if you choose them wisely and install them correctly, based on the best practices outlined by the HBase community and highlighted by us in chapter 9. We’ll touch on them here as well. We recommend working with your system administrators on these to make sure you get them right.
We’ll start with the most basic building block of your HBase cluster—the hardware. Make sure to choose the hardware based on the recommendations we provided in chapter 9. We won’t repeat all the recommendations here. But to sum it up, get enough disks and RAM, but don’t go overboard shopping for the state of the art. Buy commodity, but choose quantity over quality. Scaling out pays off much better in the case of Hadoop and HBase clusters.
Any self-respecting distributed system based on current-generation hardware is network bound. HBase is no different. 10GbE networks between the nodes and the top-of-rack (TOR) switches are recommended. Avoid heavily oversubscribing the network, or you’ll see a performance impact during high load.
Linux has been the choice of OS as far as Hadoop and HBase systems go. There have been successful deployments on both Red Hat-based (Red Hat Enterprise Linux [RHEL], CentOS) and Debian-based (Ubuntu and so on) flavors of Linux. Choose the one that you have good support for.
Local Linux file systems play an important role in the stack and impact the performance of HBase significantly. Although Ext4 is the recommended file system, Ext3 and XFS have been successfully used in production systems as well. Tune the file systems based on our recommendations in chapter 9.
HDFS performance is key for a well-performing HBase cluster. There’s not a lot to tune if you have the underlying network, disks, and local file system configured correctly.
The one additional configuration you may consider is short-circuiting local client reads. This feature is new with Hadoop 1.0 and allows an HDFS client to read blocks directly from the local file system when possible. This feature is particularly relevant to both read-heavy and mixed workloads. Enable it by setting dfs.client.read.shortcircuit to true in hdfs-site.xml. All you need beyond that is to tune the data xcievers, which we highlighted in chapter 9.
Tuning an HBase cluster typically involves tuning multiple different configuration parameters to suit the workload that you plan to put on the cluster. Do this as you do performance testing on the cluster, and use the configurations mentioned in chapter 9 to get your combination right. No out-of-the-box recipes are available to configure HBase for certain workloads, but you can attempt to categorize them as one of the following:
Each of these workloads demands a different kind of configuration tuning, and we recommend that you experiment to figure out the best combination for you. Here are a few guidelines for you to work with when trying to tune your cluster based on the categories mentioned.
For random-read-heavy workloads, effective use of the cache and better indexing will get you higher performance. Pay attention to the configuration parameters listed in table 10.1.
For sequential-read-heavy workloads, the read cache doesn’t buy you a lot; chances are you’ll be hitting the disk more often than not unless the sequential reads are small in size and are limited to a particular key range. Pay attention to the configuration parameters in table 10.2.
Configuration parameter | Description | Recommendation |
---|---|---|
HFile block size | This is the parameter you set as a part of the column family configuration for a given table like this: hbase(main):002:0> create 'mytable', {NAME => 'colfam1', BLOCKSIZE => '65536'} | Higher block size gives you more data read per disk seek. 64 KB is a good place to start, but you should test with higher values to see if performance improves. Very high values compromise performance in finding the start key for your scans. |
hbase.client.scanner.caching | This defines the number of rows that will be fetched when the next method is called on a scanner. The higher the number, the fewer remote calls the client needs to make to the RegionServer during scans. A higher number also means more memory consumption at the client side. This can be set on a per-client basis in the configuration object as well. | Set a higher scanner-caching value so the scanner gets more rows back per RPC request while doing large sequential reads. The default value is 1. Increase it to a slightly higher number than what you expect to read in every scan iteration. Depending on your application logic and the size of the rows returned over the wire, potential values could be 50 or 1000. You can also set this on a per-Scan instance basis using the Scan.setCaching(int) method. |
Disable caching blocks through scanner API Scan.setCacheBlocks(..) | This setting defines whether the blocks being scanned should be put into the BlockCache. | Loading all blocks read by a scanner into the BlockCache causes a lot of cache churn. For large scans, you can disable the caching of blocks by setting this value to false. |
Disable caching on table | Column families can be configured to not be cached into the block cache at read time like this: hbase(main):002:0> create 'mytable', {NAME => 'colfam1', BLOCKCACHE => 'false'} | If the table is primarily accessed using large scans, the cache most likely won’t buy you much performance. Instead you’ll be churning the cache and impacting other tables that are being accessed for smaller random reads. You can disable the BlockCache so it doesn’t churn the cache on every scan. |
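The effect of scanner caching on the number of remote calls is simple arithmetic, sketched below. This is an approximation that counts only the calls that return rows; a real scan also issues calls to open and close the scanner, which the sketch ignores. The function name is hypothetical.

```python
def scan_rpcs(rows, caching):
    """Approximate number of row-fetching RPCs for a scan.

    Each RPC returns up to `caching` rows, so the count is the ceiling of
    rows / caching. Scanner open and close calls are not included.
    """
    if rows < 0 or caching < 1:
        raise ValueError("rows must be >= 0 and caching must be >= 1")
    return -(-rows // caching)  # integer ceiling division


for caching in (1, 50, 1000):
    print(f"caching={caching}: about {scan_rpcs(10000, caching)} RPCs")
```

For a scan of 10,000 rows, moving from the default of 1 to a caching value of 1000 drops the estimate from 10,000 round trips to 10, at the cost of holding up to 1,000 rows in client memory at a time.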
Write-heavy workloads need different tuning than read-heavy ones. The cache doesn’t play an important role anymore. Writes always go into the MemStore and are flushed to form new HFiles, which later are compacted. The way to get good write performance is by not flushing, compacting, or splitting too often because the I/O load goes up during that time, slowing the system. The configuration parameters in table 10.3 are of interest while tuning for a write-heavy workload.
Configuration parameter | Description | Recommendation |
---|---|---|
hbase.hregion.max.filesize | This determines the maximum size of the underlying store files (HStoreFile). The region size is defined by this parameter. If any store file of any column family exceeds this size, the region is split. | Larger regions mean fewer splits at write time. Increase this number, and see where you get optimal performance for your use case. We have come across region sizes ranging from 256 MB to 4 GB. 1 GB is a good place to begin experimenting. |
hbase.hregion.memstore.flush.size | This parameter defines the size of the MemStore and is configured in bytes. The MemStore is flushed to disk when it exceeds this size. A thread that runs periodically checks the size of the MemStore. | Flushing more data to HDFS and creating larger HFiles reduce the number of compactions required by reducing the number of files created during writes. |
hbase.regionserver.global.memstore.lowerLimit and hbase.regionserver.global.memstore.upperLimit | upperLimit defines the maximum percentage of the heap on a RegionServer that the MemStores combined can use. The moment upperLimit is hit, MemStores are flushed until lowerLimit is hit. Setting these values equal to each other means that a minimum amount of flushing happens when writes are blocked because of upperLimit being hit. This minimizes pauses during writes but also causes more frequent flushing. | You can increase the percentage of heap allocated to the MemStore on every RegionServer. Don’t go overboard with this because it can cause garbage-collection issues. Configure upperLimit such that it can accommodate the MemStore per region multiplied by the number of expected regions per RegionServer. |
Garbage collection tuning | Java garbage collection plays an important role when it comes to the write performance of an HBase cluster. See the recommendations provided in chapter 9, and tune based on them. | |
hbase.hregion.memstore.mslab.enabled | MemStore-Local Allocation Buffer is a feature in HBase that helps prevent heap fragmentation when there are heavy writes going on. In some cases, enabling this feature can help alleviate issues of long garbage-collection pauses if the heaps are too large. The default value of this parameter is true. | Enabling this feature can give you better write performance and more stable operations. |
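The upperLimit recommendation in table 10.3 comes down to simple arithmetic, which the following sketch works through. The class name MemStoreSizing and the figures used (an 8 GB heap, an upperLimit of 0.4, and a 128 MB flush size) are assumptions chosen for illustration, not recommended values. The sketch also assumes one MemStore per region, which holds only for tables with a single column family.

```java
class MemStoreSizing {
    /** Heap (in MB) that all MemStores on one RegionServer may use before flushes are forced. */
    static long globalMemStoreMb(long heapMb, double upperLimit) {
        return (long) (heapMb * upperLimit);
    }

    /** Number of regions whose MemStores can all fill up before the global limit is reached. */
    static long regionsThatFit(long heapMb, double upperLimit, long flushSizeMb) {
        return globalMemStoreMb(heapMb, upperLimit) / flushSizeMb;
    }

    public static void main(String[] args) {
        long heapMb = 8192;       // assumed RegionServer heap size
        double upperLimit = 0.4;  // assumed hbase.regionserver.global.memstore.upperLimit
        long flushSizeMb = 128;   // assumed hbase.hregion.memstore.flush.size, expressed in MB
        System.out.println(regionsThatFit(heapMb, upperLimit, flushSizeMb)
            + " regions can fill their MemStore before upperLimit is hit");
    }
}
```

If the number of actively written regions per RegionServer is well above the result, flushes will likely be triggered by the global limit rather than by the per-region flush size.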
With completely mixed workloads, tuning becomes slightly trickier. You have to tweak a mix of the parameters described earlier to achieve the optimal combination. Iterate over various combinations, and run performance tests to see where you get the best results.
Outside of the previously mentioned configuration, the following impact performance in general:
During the course of running a production system, management tasks need to be performed at different stages. Even though HBase is a distributed system with various failure-resistance techniques and high availability built into it, it still needs a moderate amount of care on a daily basis. Things like starting or stopping the cluster, upgrading the OS on the nodes, replacing bad hardware, and backing up data are important tasks and need to be done right to keep the cluster running smoothly. Sometimes these tasks are in response to events like hardware going bad, and other times they’re purely to stay up to date with the latest and greatest releases.
This section highlights some of the important tasks you may need to perform and teaches how to do them. HBase is a fast-evolving system, and not all problems are solved. Until recently, it was operated mostly by people intimately familiar with the internals, including some of the committers. There wasn’t a lot of focus on making automated management tools that simplify life on the operations side. Therefore, some things that we’ll cover in this section require more manual intervention than others. These will likely go into an operations manual that cluster administrators can refer to when required. Get ready to get your hands dirty.
Starting and stopping the HBase daemons will probably be more common than you expect, especially in the early stages of setting up the cluster and getting things going. Configuration changes are the most common reason for this activity. You can do this different ways, but the underlying principles are the same. The order in which the HBase daemons are stopped and started matters only to the extent that the dependency systems (HDFS and ZooKeeper) need to be up before HBase is started and should be shut down only after HBase has shut down.
Different distributions come with different scripts to start/stop daemons. The stock Apache distribution has the following scripts (in the $HBASE_HOME/bin directory) available that you can use:
CDH comes with init scripts and doesn’t use the scripts that come with the stock Apache release. These scripts are located in /etc/init.d/hbase-<daemon>.sh and can be used to start, stop, or restart the daemon process.
Cluster-management frameworks like Puppet and Chef can be used to manage the starting and stopping of daemons from a central location. Proprietary tools like Cloudera Manager can also be used for this purpose. One reason to consider them is that the stock scripts rely on passwordless SSH; there are typically security concerns associated with it, and many system administrators try to find alternative solutions.
When you need to shut down daemons on individual servers for any management purpose (upgrading, replacing hardware, and so on), you need to ensure that the rest of the cluster keeps working fine and there is minimal outage as seen by client applications. This entails moving the regions being served by that RegionServer to some other RegionServer proactively rather than having HBase react to a RegionServer going down. HBase will recover from a RegionServer going down, but it will wait for the RegionServer to be detected as down and then start reassigning the regions elsewhere. Meanwhile, the application may possibly experience a slightly degraded availability. Moving the regions proactively to other RegionServers and then killing the RegionServer makes the process safer.
To do this, HBase comes with the graceful_stop.sh script. Like the other scripts we’ve talked about, this script is also located in the $HBASE_HOME/bin directory:
$ bin/graceful_stop.sh
Usage: graceful_stop.sh [--config <conf-dir>] [--restart] [--reload] [--thrift] [--rest] <hostname>
 thrift      If we should stop/start thrift before/after the hbase stop/start
 rest        If we should stop/start rest before/after the hbase stop/start
 restart     If we should restart after graceful stop
 reload      Move offloaded regions back on to the stopped server
 debug       Move offloaded regions back on to the stopped server
 hostname    Hostname of server we are to stop
The script follows these steps (in order) to gracefully stop a RegionServer:
This script also needs passwordless SSH from the node you’re running it on to the RegionServer node you’re trying to stop. If passwordless SSH isn’t an option, you can look at the source code of the script and implement one that works for your environment.
Decommissioning nodes is an important management task, and using the graceful-shutdown mechanism to cleanly shut down the RegionServer is the first part. Thereafter, you need to remove the node from the list of nodes where the RegionServer process is expected to run so your scripts and automated-management software don’t start the process again.
As your application gets more successful or more use cases crop up, chances are you’ll need to scale up your HBase cluster. It could also be that you’re replacing a node for some reason. The process to add a node to the HBase cluster is the same in both cases.
Presumably, you’re running the HDFS DataNode on the same physical node. The first part of adding a RegionServer to the cluster is to add the DataNode to HDFS. Depending on how you’re managing your cluster (using the provided start/stop scripts or using centralized management software), start the DataNode process and wait for it to join the HDFS cluster. Once that is done, start the HBase RegionServer process. You’ll see the node added to the list of nodes in the Master UI. After this, if you want to balance out the regions being served by each node and move some load onto the newly added RegionServer, run the balancer using the following:
echo "balancer" | hbase shell
This will move some regions from all RegionServers to the new RegionServer and balance the load across the cluster. The downside of running the balancer is that you’ll likely lose data locality for the regions that are moved. But this will be taken care of during the next round of major compactions.
It’s not rare to patch or upgrade Hadoop and HBase releases in running clusters—especially if you want to incorporate the latest and greatest features and performance improvements. In production systems, upgrades can be tricky. Often, it isn’t possible to take downtime on the cluster to do upgrades. In some cases, the only option is to take downtime. This generally happens when you’re looking to upgrade between major releases where the RPC protocol doesn’t match the older releases, or other changes aren’t backward compatible. When this happens, you have no choice but to plan a scheduled downtime and do the upgrade.
But not all upgrades are between major releases and require downtime. When the upgrade doesn’t have backward-incompatible changes, you can do rolling upgrades. This means you upgrade one node at a time without bringing down the cluster. The idea is to shut down one node cleanly, upgrade it, and then bring it back up to join the cluster. This way, your application SLAs aren’t impacted, assuming you have ample spare capacity to serve the same traffic when one node is taken down for the upgrade. In an ideal world, there would be scripts you could run for this purpose. HBase does ship with some scripts that can help, but they’re naïve implementations of the concept[9] and we recommend you implement custom scripts based on your environment’s requirements. To do upgrades without taking a downtime, follow these steps:
9 This is true as of the 0.92.1 release. There may be more sophisticated implementations in future releases.
When these steps are finished, your cluster is running with the upgraded HBase version. These steps assume that you’ve taken care of upgrading the underlying HDFS.
You can use the same steps to do a rolling restart for any other purpose as well.
Throughout the book, you have used the shell to interact with HBase. Chapter 6 also covered scripting of the shell commands and extending the shell using JRuby. These are useful tools to have for managing your cluster on an everyday basis. The shell exposes several commands that come in handy to perform simple operations on the cluster or find out the cluster’s health. Before we go into that, let’s see the options that the bin/hbase script provides, which you use to start the shell. The script basically runs the Java class associated with the command you choose to pass it:
$ $HBASE_HOME/bin/hbase
Usage: hbase <command>
where <command> an option from one of these categories:

DBA TOOLS
  shell            run the HBase shell
  hbck             run the hbase 'fsck' tool
  hlog             write-ahead-log analyzer
  hfile            store file analyzer
  zkcli            run the ZooKeeper shell

PROCESS MANAGEMENT
  master           run an HBase HMaster node
  regionserver     run an HBase HRegionServer node
  zookeeper        run a Zookeeper server
  rest             run an HBase REST server
  thrift           run an HBase Thrift server
  avro             run an HBase Avro server

PACKAGE MANAGEMENT
  classpath        dump hbase CLASSPATH
  version          print the version

 or
  CLASSNAME        run the class named CLASSNAME
Most commands print help when invoked w/o parameters.
We’ll cover the hbck, hlog, and hfile commands in future sections. For now, let’s start with the shell command. To get a list of commands that the shell has to offer, type help in the shell, and here’s what you’ll see:
hbase(main):001:0> help
HBase Shell, version 0.92.1, r039a26b3c8b023cf2e1e5f57ebcd0fde510d74f2, Thu May 31 13:15:39 PDT 2012
Type 'help "COMMAND"', (e.g., 'help "get"' -- the quotes are necessary) for help on a specific command.
Commands are grouped. Type 'help "COMMAND_GROUP"', (e.g., 'help "general"') for help on a command group.

COMMAND GROUPS:
  Group name: general
  Commands: status, version

  Group name: ddl
  Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, is_disabled, is_enabled, list, show_filters

  Group name: dml
  Commands: count, delete, deleteall, get, get_counter, incr, put, scan, truncate

  Group name: tools
  Commands: assign, balance_switch, balancer, close_region, compact, flush, hlog_roll, major_compact, move, split, unassign, zk_dump

  Group name: replication
  Commands: add_peer, disable_peer, enable_peer, list_peers, remove_peer, start_replication, stop_replication

  Group name: security
  Commands: grant, revoke, user_permission

SHELL USAGE:
Quote all names in HBase Shell such as table and column names. Commas delimit command parameters. Type <RETURN> after entering a command to run it.
Dictionaries of configuration used in the creation and alteration of tables are Ruby Hashes. They look like this:

  {'key1' => 'value1', 'key2' => 'value2', ...}

and are opened and closed with curly-braces. Key/values are delimited by the '=>' character combination. Usually keys are predefined constants such as NAME, VERSIONS, COMPRESSION, etc. Constants do not need to be quoted. Type 'Object.constants' to see a (messy) list of all constants in the environment.

If you are using binary keys or values and need to enter them in the shell, use double-quote'd hexadecimal representation. For example:

  hbase> get 't1', "key\x03\x3f\xcd"
  hbase> get 't1', "key\003\023\011"
  hbase> put 't1', "test\xef\xff", 'f1:', "\x01\x33\x40"

The HBase shell is the (J)Ruby IRB with the above HBase-specific commands added.
For more on the HBase Shell, see http://hbase.apache.org/docs/current/book.html
We’ll focus on the tools group of commands. To get a description for any command, you can run help 'command_name' in the shell like this:
hbase(main):003:0> help 'status'
Show cluster status. Can be 'summary', 'simple', or 'detailed'. The default is 'summary'. Examples:

  hbase> status
  hbase> status 'simple'
  hbase> status 'summary'
  hbase> status 'detailed'
You can use the zk_dump command to find out the current state of ZooKeeper:
hbase(main):030:0> zk_dump
HBase is rooted at /hbase
Master address: 01.mydomain.com:60000
Region server holding ROOT: 06.mydomain.com:60020
Region servers:
 06.mydomain.com:60020
 04.mydomain.com:60020
 02.mydomain.com:60020
 05.mydomain.com:60020
 03.mydomain.com:60020
Quorum Server Statistics:
 03.mydomain.com:2181
  Zookeeper version: 3.3.4-cdh3u3--1, built on 01/26/2012 20:09 GMT
  Clients:
  ...
 02.mydomain.com:2181
  Zookeeper version: 3.3.4-cdh3u3--1, built on 01/26/2012 20:09 GMT
  Clients:
  ...
 01.mydomain.com:2181
  Zookeeper version: 3.3.4-cdh3u3--1, built on 01/26/2012 20:09 GMT
  Clients:
  ...
This tells you the current active HBase Master, the list of RegionServers that form the cluster, the location of the -ROOT- table, and the list of servers that form the ZooKeeper quorum. ZooKeeper is the starting point of the HBase cluster and the source of truth when it comes to the membership in the cluster. The information spewed out by zk_dump can come in handy while trying to debug issues about the cluster such as finding out which server is the active Master Server or which RegionServer is hosting -ROOT-.
You can use the status command to determine the status of the cluster. This command has three options: simple, summary, and detailed. The default is the summary option. We show all three here, to give you an idea of the information included with each of them:
hbase(main):010:0> status 'summary'
1 servers, 0 dead, 6.0000 average load

hbase(main):007:0> status 'simple'
1 live servers
    localhost:62064 1341201439634
        requestsPerSecond=0, numberOfOnlineRegions=6, usedHeapMB=40, maxHeapMB=987
0 dead servers
Aggregate load: 0, regions: 6

hbase(main):009:0> status 'detailed'
version 0.92.1
0 regionsInTransition
master coprocessors: []
1 live servers
    localhost:62064 1341201439634
        requestsPerSecond=0, numberOfOnlineRegions=6, usedHeapMB=40, maxHeapMB=987
        -ROOT-,,0
            numberOfStores=1, numberOfStorefiles=2, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=48, writeRequestsCount=1, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        .META.,,1
            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=36, writeRequestsCount=4, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=28, currentCompactedKVs=28, compactionProgressPct=1.0, coprocessors=[]
        table,,1339354041685.42667e4f00adacec75559f28a5270a56.
            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        t1,,1339354920986.fba20c93114a81cc72cc447707e6b9ac.
            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        table1,,1340070923439.f1450e26b69c010ff23e14f83edd36b9.
            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        ycsb,,1340070872892.2171dad81bfe65e6ac6fe081a66c8dfd.
            numberOfStores=1, numberOfStorefiles=0, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
0 dead servers
As you can see, the detailed status command gives out a bunch of information about the RegionServers and the regions they’re serving. This can come in handy when you’re trying to diagnose problems where you need in-depth information about the regions and the servers that are serving them.
Otherwise, the summary option gives you the number of live and dead servers and the average load at that point. This is mostly useful as a sanity check to see if nodes are up and not overloaded.
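The sanity check described above can be automated by parsing the summary line. The following is a minimal sketch; the class name StatusSummaryCheck and the load threshold passed to it are hypothetical, and the pattern is based only on the 0.92.1 output shown earlier, so other releases may format the line differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StatusSummaryCheck {
    // Matches a line such as "1 servers, 0 dead, 6.0000 average load"
    private static final Pattern SUMMARY =
        Pattern.compile("(\\d+) servers, (\\d+) dead, ([0-9.]+) average load");

    /** Returns true when no servers are reported dead and the average load is at most maxLoad. */
    static boolean isHealthy(String summaryLine, double maxLoad) {
        Matcher m = SUMMARY.matcher(summaryLine);
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognized status output: " + summaryLine);
        }
        int dead = Integer.parseInt(m.group(2));
        double averageLoad = Double.parseDouble(m.group(3));
        return dead == 0 && averageLoad <= maxLoad;
    }

    public static void main(String[] args) {
        // The threshold of 100 regions per server is an arbitrary illustration.
        System.out.println(isHealthy("1 servers, 0 dead, 6.0000 average load", 100.0));
    }
}
```

A monitoring job could feed the output of status 'summary' into isHealthy and raise an alert when it returns false.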
Triggering compactions from the shell isn’t something you’ll need to do often, but the shell does give you the option to do so if you need it. You can use the shell to trigger compactions, both minor and major, using the compact and major_compact commands, respectively:
hbase(main):011:0> help 'compact'
Compact all regions in passed table or pass a region row to compact an individual region
Trigger minor compaction on a table like this:
hbase(main):014:0> compact 't'
0 row(s) in 5.1540 seconds
Trigger minor compaction on a particular region like this:
hbase(main):015:0> compact 't,,1339354041685.42667e4f00adacec75559f28a5270a56.'
0 row(s) in 0.0600 seconds
The major_compact command comes in handy if you disable automatic major compactions and make them a manual process: you can script the major compaction and run it as a cron job at a suitable time (when the load on the cluster is low).
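One way to script this is a small program that a cron entry can invoke. The sketch below builds one major_compact line per table and pipes the lines into hbase shell, in the same way that commands are piped into the shell elsewhere in this chapter. The class name MajorCompactJob is hypothetical, the hbase script is assumed to be on the PATH, and table names are assumed not to contain quote characters.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class MajorCompactJob {
    /** Builds the shell input: one major_compact command per table. */
    static String buildShellInput(List<String> tables) {
        StringBuilder sb = new StringBuilder();
        for (String table : tables) {
            sb.append("major_compact '").append(table).append("'\n");
        }
        return sb.toString();
    }

    /** Pipes the commands into "hbase shell" and returns the shell's exit code. */
    static int run(List<String> tables) throws IOException, InterruptedException {
        Process shell = new ProcessBuilder("hbase", "shell")
            .redirectErrorStream(true)
            .redirectOutput(ProcessBuilder.Redirect.INHERIT)
            .start();
        // Closing stdin signals end of input, so the shell exits after the last command.
        try (OutputStream stdin = shell.getOutputStream()) {
            stdin.write(buildShellInput(tables).getBytes(StandardCharsets.UTF_8));
        }
        return shell.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Table names are taken from the command line.
        System.exit(run(Arrays.asList(args)));
    }
}
```

Keeping the command construction in its own method makes it easy to check what will be sent to the shell before wiring the job into cron.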
The balancer is responsible for making sure all RegionServers are serving an equivalent number of regions. The current implementation of the balancer takes into consideration the number of regions per RegionServer and attempts to redistribute them if the distribution isn’t even. You can run the balancer through the shell like this:
hbase(main):011:0> balancer
true
0 row(s) in 0.0200 seconds
The returned value when you run the balancer is true or false, and this pertains to whether the balancer ran.
You can turn off the balancer by using the balance_switch command. When you run the command, it returns true or false. The value it returns represents the state of the balancer before the command is run. To enable the balancer to run automatically, pass true as the argument to balance_switch. To disable the balancer, pass false. For example:
hbase(main):014:0> balance_switch false
true
0 row(s) in 0.0200 seconds
This switches off the automatic balancer. It was turned on before the command was run, as shown by the value returned.
The shell gives you the ability to split existing tables. Ideally, this is something you shouldn’t have to do. But there are cases like region hot-spotting where you may need to manually split the region that’s being hot-spotted. Region hot-spotting typically points to another problem, though—bad key design leading to suboptimal load distribution.
The split command can be given a table name, and it will split all the regions in that table; or you can specify a particular region to be split. If you specify the split key, it splits only around that key:
hbase(main):019:0> help 'split'
You can split an entire table or pass a region to split an individual region. With the second parameter, you can specify an explicit split key for the region. Here are some examples:
split 'tableName'
split 'regionName' # format: 'tableName,startKey,id'
split 'tableName', 'splitKey'
split 'regionName', 'splitKey'
The following example splits mytable around the key G:
hbase(main):019:0> split 'mytable', 'G'
Tables can also be presplit at the time of table creation. You can do this using the shell too. We cover this later in the chapter.
Using the shell, you can alter properties of existing tables. For instance, suppose you want to add compression to some column families or increase the number of versions. For this, you have to disable the table, make the alterations, and re-enable the table, as shown here:
hbase(main):019:0> disable 't'
0 row(s) in 2.0590 seconds
hbase(main):020:0> alter 't', NAME => 'f', VERSIONS => 1
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 6.3300 seconds
hbase(main):021:0> enable 't'
0 row(s) in 2.0550 seconds
You can check that the table properties changed by using the describe 'tablename' command in the shell.
Truncating tables means deleting all the data but preserving the table structure. The table still exists in the system, but it’s empty after the truncate command is run on it. Truncating a table in HBase involves disabling it, dropping it, and re-creating it. The truncate command does all of this for you. On large tables, truncating can take time, because all regions have to be shut down and disabled before they can be deleted:
hbase(main):023:0> truncate 't'
Truncating 't' table (it may take a while):
 - Disabling table...
 - Dropping table...
 - Creating table...
0 row(s) in 14.3190 seconds
File systems come with a file-system check utility like fsck that checks for the consistency of a file system. These are typically run periodically to keep track of the state of the file system or especially to check integrity when the system is behaving abnormally. HBase comes with a similar tool called hbck (or HBaseFsck) that checks for the consistency and integrity of the HBase cluster. Hbck recently underwent an overhaul, and the resulting tool was nicknamed uberhbck. This uber version of hbck is available in releases 0.90.7+, 0.92.2+ and 0.94.0+. We’ll describe the functionality that this tool has to offer and where you’ll find it useful.[10]
10 We hope you don’t run into issues that make you need to run this. But as we said earlier, Murphy strikes sometimes, and you have to troubleshoot.
Depending on the release of HBase you’re using, the functionality that hbck provides may differ. We recommend that you read the documentation for your release and understand what the tool provides in your environment. If you’re a savvy user and want more functionality than what’s present in your release but is available in later releases, you could back-port the JIRAs!
Hbck is a tool that helps in checking for inconsistencies in HBase clusters. Inconsistencies can occur at two levels:
Hbck performs two primary functions: detect inconsistencies and fix inconsistencies.
Detecting inconsistencies in your cluster can be done proactively using hbck. You could wait for your application to start spewing exceptions about not finding regions or not knowing what region to write a particular rowkey to, but that costs a lot more than detecting such issues before the application is impacted by them.
You can run the hbck tool to detect inconsistencies as shown here:
$ $HBASE_HOME/bin/hbase hbck
When this command runs, it gives you a list of inconsistencies it found. If all is well, it says OK. Occasionally when you run hbck, it catches inconsistencies that are transient. For instance, during a region split, it looks like more than one region is serving the same rowkey range, which hbck detects as an inconsistency. But the RegionServers know that the daughter regions should get all the requests and the parent region is on its way out, so this isn’t really an inconsistency. Run hbck a few times over a few minutes to see if the inconsistency remains and isn’t just an apparent one caught during a transition in the system. To get more details about the inconsistencies reported, you can run hbck with the -details flag, as shown here:
$ $HBASE_HOME/bin/hbase hbck -details
You can also run hbck on a regular basis in an automated manner to monitor the health of your cluster over time and alert you if hbck is consistently reporting inconsistencies. Running it every 10–15 minutes or so should be sufficient unless you have a lot of load on your cluster that could cause excessive splitting, compactions, and regions moving around. In this case, running it more frequently might be worth considering.
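Because hbck can report transient inconsistencies, an automated check should alert only when several runs in a row are bad. The sketch below shows that bookkeeping. The class name HbckMonitor and the threshold of three runs are hypothetical, and the check for a "Status: OK" line is an assumption about the hbck report format that you should verify against the output of your release.

```java
class HbckMonitor {
    private final int threshold;   // consecutive inconsistent runs required before alerting
    private int consecutiveBad = 0;

    HbckMonitor(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Records the outcome of one hbck run and reports whether an alert is due.
     * A single clean run resets the counter, so transient findings are ignored.
     */
    boolean record(boolean consistent) {
        consecutiveBad = consistent ? 0 : consecutiveBad + 1;
        return consecutiveBad >= threshold;
    }

    /** Assumption: hbck prints a "Status: OK" line when it finds no inconsistencies. */
    static boolean looksConsistent(String hbckOutput) {
        return hbckOutput.contains("Status: OK");
    }

    public static void main(String[] args) {
        HbckMonitor monitor = new HbckMonitor(3);
        // Simulated outcomes of five scheduled runs; only the last one triggers an alert.
        boolean[] runs = {false, true, false, false, false};
        for (boolean consistent : runs) {
            System.out.println(monitor.record(consistent));
        }
    }
}
```

With a run every 10 to 15 minutes, a threshold of three means an inconsistency must persist for roughly half an hour before anyone is paged; adjust it to how quickly your cluster settles after splits and region moves.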
If you find inconsistencies in your HBase cluster, you need to fix them as soon as possible to avoid running into further issues and unexpected behavior. Until recently, there was no automated tool that helped with this. This changed in the newer hbck versions: hbck can now fix inconsistencies in your cluster.
Next, we’ll explain the various types of inconsistencies and how you can use hbck to fix them:
The previously mentioned fixes are low risk and typically run together. To perform them together, run hbck with the -repairHoles flag. That performs all three fixes:
$ $HBASE_HOME/bin/hbase hbck -repairHoles
Inconsistencies can be more complicated than those we have covered so far and may require careful fixing:
Often, if you’re willing to run through all these fixes, you can use the -repair flag rather than specify each of the previous flags individually. You can also limit the repair to particular tables by passing the table name to the repair flag (-repair MyTable).
Fixing inconsistencies in HBase tables is an advanced operational task. We encourage you to read the online manual and also try running hbck on a development environment before running it on your production cluster. Also, it never hurts to read the script’s code.
HBase provides utilities to examine the HFiles and HLogs (WAL) that are being created at write time. The HLogs are located in the .logs directory in the HBase root directory on the file system. You can examine them by using the hlog command of the bin/hbase script, like this:
$ bin/hbase hlog /hbase/.logs/regionserverhostname,60020,1340983114841/regionserverhostname%2C60020%2C1340983114841.1340996727020
12/07/03 15:31:59 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
12/07/03 15:32:00 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Sequence 650517 from region a89b462b3b0943daa3017866315b729e in table users
  Action:
    row: user8257982797137456856
    column: s:field0
    at time: Fri Jun 29 12:05:27 PDT 2012
  Action:
    row: user8258088969826208944
    column: s:field0
    at time: Fri Jun 29 12:05:27 PDT 2012
  Action:
    row: user8258268146936739228
    column: s:field0
    at time: Fri Jun 29 12:05:27 PDT 2012
  Action:
    row: user825878197280400817
    column: s:field0
    at time: Fri Jun 29 12:05:27 PDT 2012
...
...
...
The output is a list of edits that have been recorded in that particular HLog file.
The script has a similar utility for examining the HFiles. To print the help for the command, run the command without any arguments:
$ bin/hbase hfile
usage: HFile [-a] [-b] [-e] [-f <arg>] [-k] [-m] [-p] [-r <arg>] [-s] [-v]
 -a,--checkfamily    Enable family check
 -b,--printblocks    Print block index meta data
 -e,--printkey       Print keys
 -f,--file <arg>     File to scan. Pass full-path; e.g., hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow       Enable row order check; looks for out-of-order keys
 -m,--printmeta      Print meta data of file
 -p,--printkv        Print key/value pairs
 -r,--region <arg>   Region to scan. Pass region name; e.g., '.META.,,1'
 -s,--stats          Print statistics
 -v,--verbose        Verbose output; emits file and meta data delimiters
Here is an example of examining the stats of a particular HFile:
$ bin/hbase hfile -s -f /hbase/users/0a2485f4febcf7a13913b8b040bcacc7/s/633132126d7e40b68ae1c12dead82898
Stats:
Key length: count: 1504206 min: 35 max: 42 mean: 41.88885963757624
Val length: count: 1504206 min: 90 max: 90 mean: 90.0
Row size (bytes): count: 1312480 min: 133 max: 280 mean: 160.32370931366574
Row size (columns): count: 1312480 min: 1 max: 2 mean: 1.1460791783493844
Key of biggest row: user8257556289221384421
You can see that there is a lot of information about the HFile. Other options can be used to get different bits of information. The ability to examine HLogs and HFiles can be handy if you’re trying to understand the behavior of the system when you run into issues.
Table splitting during heavy write loads can result in increased latencies. Splitting is typically followed by regions moving around to balance the cluster, which adds to the overhead. Presplitting tables is also desirable for bulk loads, which we cover later in the chapter. If the key distribution is well known, you can split the table into the desired number of regions at the time of table creation.
It’s advisable to start with fewer regions. A good place to start is the low tens of regions per RegionServer. That can inform your region size, which you can configure at a system level using the hbase.hregion.max.filesize configuration property. If you set that number to the size you want your regions to be, HBase will split them when they get to that size. But setting that number much higher than the desired region size gives you the ability to manually manage the region size and split it before HBase splits it. That means more work for the system administrator but finer-grained control over the region sizes. Managing table splitting manually is an advanced operations concept and should be done only once you’ve tried it on a development cluster and are comfortable with it. If you oversplit, you’ll end up with lots of small regions. If you don’t split in time, HBase splitting will kick in when your regions reach the configured region size, and that will lead to major compactions taking much longer because the regions would likely be big.
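The relationship between the number of regions per RegionServer and the region size can be worked out with a quick calculation. The sketch below is illustrative only: the class name RegionSizing and the figures (400 GB of table data, 10 RegionServers, 20 regions per server) are assumptions, and the data size is taken to mean the size of the table's store files before HDFS replication.

```java
class RegionSizing {
    /** Region size (in GB) at which each RegionServer hosts about targetRegionsPerServer regions. */
    static double regionSizeGb(double totalDataGb, int regionServers, int targetRegionsPerServer) {
        return totalDataGb / (regionServers * (double) targetRegionsPerServer);
    }

    public static void main(String[] args) {
        // Assumed figures: 400 GB of data, 10 RegionServers, 20 regions per server.
        double sizeGb = regionSizeGb(400, 10, 20);
        long bytes = (long) (sizeGb * 1024 * 1024 * 1024);
        System.out.println("Target region size: " + sizeGb + " GB");
        System.out.println("hbase.hregion.max.filesize = " + bytes);
    }
}
```

If you plan to manage splits manually, treat the result as the size at which you split, and set hbase.hregion.max.filesize well above it, as described above.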
The HBase shell can be used to presplit regions at the time of table creation. The way to do that is to have a list of split keys in a file, with one key per line. Here’s an example:
$ cat ~/splitkeylist
A
B
C
D
To create a table with the listed keys as the split boundary, run the following command in the shell:
hbase(main):019:0> create 'mytable', 'family', {SPLITS_FILE => '~/splitkeylist'}
This creates a table with presplit regions. You can confirm that from your Master web UI (figure 10.10).
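For larger tables, writing the split-key file by hand gets tedious, and it can be generated instead. The following sketch prints evenly spaced split keys, one per line, in the layout that the split-key file uses. It assumes a hypothetical key design in which every rowkey starts with a two-digit lowercase hexadecimal prefix (00 through ff), for example the first byte of a hash; the class name SplitKeyFile is likewise made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

class SplitKeyFile {
    /** Returns numRegions - 1 split keys that evenly divide the prefixes 00..ff. */
    static List<String> hexSplitKeys(int numRegions) {
        if (numRegions < 2 || numRegions > 256) {
            throw new IllegalArgumentException("numRegions must be between 2 and 256");
        }
        List<String> keys = new ArrayList<>();
        for (int i = 1; i < numRegions; i++) {
            // Boundary i sits at i/numRegions of the 256 possible prefixes.
            keys.add(String.format("%02x", i * 256 / numRegions));
        }
        return keys;
    }

    public static void main(String[] args) {
        // One key per line, ready to be redirected into the split-key file.
        for (String key : hexSplitKeys(4)) {
            System.out.println(key);
        }
    }
}
```

Redirect the program's output into a file and pass that file's path as SPLITS_FILE when you create the table. Evenly spaced keys balance regions only if the rowkeys themselves are evenly distributed over the prefix range.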
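Another way to create a table with presplit regions is to use the HBaseAdmin.createTable(...) API. The start and end keys are passed as byte arrays, and the last argument is the total number of regions to create:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

String tableName = "mytable";
String startKey = "A";
String endKey = "D";
int numOfSplits = 5;  // total number of regions to create
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("family");
desc.addFamily(col);
// createTable expects the boundary keys as byte arrays, not Strings
admin.createTable(desc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), numOfSplits);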
We have an implementation available for you in the provided code under the utils package. It’s called TablePreSplitter.
Another implementation of creating presplit tables and even splitting them later comes packaged with HBase in the org.apache.hadoop.hbase.util.RegionSplitter class.
We’ve covered a lot of operations and management tasks in this section and equipped you with enough information to run your HBase cluster. Successful operations of a system also include the ability to handle failure scenarios of different kinds and to keep running with minimal degradation when disaster strikes. The next section explores the concept of backups in the context of HBase and where these are important.
Backups tend to be one of the favorite topics of system administrators and the people responsible for operations of a system. In the world of Hadoop and HBase, the conversation changes a little. In traditional systems, backups were done to achieve redundancy in order to safeguard against system failures (hardware and/or software). Failures were considered to be something outside the system, affecting the normal operations of the system. For instance, if a relational database system goes down because the memory on the host fails, the system is unavailable until you replace the memory. If the hard disk crashes, chances are you’ll lose part or all of your data (depending on how the hard disks are configured and how many disks are being used).
Hadoop and HBase are built with failure as a first-class concern, and their design is such that they’re resilient to individual nodes failing. If a DataNode or a RegionServer host falls off the cluster, it’s no problem. Other hosts will take over the workload (stored data or regions being served), and the system will continue to function normally. The entire Hadoop stack as it exists today has high availability, which means that there is no single point of failure within the system that can bring it down or make it unavailable. Individual nodes failing isn’t a problem, but the entire data center hosting your cluster going down will cause the system to go down because Hadoop and HBase don’t span multiple data centers as of today.[11] But if your requirement is to safeguard against that kind of failure, you need some sort of a backup strategy in place.
11 The high network latency between data centers makes it impractical. The alternative is to run a replica cluster in another data center. Replicas are covered in the following subsection.
Another reason to have a separate copy of the data available is to do offline processing. As we recommended in chapter 9, collocating real-time and batch-processing workloads on the same HBase cluster impacts the latencies and the performance of the cluster for both the access patterns (as compared to running them independently). By having a second copy of data in another cluster, you can segregate the online access pattern from the batch-processing access pattern and have both of them perform optimally.
There are various ways of achieving backups or second copies of the data, and each has different properties.
Replication as a feature has been in an experimental state until recently, and only savvy users have used it in production. Active development and more user demand are getting the feature to a more stable state. You don’t necessarily have to understand the ins and outs of how replication works, but we recommend that you have a good understanding of it if you plan to use it in production.
One way to copy data from one cluster to another is by replicating the writes as they come into the first cluster. This is a common operating mechanism for relational database systems. Inter-cluster replication in HBase is achieved by log shipping and is done asynchronously. That means the replication is done by sending the edits (Puts and Deletes) that go into the HLog at the time of writes to the secondary cluster to which they have to be replicated. The write to the first cluster doesn’t block on the edits being replicated. The replication happens asynchronously after the writes are done and therefore can be done across data centers because it doesn’t affect the latencies of the writes when they take place.
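The following toy model may help make asynchronous log shipping concrete. It isn’t how HBase implements replication. Two in-memory maps stand in for the clusters, a queue stands in for the HLog edits waiting to be shipped, and every name in it is made up for illustration. The point is that a write returns as soon as the primary has it, and the secondary catches up later.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class LogShippingSketch {
    private final Map<String, String> primary = new HashMap<>();
    private final Map<String, String> secondary = new HashMap<>();
    // Stand-in for logged edits that haven't been shipped yet.
    private final Queue<String[]> pendingEdits = new ArrayDeque<>();

    // The write completes on the primary without waiting for the secondary.
    void put(String row, String value) {
        primary.put(row, value);
        pendingEdits.add(new String[] {row, value});
    }

    // Applies queued edits to the secondary in write order and returns how many were shipped.
    // A real system would do this in the background; here it's called explicitly.
    int shipPendingEdits() {
        int shipped = 0;
        String[] edit;
        while ((edit = pendingEdits.poll()) != null) {
            secondary.put(edit[0], edit[1]);
            shipped++;
        }
        return shipped;
    }

    boolean inSync() {
        return primary.equals(secondary);
    }

    public static void main(String[] args) {
        LogShippingSketch clusters = new LogShippingSketch();
        clusters.put("row1", "a");
        clusters.put("row2", "b");
        System.out.println("In sync before shipping: " + clusters.inSync());
        System.out.println("Edits shipped: " + clusters.shipPendingEdits());
        System.out.println("In sync after shipping: " + clusters.inSync());
    }
}
```

The window between the write and the shipping step is the replication lag: anything written to the primary during that window isn’t yet visible on the secondary.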
The instructions and descriptions of replication in this section apply to the Apache HBase 0.92.1 and CDH4u0 releases. Given that this is a relatively new feature that hasn’t seen a lot of production usage until now, there will still be active development and the addition of new features in the near term. We encourage you to look at the release notes of the release you’re using and not take our description as set in stone.
You configure replication at a column-family level by setting the replication scope to 1 at table-instantiation time or by altering the table:
hbase(main):002:0> create 'mytable', {NAME => 'colfam1', REPLICATION_SCOPE => '1'}
This configures the column family colfam1 to replicate to the secondary cluster when data is written to it. The same table name and column family must exist on the secondary cluster. HBase won’t create it if it doesn’t exist, and replication will fail.
Inter-cluster replication can be of three types: master-slave, master-master, and cyclic.
Depending on your application, you can choose which replication model will work best. If it’s only for a backup purpose or for having a second copy over which to do batch processing, the master-slave model will work fine. Master-master and cyclic replication are useful in special cases where you either want a third cluster with the same data or have data coming in from different sources into different tables and the end goal is to have an identical state across both clusters.
To configure inter-cluster replication, follow these steps:
After setting up replication, you should verify that it’s working as desired before putting any load on the cluster.
The easiest way to test that replication is working is to put a few rows into the table on the master cluster and check whether they exist on the slave cluster. If the dataset is much larger, this may not be feasible, as can be the case if you enabled replication on a production cluster. HBase ships with a MapReduce job called VerifyReplication that you can run to compare the contents of the two tables:
$ $HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication
Usage: verifyrep [--starttime=X] [--stoptime=Y] [--families=A] <peerid> <tablename>

Options:
 starttime    beginning of the time range
              without endtime means from starttime to forever
 stoptime     end of the time range
 families     comma-separated list of families to copy

Args:
 peerid       Id of the peer used for verification, must match the one given for replication
 tablename    Name of the table to verify

Examples:
 To verify the data replicated from TestTable for a 1 hour window with peer #5
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1265875194289 --stoptime=1265878794289 5 TestTable
But if you aren’t running the MapReduce framework, that’s not a choice. You’ll need to manage with a manual scan of the tables on the two clusters to ensure that things are working fine.
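If you do end up comparing scans by hand, the comparison itself is simple to script. Here’s a minimal sketch of the idea. It assumes you’ve already read the rows of interest from each cluster into a map from rowkey to value, and it skips the HBase client code entirely. The class and method names are hypothetical, and a real check would compare every cell of each row rather than a single string.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ReplicaDiffSketch {
    // Returns the rowkeys that are missing on one side or whose values differ.
    static Set<String> mismatchedRows(Map<String, String> master, Map<String, String> slave) {
        Set<String> allRows = new TreeSet<>(master.keySet());
        allRows.addAll(slave.keySet());
        Set<String> mismatched = new TreeSet<>();
        for (String row : allRows) {
            if (!Objects.equals(master.get(row), slave.get(row))) {
                mismatched.add(row);
            }
        }
        return mismatched;
    }

    public static void main(String[] args) {
        Map<String, String> master = new TreeMap<>();
        master.put("row1", "a");
        master.put("row2", "b");
        master.put("row3", "c");
        Map<String, String> slave = new TreeMap<>();
        slave.put("row1", "a");
        slave.put("row2", "stale");
        System.out.println("Rows to investigate: " + mismatchedRows(master, slave));
    }
}
```

Because replication is asynchronous, rows written very recently can show up as mismatches even when replication is healthy, so compare a time range that ended long enough ago for the edits to have been shipped.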
There’s not much you need to do to manage replication after it’s enabled on a cluster. To stop replication in a running cluster where it’s configured, you can run the stop_replication command in the HBase shell. To start it back up, run start_replication.
A few gotchas in the current implementation make some management tasks tricky. Replication is handled at the column-family level and is configured in the active HLog file. Thus if you stop replication and then start it again, and the HLogs haven’t rolled, everything that was written between the time you stopped and restarted replication will also be replicated. This is a function of the current implementation of replication, and it may change in future releases.
To remove a peer cluster, you can use the remove_peer command with the peer ID:
hbase> remove_peer '1'
To see a list of the currently configured peers, you can use the list_peers command:
hbase> list_peers
Inter-cluster replication is an advanced feature and can make it easy to keep multiple copies of data. It’s great for maintaining two hot copies of the data: your application can switch to the secondary cluster in case something goes wrong with the primary one. The hot-failover mechanism is something you need to build into your application. This can be done purely in the application logic, or you can use DNS tricks to get the application to talk to the secondary cluster if the primary one goes down. When the primary cluster is back up and running, you can use the same DNS trick and flip back to the primary cluster.
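As an illustration of doing the failover purely in application logic, here’s a minimal sketch of a read path that tries the primary cluster and falls back to the secondary when the call fails. The FailoverReader class is hypothetical, and the two functions stand in for whatever client code you use to read from each cluster. A production version would also need timeouts, retries, and a policy for when to stop trying the primary.

```java
import java.util.function.Function;

public class FailoverReader<K, V> {
    private final Function<K, V> primary;
    private final Function<K, V> secondary;

    FailoverReader(Function<K, V> primary, Function<K, V> secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    // Tries the primary first and falls back to the secondary if the call throws.
    V get(K key) {
        try {
            return primary.apply(key);
        } catch (RuntimeException primaryFailure) {
            return secondary.apply(key);
        }
    }

    public static void main(String[] args) {
        FailoverReader<String, String> reader = new FailoverReader<String, String>(
            row -> { throw new IllegalStateException("primary cluster unreachable"); },
            row -> "value-from-secondary");
        System.out.println(reader.get("row1"));
    }
}
```

Keep in mind that the secondary is fed asynchronously, so reads served from it after a failover may be missing the most recent writes that went to the primary.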
For replication to work properly, the time on the primary and secondary clusters needs to be in sync. As we described earlier, this can be achieved using NTP. Keeping time synchronized across all nodes running HBase is important in ensuring that the system operates reliably.
The issue now is to get the newly written data from the secondary cluster back to the primary one. This can be accomplished using the CopyTable or Export/Import job, which is what we talk about next.
MapReduce jobs can be configured to use HBase tables as the source and sink, as we covered in chapter 3. This ability can come in handy to do point-in-time backups of tables by scanning through them and outputting the data into flat files or other HBase tables.
This is different from inter-cluster replication, which the last section described. Inter-cluster replication is a push mechanism: new edits are pushed to the replica cluster as they come in, albeit asynchronously. Running MapReduce jobs over tables is a pull mechanism: jobs read from the HBase tables (that is, data is pulled out) and write to a sink of your choice.
There are a couple of ways you can use MapReduce over HBase for backups. HBase ships with prebundled jobs for this purpose, as we explain in detail in appendix B. We’ll explain how you can use them for backups here.
The prebundled Export MapReduce job can be used to export data from HBase tables into flat files. That data can then later be imported into another HBase table on the same or a different cluster using the Import job.
The Export job takes the source table name and the output directory name as inputs. You can also give it the number of versions, start timestamp, end timestamp, and filters to have finer-grained control over what data it reads from the source table. Using the start and end timestamps can come in handy in doing incremental reads from the tables.
The data is written out efficiently in Hadoop SequenceFiles in the specified output directory, which can later be imported into another HBase table using the Import job. The SequenceFiles are keyed from rowkey to Result instances:
$ hbase org.apache.hadoop.hbase.mapreduce.Export
Usage: Export [-D<property=value>]* <tablename> <outputdir> [<versions> [<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]

  Note: -D properties will be applied to the conf used.
  For example:
   -Dmapred.output.compress=true
   -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
   -Dmapred.output.compression.type=BLOCK
  Additionally, the following SCAN properties can be specified
  to control/limit what is exported..
   -Dhbase.mapreduce.scan.column.family=<familyName>
Here’s an example command to export table mytable to the directory export_out:
$ hbase org.apache.hadoop.hbase.mapreduce.Export mytable export_out
12/07/10 04:21:29 INFO mapred.JobClient: Default number of map tasks: null
12/07/10 04:21:29 INFO mapred.JobClient: Setting default number of map tasks based on cluster size to : 12
...
...
Let’s examine the contents of the export_out directory. It should contain a bunch of output files from the map tasks:
$ hadoop fs -ls export_out
Found 132 items
-rw-r--r--   2 hadoop supergroup          0 2012-07-10 04:39 /user/hadoop/export_out/_SUCCESS
-rw-r--r--   2 hadoop supergroup  441328058 2012-07-10 04:21 /user/hadoop/export_out/part-m-00000
-rw-r--r--   2 hadoop supergroup  470805179 2012-07-10 04:22 /user/hadoop/export_out/part-m-00001
...
...
-rw-r--r--   2 hadoop supergroup  536946759 2012-07-10 04:27 /user/hadoop/export_out/part-m-00130
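The start and end timestamps mentioned earlier are what make incremental exports possible: each run covers the window since the previous run ended. Here’s a small sketch of a helper that assembles the Export arguments for one such window, following the argument order in the usage message above. The helper class, the output directory naming scheme, and the choice of timestamps are our own assumptions. It only builds the argument list and doesn’t launch the job.

```java
import java.util.Arrays;
import java.util.List;

public class IncrementalExportArgs {
    // Argument order follows the Export usage:
    // <tablename> <outputdir> <versions> <starttime> <endtime>
    static List<String> build(String table, String outputDirPrefix, int versions,
                              long startMillis, long endMillis) {
        if (endMillis <= startMillis) {
            throw new IllegalArgumentException("endMillis must be after startMillis");
        }
        // Hypothetical naming scheme: one output directory per exported window.
        String outputDir = outputDirPrefix + "_" + startMillis + "_" + endMillis;
        return Arrays.asList(table, outputDir, String.valueOf(versions),
                String.valueOf(startMillis), String.valueOf(endMillis));
    }

    public static void main(String[] args) {
        long lastExportEnd = 1265875194289L;       // assumed: recorded when the previous run finished
        long thisExportEnd = lastExportEnd + 3600000L; // one hour later
        List<String> exportArgs = build("mytable", "export_out", 1, lastExportEnd, thisExportEnd);
        System.out.println("hbase org.apache.hadoop.hbase.mapreduce.Export "
                + String.join(" ", exportArgs));
    }
}
```

Writing each window to its own directory keeps the incremental exports separate, so you can import them in order with the Import job when you need to restore.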
The Import job is the inverse of the Export job. It reads over the records in the source files, creating Put instances from the persisted Result instances. It then writes those Puts to the target table through the HTable API. Import doesn’t provide any fancy filtering or manipulation of the data along the way. If you want to perform additional manipulation, you’ll need to subclass its Importer implementation and override the map function. A simple tool has a simple invocation:
$ hbase org.apache.hadoop.hbase.mapreduce.Import
Usage: Import <tablename> <inputdir>
The command to import the table exported in the earlier example into another table named myimporttable is as follows:
$ hbase org.apache.hadoop.hbase.mapreduce.Import myimporttable export_out
Upon job completion, your target table contains the exported data.
Although Import is a simple complement to Export, ImportTsv is more feature-rich. It allows you to load data from newline-terminated, delimited text files. Most commonly, this is a tab-separated format, but the delimiter is configurable (for loading comma-separated files). You specify a destination table and provide it with a mapping from columns in your data file(s) to columns in HBase:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

Imports the given input directory of TSV data into the specified table.

The column names of the TSV data must be specified using the -Dimporttsv.columns
option. This option takes the form of comma-separated column names, where each
column name is either a simple column family, or a columnfamily:qualifier. The special
column name HBASE_ROW_KEY is used to designate that this column should be used
as the row key for each imported record. You must specify exactly one column
to be the row key, and you must specify a column name for every column that exists in the
input data.

By default importtsv will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimporttsv.bulk.output=/path/for/output
  Note: if you do not use this option, then the target table must already exist in HBase

Other options that may be specified with -D include:
  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
  '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
  -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import
  -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
It’s intended to be a flexible utility, allowing you even to override the Mapper class, which is used when parsing input files. You can also have ImportTsv create HFiles instead of executing Puts against the target deployment. This is called bulk import. It bypasses the HTable API, making it faster than the regular import. It does have a runtime requirement of access to the target table. ImportTsv inspects that table’s region boundaries and uses those split delimiters to decide how many HFiles to create.
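To make the column mapping concrete, here’s a standalone sketch of how one input line pairs up with an -Dimporttsv.columns specification. It isn’t the TsvImporterMapper code. It’s a simplified illustration with hypothetical names that enforces the two rules from the usage message: exactly one HBASE_ROW_KEY column, and a column name for every field in the line.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TsvMappingSketch {
    static final String ROW_KEY = "HBASE_ROW_KEY";

    // Pairs each field of one input line with its entry in the column specification.
    static Map<String, String> mapLine(String columnsSpec, String line, String separator) {
        String[] columns = columnsSpec.split(",");
        // A limit of -1 keeps trailing empty fields so the counts can be compared.
        String[] fields = line.split(Pattern.quote(separator), -1);
        if (fields.length != columns.length) {
            throw new IllegalArgumentException(
                "expected " + columns.length + " fields but found " + fields.length);
        }
        Map<String, String> cells = new LinkedHashMap<>();
        int rowKeyColumns = 0;
        for (int i = 0; i < columns.length; i++) {
            if (ROW_KEY.equals(columns[i])) {
                rowKeyColumns++;
            }
            cells.put(columns[i], fields[i]);
        }
        if (rowKeyColumns != 1) {
            throw new IllegalArgumentException("exactly one column must be " + ROW_KEY);
        }
        return cells;
    }

    public static void main(String[] args) {
        Map<String, String> cells = mapLine(
            "HBASE_ROW_KEY,family:name,family:email",
            "u1\tAlice\talice@example.com", "\t");
        System.out.println(cells);
    }
}
```

In this example the first field becomes the rowkey and the other two become cells in the family column family, with name and email as the qualifiers.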
Once the HFiles are created, they have to be loaded into the table. The LoadIncrementalHFiles utility, also called completebulkload, handles the messy business of installing and activating new HFiles in a table in HBase. The operation is messy because careful consideration must be given to ensure that the new HFiles match the destination table’s configuration. LoadIncrementalHFiles handles this for you by splitting any of the source HFiles so they each fit within the key-range of a single region. The HFiles are moved into place, not copied, so don’t be surprised when your source data disappears after you run this command. With your HFiles staged on the HDFS, run the tool like this:
$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
usage: completebulkload /path/to/hfileoutputformat-output tablename
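The check behind the splitting described above is whether an HFile’s key range falls inside a single region. Here’s a simplified sketch of that test, not the LoadIncrementalHFiles code. It assumes plain ASCII string keys (HBase compares raw bytes), takes the sorted region start keys as input, and uses names we made up.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

public class HFileRegionFitSketch {
    // regionStartKeys holds the start key of every region; "" is the first region's start key.
    // A file fits if its first and last keys land in the same region.
    static boolean fitsInOneRegion(NavigableSet<String> regionStartKeys,
                                   String firstKey, String lastKey) {
        String regionOfFirst = regionStartKeys.floor(firstKey);
        String regionOfLast = regionStartKeys.floor(lastKey);
        return regionOfFirst != null && regionOfFirst.equals(regionOfLast);
    }

    public static void main(String[] args) {
        // Regions of a table presplit on A, B, C, and D.
        NavigableSet<String> starts = new TreeSet<>(Arrays.asList("", "A", "B", "C", "D"));
        System.out.println(fitsInOneRegion(starts, "Apple", "Avocado")); // same region
        System.out.println(fitsInOneRegion(starts, "Apple", "Banana"));  // spans two regions
    }
}
```

A file that fails this check is the kind that has to be split at the region boundary before it can be moved into place.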
Let’s create a presplit table and bulk-load a tab-separated file into it:
You can use the CopyTable MapReduce job to scan through an HBase table and directly write to another table. It doesn’t create flat files of any sort. The Puts are done directly into the sink table. The sink of the CopyTable job can be another table on the same cluster or a table on an entirely different cluster. The job can also be given start and end timestamps like the Export job, which allows finer-grained control over the data being read. It also supports scenarios where source and destination HBase deployments differ—that is, with different RegionServer implementations.
Executing CopyTable involves running a MapReduce job on the source deployment and populating the destination deployment. Invocation looks like this:
$ hbase org.apache.hadoop.hbase.mapreduce.CopyTable
Usage: CopyTable [--rs.class=CLASS] [--rs.impl=IMPL] [--starttime=X] [--endtime=Y] [--new.name=NEW] [--peer.adr=ADR] <tablename>

Options:
 rs.class     hbase.regionserver.class of the peer cluster
              specify if different from current cluster
 rs.impl      hbase.regionserver.impl of the peer cluster
 starttime    beginning of the time range
              without endtime means from starttime to forever
 endtime      end of the time range
 new.name     new table's name
 peer.adr     Address of the peer cluster given in the format
              zookeeer.quorum:zookeeper.client.port:zookeeper.znode.parent
 families     comma-separated list of families to copy
              To copy from cf1 to cf2, give sourceCfName:destCfName.
              To keep the same name, just give "cfName"

Args:
 tablename    Name of the table to copy
Here is an example command for copying table mytable from a cluster to a remote cluster where a table with the same name exists:
$ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --peer.adr=destination-zk:2181:/hbase --families=a mytable
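The --peer.adr value packs three things into one string: the ZooKeeper quorum, the ZooKeeper client port, and the parent znode. If you generate these commands from scripts, it can help to validate the address before launching a job. Here’s a small sketch of such a check. The PeerAddress class is hypothetical and isn’t part of HBase.

```java
public class PeerAddress {
    final String quorum;
    final int clientPort;
    final String znodeParent;

    private PeerAddress(String quorum, int clientPort, String znodeParent) {
        this.quorum = quorum;
        this.clientPort = clientPort;
        this.znodeParent = znodeParent;
    }

    // Parses an address of the form quorum:clientPort:znodeParent.
    static PeerAddress parse(String address) {
        String[] parts = address.split(":", 3);
        if (parts.length != 3 || parts[0].isEmpty() || !parts[2].startsWith("/")) {
            throw new IllegalArgumentException(
                "expected quorum:clientPort:/znodeParent but got " + address);
        }
        // Integer.parseInt rejects a non-numeric port with a NumberFormatException.
        return new PeerAddress(parts[0], Integer.parseInt(parts[1]), parts[2]);
    }

    public static void main(String[] args) {
        PeerAddress peer = parse("destination-zk:2181:/hbase");
        System.out.println("quorum=" + peer.quorum
                + " port=" + peer.clientPort
                + " znode=" + peer.znodeParent);
    }
}
```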
HBase stores its data in the directory specified by the hbase.rootdir configuration property. This directory contains all the region information, all the HFiles for the tables, as well as the WALs for all RegionServers. In essence, this is where everything is persisted. But copying over this directory (using distcp) doesn’t make for a great backup solution, especially in a running system.
When an HBase cluster is up and running, several things are going on: MemStore flushes, region splits, compactions, and so on. All of these cause changes in the underlying stored data, which makes copying the HBase root directory a futile effort. Another factor that plays in is the fact that in a running system, there is data in the MemStore that hasn’t been flushed. Even if nothing else is going on, a copy of the HBase root directory doesn’t necessarily completely represent the current state of the system.
But if you stop the HBase daemons cleanly, the MemStore is flushed and the root directory isn’t altered by any process. At this moment, copying over the entire root directory could be a good point-in-time backup solution. But incremental backups still present challenges, which make this solution less viable. Restoring from the backed-up root directory is as simple as starting HBase when it’s pointing to this new root directory.
Production-quality operations of any software system are learned over time. This chapter covered several aspects of operating HBase in production with the intention of getting you started on the path to understanding the concepts. New tools and scripts probably will be developed by HBase users and will benefit you. These basic concepts of HBase operations will enable you to understand when, where, and how to use them to your advantage.
The first aspect of operations is instrumenting and monitoring the system, and that’s where we began this chapter. We covered the various monitoring systems and mechanisms and then went into the different metrics that are of interest. There are general metrics that you should monitor regardless of the workload being put on the system, and then there are metrics that are specific to the workload (read or write).
From monitoring, the chapter transitioned into talking about performance testing, measuring performance, and tuning HBase for different kinds of workloads. Performance testing is key to understanding how the cluster is tuned and what you can do to extract better performance from the cluster. Tuning HBase involves working with multiple different configuration parameters, and configurations depend on the kind of workload for which you’re planning to use your cluster.
From there we covered a list of common management tasks and how and when to do them. Some are routine tasks that you’ll perform often; others are specific to certain situations. The chapter concluded with backup and replication strategies, talking about the common approaches to disaster recovery and what your options are currently.
Mastering HBase operations requires an understanding of the internals and experience gained by working with the system. As much as we’d wish for HBase to be a self-tuning and self-managing system, it isn’t there yet. We hope it gets there soon, and your experience could certainly feed into that goal.