Use configurable parameters to monitor and tune the performance of a cloud Hadoop cluster
Hadoop is a popular software framework that enables distributed
manipulation of large amounts of data, thus making it a perfect
companion to cloud computing. In fact, Hadoop MapReduce, the programming
model and software framework used to write applications to rapidly
process vast amounts of data in parallel on large clusters of compute
nodes, is already in play on cloud systems. This article shows you how
to take full advantage of Hadoop by introducing Hadoop configurable
parameters and using them to monitor, analyze, and tune the performance
of your Hadoop cluster.
Introduction
Hadoop is an open-source, flexible Java framework for large-scale data processing on commodity hardware networks. It was inspired by the MapReduce and Google File System (GFS) technologies originally developed at Google Labs, and it became increasingly popular for its efficiency, reliability, and scalability. Now a top-level Apache project, Hadoop is supported and used by many companies such as IBM, Google, Yahoo!, and Facebook, and it has become the industry's de facto framework for large-scale data processing.

What does Hadoop mean for cloud computing? One of the goals of cloud computing is to provide high availability of compute resources at the lowest possible overhead. Hadoop is a perfect tool for achieving this goal, with its ability to work with thousands of nodes and petabytes of data and to automatically handle job scheduling, partial failure, and load balancing.
To make full use of compute resources, it's important to optimize performance across CPU, memory, and I/O (both disk and network). Hadoop handles much of this automatically, while leaving configuration interfaces for users to tune performance according to their specific applications. This article introduces the important configurable parameters of Hadoop and a method for analyzing and tuning performance.
Set up the environment
Steps to deploy the Hadoop environment
It is first necessary to build a Hadoop cluster environment before you can do performance tuning. Simply follow these steps:
- Prepare your cluster nodes with Linux OS, JDK 1.6, and ssh installed. Make sure sshd is running on each node.
- Access The Apache Software Foundation site and download a stable Hadoop distribution.
- Choose your NameNode (NN), JobTracker (JT), and secondary NameNode (SNN); the other nodes are DataNodes (DN) and TaskTrackers (TT). This article assumes you choose host001 as NN, host002 as JT, and host003 as SNN.
- Enable NN, JT, and SNN to ssh passphraselessly to all DN and TT nodes.
- Unpack the downloaded Hadoop distribution on each node; $HADOOP_HOME is used below to represent the unpack location.
- Enter the $HADOOP_HOME directory and modify the configuration files on NN:
  - Add host003 to $HADOOP_HOME/conf/masters.
  - Add all DN/TT nodes' IP addresses or host names to $HADOOP_HOME/conf/slaves, one host per line. Note: If you use host names, you need to configure the /etc/hosts file to make sure that each host name is known to all nodes in the cluster.
  - Add the following property to $HADOOP_HOME/conf/core-site.xml to set the NN IP/port:
      <property>
        <name>fs.default.name</name>
        <value>hdfs://host001:9000</value>
      </property>
  - Add the following property to $HADOOP_HOME/conf/mapred-site.xml to set the JT IP/port (in the 0.21.0 release this parameter is renamed mapreduce.jobtracker.address):
      <property>
        <name>mapred.job.tracker</name>
        <value>host002:9001</value>
      </property>
  - Add the following property to $HADOOP_HOME/conf/hdfs-site.xml if you have more than one network interface on your NN:
      <property>
        <name>dfs.datanode.dns.interface</name>
        <value>eth1</value>
        <description>The name of the network interface from which a data node should report its IP address.</description>
      </property>
- Copy all the configuration files mentioned above from NN into the $HADOOP_HOME/conf/ directory on all other nodes in the cluster.
- Enter the $HADOOP_HOME/bin directory on NN:
  - Format the NN using the command: $ ./hadoop namenode -format
  - Launch the start-all.sh script to start the Hadoop daemons. (A quick sanity check of the running cluster is sketched after this list.)
- For more detailed information, refer to Hadoop Common. Note: If you choose to use Hadoop release 0.21.0, you must use a current JDK; the issue is tracked by JIRA HADOOP-6941.
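Before you move on, it is worth verifying that the daemons actually came up. The commands below are not part of the original steps, just a minimal sanity-check sketch that reuses the example host names and the conf/slaves file created above:

for host in host001 host002 host003
do
    # Expect NameNode on host001, JobTracker on host002, SecondaryNameNode on host003.
    ssh $host "hostname; jps"
done

for host in $(cat $HADOOP_HOME/conf/slaves)
do
    # Every worker node should show a DataNode and a TaskTracker process.
    ssh $host "hostname; jps"
done

# From the NN, confirm that all DataNodes have registered with HDFS.
$HADOOP_HOME/bin/hadoop dfsadmin -report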
Install and configure the nmon performance monitoring tool
The nmon tool is a system administrator, tuner, and benchmark tool that can monitor a huge amount of important performance information in one go. You can use nmon as the monitoring tool throughout the entire performance tuning process. Follow the steps below to install and configure nmon and set up your performance monitoring system:
- Download the nmon binary package from the nmon for Linux site. Find the right version for your Linux OS and copy it to all nodes of the Hadoop cluster. $NMON_HOME is used below to represent where you put the binary.
- Since NN, JT, and SNN have been enabled to ssh passphraselessly to all other nodes, and all map/reduce jobs will be submitted on JT, choose JT as the central node to collect all nmon data. Log on to the JT node and then do the following steps.
- Create a directory on JT (host002), for example /home/hadoop/perf_share, and share it through NFS, using the following commands:
  - Create the directory: $ mkdir /home/hadoop/perf_share
  - Modify the /etc/exports file to include the following line: /home/hadoop/perf_share *(rw,sync)
  - Restart the NFS service: $ /etc/rc.d/init.d/nfs restart
  - Create the directory on all other nodes and mount it to the perf_share directory on JT:
      $ mkdir /home/hadoop/perf_share
      $ mount host002:/home/hadoop/perf_share /home/hadoop/perf_share
- Create the following start_nmon.sh script to start nmon on all nodes:

  #!/bin/sh
  hosts=( shihc008 shihc009 shihc010 shihc011 shihc012 shihc013 shihc014 shihc015 shihc016 shihc017)

  # Remove all data in /home/hadoop/perf_share
  for host in ${hosts[@]}
  do
      ssh $host "cd /home/hadoop/perf_share; rm -rf *"
  done

  # Start nmon on all nodes
  for host in ${hosts[@]}
  do
      ssh $host "/usr/bin/nmon -f -m /home/hadoop/perf_share -s 30 -c 360"
  done
  In the last nmon command, -f means you want the data saved to a file and not displayed on the screen; -m indicates where to save the data; -s 30 means you want to capture data every 30 seconds; and -c 360 means you want 360 data points, or snapshots (so the total data collection time is 30x360 seconds, or 3 hours). If a test run finishes early, you can stop the collectors with the small script sketched after this list.
- Download nmonanalyser (an Excel spreadsheet that takes an output file from nmon and produces some nice graphs to aid in analysis) from the nmonanalyser wiki to analyze the monitoring data you are about to collect.
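nmon exits by itself after taking the configured number of snapshots, but if a run finishes early you may want to stop the collectors by hand. This companion script is not from the original article; it is a minimal sketch that assumes the same example host list as start_nmon.sh and relies on nmon closing its output file cleanly when it receives SIGUSR2:

#!/bin/sh
hosts=( shihc008 shihc009 shihc010 shihc011 shihc012 shihc013 shihc014 shihc015 shihc016 shihc017)

# Ask every running nmon collector to finish and close its output file.
for host in ${hosts[@]}
do
    ssh $host "pkill -USR2 nmon"
done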
Detailing Hadoop configurable parameters
Hadoop provides various configuration options to users and administrators for cluster setup and tuning. There are a large number of variables in core/hdfs/mapred-default.xml that you can override in core/hdfs/mapred-site.xml. Some specify file paths on your system, but others adjust levers and knobs deep inside Hadoop's guts.

For performance tuning there are mainly four aspects: CPU, memory, disk I/O, and network. This article describes the parameters most relevant to these four aspects and leaves the others in *-default.xml for you to explore using the method introduced later.

CPU-related parameters: mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum

These parameters decide the maximum number of map/reduce tasks run simultaneously by a TaskTracker, and they are the parameters most relevant to CPU utilization. The default value of both is 2. Properly increasing their values according to your cluster condition increases CPU utilization and therefore improves performance. For example, assume each node of the cluster has 4 CPUs supporting simultaneous multi-threading, and each CPU has 2 cores; then the total number of daemons should be no more than 4x2x2=16. Considering that the DN and TT daemons take 2 slots, there are at most 14 slots left for map/reduce tasks, so the best value is 7 for both parameters.

Set these parameters in mapred-site.xml.
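The arithmetic above can be scripted as a rough starting point. This is only an illustrative heuristic, not an official formula: it counts the hardware threads the node reports, reserves two for the DataNode and TaskTracker daemons, and splits the rest evenly between map and reduce slots.

#!/bin/sh
# Count logical CPUs (hardware threads) visible to the OS.
threads=$(grep -c ^processor /proc/cpuinfo)

# Reserve two slots for the DataNode and TaskTracker daemons,
# then split the remaining slots evenly between map and reduce tasks.
slots=$((threads - 2))
suggested=$((slots / 2))

echo "mapred.tasktracker.map.tasks.maximum    = $suggested"
echo "mapred.tasktracker.reduce.tasks.maximum = $suggested"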
Memory-related parameter: mapred.child.java.opts

This is the main parameter for JVM tuning. The default value is -Xmx200m, which gives each child task JVM at most 200 MB of memory. You can increase this value if the job is large, but you should make sure it won't cause swapping, which significantly reduces performance.

Let's examine how this parameter can affect the total memory usage. Assume the maximum number of map/reduce tasks is set to 7, and mapred.child.java.opts is left at its default value. Then the memory cost of the running tasks will be 2x7x200 MB = 2800 MB. If each worker node runs both the DN and TT daemons, and each daemon costs 1 GB of memory by default, the total memory allocated would be around 4.8 GB.

Set this parameter in mapred-site.xml.
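Before raising the child heap size, it can help to check this arithmetic against the node's physical memory so the cluster never swaps. A minimal sketch, assuming 7 map slots, 7 reduce slots, and the 200 MB default heap (adjust the variables to your own settings):

#!/bin/sh
map_slots=7          # mapred.tasktracker.map.tasks.maximum
reduce_slots=7       # mapred.tasktracker.reduce.tasks.maximum
child_heap_mb=200    # from mapred.child.java.opts, e.g. -Xmx200m
daemon_heap_mb=2048  # roughly 1 GB each for the DN and TT daemons

task_mb=$(( (map_slots + reduce_slots) * child_heap_mb ))
total_mb=$(( task_mb + daemon_heap_mb ))
phys_mb=$(( $(grep MemTotal /proc/meminfo | awk '{print $2}') / 1024 ))

echo "Estimated task memory:  ${task_mb} MB"
echo "Estimated total memory: ${total_mb} MB (physical memory: ${phys_mb} MB)"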
Disk I/O-related parameters: mapred.compress.map.output, mapred.output.compress, and mapred.map.output.compression.codec

These parameters control whether to compress output: mapred.compress.map.output turns on map output compression, mapred.output.compress turns on job output compression, and mapred.map.output.compression.codec selects the compression codec. All of these options are turned off by default.

Turning on output compression can speed up disk writes (both local and Hadoop Distributed File System (HDFS) writes) and reduce the total time of data transfer (in both the shuffle and HDFS writing phases), while on the other hand it adds overhead during the compression/decompression process.

According to personal experience, turning on compression is not effective for sequence files with random keys/values. One suggestion is to turn on compression only when the data you're dealing with is large and organized (especially natural language data).

Set these parameters in mapred-site.xml.
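These are job-level settings, so besides configuring them cluster-wide you can experiment on a single run. The command below is only a sketch: it assumes the examples' Sort program accepts generic -D options (it is driven through ToolRunner in the 0.20.x examples jar), and the input and output paths are placeholders.

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar sort \
    -D mapred.compress.map.output=true \
    -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec \
    /input/dir /output/dir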
io.sort.mb parameter

This parameter sets the buffer size for map-side sorting, in MB; the default is 100. The greater the value, the fewer spills to disk, thus reducing I/O times on the map side. Notice that increasing this value increases the memory required by each map task.

According to experience, when the map output is large and the map-side I/O is frequent, you should try increasing this value.

Set this parameter in mapred-site.xml.
io.sort.factor parameter

This parameter sets the number of input streams (files) to be merged at once in both map and reduce tasks; the default is 10. The greater the value, the fewer spills to disk, thus reducing I/O times on both the map and reduce sides. Notice that increasing this value might cause more garbage collection activity if the memory allocated for each task is not large enough.

According to experience, when there is a large number of spills to disk and the I/O times of the sort and shuffle phases are high, you should try increasing this value.

Set this parameter in mapred-site.xml.
mapred.job.reduce.input.buffer.percent parameter

This parameter sets the percentage of memory (relative to the maximum heap size) used to retain map outputs during the reduce phase; the default is 0. When the shuffle is concluded, any remaining map outputs in memory must consume less than this threshold before the reduce phase can begin. The greater the value, the less merging on disk, thus reducing I/O times on the local disk during the reduce phase. Notice that increasing this value might cause more garbage collection activity if the memory allocated for each task is not large enough.

According to experience, when the map output is large and local disk I/O is frequent during the reduce through sort phases, you should try increasing this value.

Set this parameter in mapred-site.xml.
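Like the compression settings, these three sort/shuffle parameters can also be overridden per job while you experiment, which avoids repeated cluster restarts. A sketch under the same assumptions as the earlier example (ToolRunner-style -D options, placeholder paths, and purely illustrative values rather than recommendations):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar sort \
    -D io.sort.mb=200 \
    -D io.sort.factor=32 \
    -D mapred.job.reduce.input.buffer.percent=0.7 \
    /input/dir /output/dir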
mapred.local.dir and dfs.data.dir parameters

These two parameters decide where Hadoop puts its data: mapred.local.dir decides where MapReduce intermediate data (map output data) is stored, and dfs.data.dir decides where HDFS data is stored. According to experience, spreading these locations across all disks on each node achieves disk I/O balance and therefore greatly improves disk I/O performance.

Set mapred.local.dir in mapred-site.xml and dfs.data.dir in hdfs-site.xml.
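For illustration, if every worker node has data disks mounted at /hadoop/sdb through /hadoop/sde (the same example mount points used for the baseline test later in this article), the directories could be prepared like this. The host names are placeholders for your DN/TT nodes; the comments show how the comma-separated lists would then be configured:

#!/bin/sh
# Placeholder worker host names; replace with your own DN/TT nodes.
hosts=( host004 host005 host006 )

# Create one directory per physical disk on every worker node.
for host in ${hosts[@]}
do
    ssh $host "mkdir -p /hadoop/sdb /hadoop/sdc /hadoop/sdd /hadoop/sde"
done

# Then, in mapred-site.xml:  mapred.local.dir = /hadoop/sdb
# and in hdfs-site.xml:      dfs.data.dir     = /hadoop/sdc,/hadoop/sdd,/hadoop/sde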
Network-related parameters: topology.script.file.name

This parameter points to the user-defined script that determines the rack-host mapping used to configure rack awareness. Set this parameter in the core-site.xml file.

Rack awareness is the most important configuration for improving network performance, and it is strongly recommended that you configure it following the instructions at http://hadoop.apache.org/common/docs/current/cluster_setup.html#Hadoop+Rack+Awareness and http://wiki.apache.org/hadoop/topology_rack_awareness_scripts.
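As an illustrative sketch only (not the scripts from the pages referenced above), a topology script receives host names or IP addresses as arguments and prints one rack path per argument; the rack assignments below are made-up examples you would replace with your own network layout:

#!/bin/sh
# topology.sh - print one rack path for every host name or IP passed in.
# Hadoop invokes this script and reads the rack paths from standard output,
# in the same order as the arguments.
for node in "$@"
do
    case $node in
        192.168.1.*|host00[1-5])  echo "/rack1" ;;
        192.168.2.*|host00[6-9])  echo "/rack2" ;;
        *)                        echo "/default-rack" ;;
    esac
done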
mapred.reduce.parallel.copies parameter

This parameter determines the number of threads used to copy map outputs to the reducer; the default is 5. Increasing this value can increase the network flow rate and speed up the process of copying map outputs, while also costing more CPU usage.

According to experience, the effect of increasing this value is rather slight, and the suggestion is to increase it only if the map output of your job is very large.

Notes: The parameter names listed above are all from the Hadoop 0.20.x releases; if you use the 0.21.0 release, some of the names have changed. Besides the Hadoop parameters, there are also system parameters, such as inter-rack bandwidth, that affect the overall performance.
How to tune and improve performance
After the long but necessary preparations above, you have finally arrived at the point of tuning and improving performance. The whole process can be separated into the following steps.

Step 1: Choose a testing benchmark

The performance of the whole Hadoop cluster is determined by two aspects: HDFS I/O performance and MapReduce runtime performance. Hadoop itself supplies several benchmarks: TestDFSIO and dfsthroughput for HDFS I/O testing, which are contained in hadoop-*-test.jar; Sort for overall hardware testing, which is contained in hadoop-*-examples.jar; and Gridmix, which mimics a mixed workload in a grid environment and lives under the $HADOOP_HOME/src/benchmarks directory. You can choose any of these benchmarks according to your testing demands.

Among these benchmarks, Sort can reflect both MapReduce runtime performance (while performing the sort) and HDFS I/O performance (while writing the sort results into HDFS) when the input data is large. What's more, Sort is the Apache-recommended hardware benchmark. (Find more information on the Hadoop Wiki.) So Sort is used as the example benchmark to demonstrate the performance tuning method.

Step 2: Build up the baseline
- Testing environment:
- Benchmark: Sort
- Input data scale: 500 GB
- Hadoop cluster scale: 10 DN/TT nodes
- All nodes are homogeneous
- Node information:
- Linux OS
- Two 4-core processors supporting simultaneous multi-threading
- 32 GB memory
- Five 500 GB disks
- Testing scripts:
Following are the scripts used for testing (refer to the Hadoop Wiki for more information about running the Sort benchmark). All scripts should be run on the JT node.
Note: Put the start_nmon.sh script mentioned above and the following scripts into the same directory, the one you choose for storing the testing results.
baseline_test.sh

#!/bin/sh
# Since there are 10 nodes, write 50 GB of data on each
fSize=5368709120

mkdir -p ./testRes/nmonFiles

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar randomwriter \
    -D test.randomwrite.bytes_per_map=$fSize /rand_$fSize \
    2>&1 | tee ./testRes/randomwriter_$fSize.out

# Run three cycles to get a more precise result
for runtimes in {a,b,c}
do
    ./run_sort_baseline.sh $fSize $runtimes
done
run_sort_baseline.sh

#!/bin/sh
$HADOOP_HOME/bin/hadoop dfs -rmr /rand_$1-sorted

./start_nmon.sh

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar sort -r 70 \
    /rand_$1 /rand_$1-sorted \
    2>&1 | tee ./testRes/sort_baseline_$2.out

# Collect the nmon output of this run ($2 is the run label: a, b, or c)
cp -r /home/hadoop/perf_share ./testRes/nmonFiles/baseline_$2
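A typical invocation might look like the following; this is just a usage sketch, assuming the scripts sit in the chosen result directory and are executable:

chmod +x start_nmon.sh baseline_test.sh run_sort_baseline.sh
nohup ./baseline_test.sh > baseline_test.log 2>&1 &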
- Parameter values for baseline test:
  - Hadoop parameter values:
    - mapred.tasktracker.map.tasks.maximum = 2 (default value)
    - mapred.tasktracker.reduce.tasks.maximum = 2 (default value)
    - mapred.reduce.parallel.copies = 5 (default value)
    - mapred.child.java.opts = -Xmx200m (default value)
    - mapred.job.reduce.input.buffer.percent = 0 (default value)
    - io.sort.mb = 100 (default value)
    - io.sort.factor = 10 (default value)
    - mapred.local.dir = /hadoop/sdb
    - dfs.data.dir = /hadoop/sdc,/hadoop/sdd,/hadoop/sde
  - System parameter value:
    - Inter-rack bandwidth = 1 Gb
- Baseline test result:
- Execution time: 10051 seconds
- Resource usage summary:
                AVG CPU   AVG Mem (active)   Disk read (KB/s)   Disk write (KB/s)   Disk IO/sec   Net read (KB/s)   Net write (KB/s)
    NameNode      0.10%          552.43 MB                0.0               18.1           1.7               8.0              31.8
    JobTracker    0.30%          822.19 MB                0.0               34.1           2.0               8.8              13.0
    DataNode      42.5%         6522.32 MB            49431.2            37704.0         605.3            6134.9            7126.4
- Detailed chart:
After you get all the nmon data, you can use nmonanalyser to generate
graphic charts. Since nmonanalyser is an Excel spreadsheet, just open it,
click analyse nmon data, and choose the nmon files. Then you can get the analyzed charts.
Figure 1. Analyze nmon data using nmonanalyser
The detailed charts generated by nmonanalyser for the baseline test are as follows:
Figure 2. NameNode charts
Figure 3. JobTracker charts
Figure 4. DataNode/TaskTracker charts
Step 3: Find the bottleneck
You need to carefully explore the system bottleneck from the monitoring data and charts. Since the main workload is assigned to the DN/TT nodes, you should first focus on the resource usage of the DN/TT nodes (only the nmon charts for DN/TT are shown below to save space).

From the baseline monitoring data and charts, you can see several bottlenecks in the system: the map phase CPU was not fully utilized (most of the time under 40 percent), and disk I/O was quite frequent.
Step 4: Break the bottleneck
First, try to increase the CPU utilization in the map phase. From the introduction of the Hadoop parameters, you know that to increase CPU utilization you need to increase the values of the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum parameters.

In the testing environment, each node has two 4-core processors that support simultaneous multi-threading, so there are a total of 16 hardware threads; reserving two for the DN and TT daemons, you can set both parameters to 7.

To make the change, set the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties in mapred-site.xml, restart the cluster, and launch baseline_test.sh again (since the configuration is made in the mapred-site.xml file, there is no need to modify the script). The modified mapred-site.xml will look like:

<configuration>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>7</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>7</value>
  </property>
</configuration>
- Execution time: 8599 seconds
- Resource usage summary:
                AVG CPU   AVG Mem (active)   Disk read (KB/s)   Disk write (KB/s)   Disk IO/sec   Net read (KB/s)   Net write (KB/s)
    NameNode      0.10%          520.88 MB                0.0               21.2           2.0               6.4              12.7
    JobTracker    0.50%          1287.4 MB                0.0               22.5           1.6               6.4               5.1
    DataNode      48.4%         12466.8 MB           51729.07           44060.67         669.9            7462              6865
Figure 5. DataNode/TaskTracker charts after tuning
Step 5: New round of tuning, repeat steps 3 and 4
From the data and charts you get after tuning up the maximum number of map/reduce tasks in each TaskTracker, you can see that the CPU is now fully utilized during the map phase. But in the meantime, the disk I/O frequency is still high, so a new round of the tune-monitor-analyze process is needed. You need to repeat these steps until there are no more bottlenecks in the system and each resource is fully utilized (a small script for driving such repeated runs is sketched after the results below).

Notice that each tuning step will not necessarily produce a performance improvement. If a performance reduction occurs, you need to roll back to the previous configuration and try another tuning to break the bottleneck. In this testing, the following optimized results were finally achieved:
- Execution time: 5670 seconds
- Hadoop parameter values:
- System parameter values: Inter-rack bandwidth = 1 Gb
- Resource usage summary:
Figure 6. DataNode/TaskTracker charts - 2nd round of tuning
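One practical way to drive the repeated tune-monitor-analyze rounds is to script a small parameter sweep. The following sketch is not one of the original test scripts; it reuses the same example paths and variables, and tries several io.sort.mb values via per-job -D overrides (assuming, as before, that the Sort example accepts generic options):

#!/bin/sh
fSize=5368709120
mkdir -p ./testRes/nmonFiles

# One monitored sort run per candidate io.sort.mb value.
for mb in 100 150 200
do
    $HADOOP_HOME/bin/hadoop dfs -rmr /rand_$fSize-sorted
    ./start_nmon.sh
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar sort \
        -D io.sort.mb=$mb -r 70 /rand_$fSize /rand_$fSize-sorted \
        2>&1 | tee ./testRes/sort_mb${mb}.out
    cp -r /home/hadoop/perf_share ./testRes/nmonFiles/mb${mb}
done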
Step 6: Scalability test and improvement
To further verify the tuning result, you need to increase the cluster scale and the input data scale while keeping the optimized configuration, to test how well the configuration scales. More precisely, increase the cluster scale to 30 nodes and the input data scale to 1.5 TB, and then run the above testing procedure all over again.

As space is limited, the detailed tuning procedure won't be described here. The monitoring and analysis method is exactly the same as the one described above, and the main bottleneck observed was in the network: the inter-rack bandwidth became insufficient when the input data increased to the TB scale. When the inter-rack bandwidth was increased to 4 Gb and all other 10-node-optimized parameters were left unchanged, the final execution time was 5916 seconds, which is quite close to the 10-node-optimized result (5670 seconds).
Conclusion
You now know how to monitor a Hadoop cluster, analyze the system bottleneck using the monitoring data, and tune the performance. Hopefully, this knowledge can help you make full use of your Hadoop cluster and finish your jobs more efficiently. You can use the method described in this article to further explore the configurable parameters of Hadoop and find the connection between parameter configuration and different job characteristics.

What's more, this parameter-based tuning is somewhat "static," because one set of parameter values is only optimized for one kind of job. To gain more flexibility, you might be interested in studying the scheduling algorithms of Hadoop and perhaps sharing your own methods for improving Hadoop performance.
Resources
Learn
- Learn about IBM Big Data and Hadoop Strategy.
- Visit the Apache Hadoop home page.
- Download a stable Hadoop distribution.
- IBM InfoSphere BigInsights Basic Edition -- IBM's Hadoop distribution -- is an integrated, tested and pre-configured, no-charge download for anyone who wants to experiment with and learn about Hadoop.
- Find free courses on Hadoop fundamentals, stream computing, text analytics, and more at Big Data University.
- Download the nmon binary package.
- Download nmonanalyser.
- Information about running the SORT benchmark.
- In the developerWorks cloud developer resources, discover and share knowledge and experience of application and services developers building their projects for cloud deployment.
- Find technical articles, tutorials, communities, wikis, and business resources that should be interesting to you if you work in a specific industry domain on IBM developerWorks Industry zone.
Get products and technologies
- Download IBM InfoSphere BigInsights Basic Edition at no charge and build a solution that turns large, complex volumes of data into insight by combining Apache Hadoop with unique technologies and capabilities from IBM.
Discuss
- Join a cloud computing group on My developerWorks.
- Read all the great cloud blogs on My developerWorks.
- Join the My developerWorks community, a professional network and unified set of community tools for connecting, sharing, and collaborating.
Comments
Will this also work with Pig queries ?
I have been trying to profile pig queries for CPU, RAM, Disk IO usage with no success. I was wondering if you can guide me on this.
I have tried Hprof and Starfish with Pig but neither works.
Any hints, pointer ?
Thanks for reading !!