Friday, September 27, 2013

Hadoop - Map Reduce Framework

Map Reduce Framework
  • The isSplitable() method of FileInputFormat is consulted for each input file; based on whether it returns true or false, the framework decides how the file is broken into map tasks. You can write a custom FileInputFormat that overrides isSplitable() to return false so that a single mapper processes all the storage blocks of a given file (see the sketch after this list).
  • Keys and values in MapReduce must implement the Writable interface so that Hadoop can serialize and deserialize them when writing to the context (e.g. Text, IntWritable, and NullWritable all implement Writable).
  • While a MapReduce job is running, both disk I/O and network I/O increase, because intermediate data is written to local disk and then transferred over the network from the mappers to the reducers on other nodes.
  • The MapReduce programming model executes tasks in isolation; the only data movement is the one-time copy of shuffled and sorted map output to the reducers. Reduce tasks do not communicate with each other. If tasks need to communicate, you have to use an external mechanism such as RMI or messaging (for example Apache Kafka).
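A minimal sketch of the non-splittable input format mentioned in the first bullet (the class name WholeFileTextInputFormat is a placeholder and the new org.apache.hadoop.mapreduce API is assumed; note the framework method is spelled isSplitable, with a single "t"):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WholeFileTextInputFormat extends TextInputFormat {

    // Returning false tells the framework not to split this file into multiple
    // InputSplits, so every block of the file is read by a single map task.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}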
Map  
  • You can pass one file, multiple files, or directories to the mapper. Based on the number of input splits derived from the HDFS blocks of the input files (default block size 64 MB), that many map tasks are started; a single map task processes a single input split. If a block is very large, it may be divided into multiple input splits for better performance.
  • If there are zero reducers, each map task writes its output directly to the configured output directory; N map tasks produce N output files in HDFS (see the driver sketch after this list).
  • Each map task writes its intermediate data to a single file on the local disk of its node (not HDFS); the reducers later pick the data up from that disk to process it.
  • (Where the intermediate key/value output of the mappers lives) Intermediate key-value pairs are written to the local disk of the machine running the map task and later copied to the machines running the reduce tasks.
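A minimal driver sketch of the map-only case above (the class name and path arguments are placeholders; the identity Mapper is used so the example stays self-contained): with zero reducers each map task writes its own output file straight to HDFS, and the number of map tasks follows the input splits.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapOnlyJobDriver {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "map-only example");
        job.setJarByClass(MapOnlyJobDriver.class);

        // Identity mapper: with TextInputFormat the output is (LongWritable offset, Text line).
        job.setMapperClass(Mapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Zero reducers: each map task writes its own part-m-NNNNN file to HDFS.
        job.setNumReduceTasks(0);

        // One or more files/directories; each input split becomes one map task.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}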
Reducer
  • If the number of reducers is set to 1, that single reducer receives all the output from the mappers and writes the result as a single file in HDFS.
  • The reducer collects all the values for a given key and passes the key with all of its values to the reduce() method. reduce() is invoked only once the map tasks are 100% complete; if the job shows reduce at 10% while map is at 85%, the reduce tasks are still copying/transferring map output, not yet reducing.
  • The reducer writes its output files to HDFS, just as the mapper does when the reducer count is set to 0 or there is no reducer class. In either case the number of output files equals the number of reduce tasks or map tasks respectively.
Infrastructure
1) Say you have 10 DataNodes that also run TaskTrackers (node1..node10). If a single reducer runs on node4 and produces one output block, then per the block placement policy the first copy of that block is written on node4, and it is then replicated to other nodes according to the replication factor. The node on which a reduce task runs always holds the first copy of its output, and replication proceeds from there.

2) If a TaskTracker/node fails after a task has been submitted, the JobTracker reassigns the task to the remaining available TaskTrackers. (Hadoop also has a "speculative execution" feature, which launches duplicate attempts of slow-running tasks.)

3) If a file's block is stored on 3 DataNodes, then even if 2 of those DataNodes fail, Hadoop can still fetch the data for processing as long as it is available on at least 1 DataNode.
  
 Hadoop Job
   If you have not configured your Linux machine with a Hadoop configuration pointing to the NameNode, DataNodes, etc., then the Hadoop job runs in LocalJobRunner mode using your local file system (a quick way to check this is sketched below).
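A small check sketch, assuming Hadoop 1.x property names (mapred.job.tracker and fs.default.name): with the built-in defaults shown, a job runs in LocalJobRunner mode against the local file system.

import org.apache.hadoop.conf.Configuration;

public class RunnerModeCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "local" means LocalJobRunner; a host:port value means a real JobTracker.
        System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker", "local"));
        // "file:///" means the local file system; "hdfs://namenode:port" means HDFS.
        System.out.println("fs.default.name    = " + conf.get("fs.default.name", "file:///"));
    }
}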

NameNode
The NameNode's role is to return the block locations for a given file name. To make this lookup fast, the NameNode keeps the file names, block locations, and other file-related metadata in RAM.

Combiner
Use a combiner in your MapReduce job when the aggregation is associative and commutative. In the job driver call job.setCombinerClass(WordCountCombiner.class). It increases the efficiency of the job: as each map task completes on a machine, the combiner class is invoked to aggregate that task's output locally and hold it for the reducer. For word count, combiners perform local aggregation of the counts, thereby reducing the number of key-value pairs that need to be shuffled across the network to the reducers.
Ex: Say a word-count program is split into multiple map tasks. Each map task's output contains tuples like [the,1][the,1] destined for the reducer. With a combiner, these are combined into [the,2] immediately after the map task finishes and kept for the reducer. Across multiple map tasks you end up with [the,2] and [the,3], already combined, so the reducer only has to produce [the,5]; because there is much less data, the combiner has already done most of the work. A combiner is not a replacement for the reducer, it only helps it: the reducer's output is written to the job output, while the combiner takes its input from the mapper and its output is given to the reducer. A minimal sketch of such a combiner follows.
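A minimal sketch of the WordCountCombiner referenced above (the class name comes from the post; the body is an assumed implementation): it sums the per-word counts emitted by one map task before the shuffle. Because word count is associative and commutative, the same class could also serve as the reducer.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();           // e.g. [the,1][the,1] becomes [the,2] locally
        }
        total.set(sum);
        context.write(word, total);       // the combined pair is what gets shuffled to the reducer
    }
}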

Partitioner 
A partitioner is mainly useful when you want to split the output into a small, known set of files: you decide which reducer (partition) each key is sent to. Partitioning can slow down the job, so make sure your use case justifies it.
Ex: You have one huge customer sales file covering 12 months of data, and the customer wants 12 separate files, one per month. The mapper can emit a key containing the date; with a partitioner that routes each month to one of 12 partitions and job.setNumReduceTasks(12), each reducer receives one month's records and produces one file, giving 12 files in total. A sketch of such a partitioner follows.
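A hedged sketch of the 12-way split described above (MonthPartitioner and the key layout are assumptions, not code from the post): the map output key is assumed to start with an ISO date, and each month is routed to its own reducer.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MonthPartitioner extends Partitioner<Text, Text> {

    // Keys are assumed to look like "2013-09-27|<customerId>".
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        int month = Integer.parseInt(key.toString().substring(5, 7)); // "09" -> 9
        return (month - 1) % numPartitions;   // months 1-12 map to partitions 0-11
    }
}

In the driver you would register it with job.setPartitionerClass(MonthPartitioner.class) and job.setNumReduceTasks(12).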

TaskTracker
After a TaskTracker has been assigned a task, it creates a TaskRunner instance to manage it. The TaskRunner prepares the environment and launches a new JVM for the task. This happens for every task assigned to the TaskTracker.

On the topic of partitioning a Map/Reduce job: the number of maps is usually driven by the number of DFS blocks in the input files. In our case the input is an HBase scan (the job scans the DeviceMaster table), and the table had only one region, so only one map task was created. A sketch of such a scan-driven job follows.
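A sketch of an HBase scan-driven job (the DeviceMaster table name is from the post; DeviceMasterScanJob, RowCountMapper, and the output path are placeholders): TableInputFormat creates one map task per region of the scanned table, which is why a single-region table yields a single map.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DeviceMasterScanJob {

    // Placeholder mapper: emits one (rowKey, 1) pair per scanned row.
    public static class RowCountMapper extends TableMapper<Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable row, Result result, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(new Text(row.get()), ONE);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "scan DeviceMaster");
        job.setJarByClass(DeviceMasterScanJob.class);

        Scan scan = new Scan();
        scan.setCaching(500);           // larger scanner caching for full-table MapReduce scans
        scan.setCacheBlocks(false);     // don't fill the block cache from a one-off scan

        // TableInputFormat creates one map task per region of DeviceMaster.
        TableMapReduceUtil.initTableMapperJob(
                "DeviceMaster", scan, RowCountMapper.class,
                Text.class, IntWritable.class, job);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}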

Every TaskTracker periodically sends a heartbeat to the JobTracker. When the JobTracker has tasks to hand out, it assigns a task to the first TaskTracker that responds with a heartbeat and reports free slots; if there are fewer pending tasks than free slots, all of the pending tasks are assigned. Ideally, the TaskTracker chosen is one that holds the task's input data locally.
In particular, if a submitted job processes a single input split, the job contains a single map task, and the JobTracker assigns it to the first heartbeating TaskTracker with a free slot.

HDFS
HDFS is optimized for write-once, streaming access to relatively large files. You can append data to a file if required, but the intended usage is to write files to HDFS once and read them many times (a small sketch follows).
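A small sketch of the write-once/read-many pattern (the path is a placeholder): write a file once, then stream it back. append() exists on FileSystem but depends on the cluster supporting it (dfs.support.append in older releases), so it is shown only as a commented-out exception to the rule.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteOnceReadMany {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/example.txt");   // placeholder path

        // Write once.
        FSDataOutputStream out = fs.create(file, true);
        out.writeBytes("written once\n");
        out.close();

        // Stream the file back; this is the access pattern HDFS is optimized for.
        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[128];
        int read = in.read(buf);
        in.close();
        System.out.println(new String(buf, 0, read));

        // Appending is possible but should be the exception, not the normal workflow:
        // FSDataOutputStream appendOut = fs.append(file);
    }
}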

Block Data orphan
If part of a record sits in one block and the rest of its data sits in another block on another node, then while a node is processing the record from the first block it requests the other node for the remaining bytes. Say block 1 on DN1 holds part of the record and block 2 on DN2 holds the rest of that file's record data: a request goes to DN2 for the remaining information, and the data is transferred to DN1 so the complete record can be processed.

SequenceFile
A SequenceFile stores the names of the classes used for its keys and values in the file header. You can read a SequenceFile with a Hadoop command: "-text" deserializes the contents via toString() and displays them, while "-cat" dumps the raw bytes of the file, which are not in a human-readable format. (A short read/write sketch in Java follows the command.)
$ hadoop fs -text sequenceFile
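A hedged sketch of writing and reading a SequenceFile (the path and the key/value contents are placeholders): the key and value class names recorded in the header are what let "hadoop fs -text" turn the records back into text.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/demo.seq");      // placeholder path

        // Write: the header records Text.class and IntWritable.class as key/value classes.
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, Text.class, IntWritable.class);
        writer.append(new Text("the"), new IntWritable(5));
        writer.close();

        // Read: the reader instantiates the key/value classes named in the header.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text key = new Text();
        IntWritable value = new IntWritable();
        while (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
        }
        reader.close();
    }
}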

In the HBase Master web UI (http://devnamenode:60010/master-status), clicking a table shows how many regions its data is split into. You can split regions as required to increase the number of map tasks.

Note: You can also use the JobConf class (from the Hadoop libraries) to set the number of map tasks and reduce tasks (see the short sketch below). In general, though, leave it to Hadoop/HBase, which handles it efficiently.
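A short sketch of those old-API JobConf calls (the class name is a placeholder): the map count is only a hint to the framework, since the input splits decide the real number, while the reduce count is honored exactly.

import org.apache.hadoop.mapred.JobConf;

public class TaskCountHints {
    public static void main(String[] args) {
        JobConf conf = new JobConf(TaskCountHints.class);
        conf.setNumMapTasks(10);      // a hint only; the actual count follows the input splits
        conf.setNumReduceTasks(2);    // this one is enforced
    }
}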

Ref : http://wiki.apache.org/hadoop/HowManyMapsAndReduces

Increasing the map count when using HBase.

In our system we use HBase and run a MapReduce job on top of it; the scan results are the input to the map. In the dev environment I saw only 1 map task triggered, due to the smaller amount of data. On the HBase Master page (http://devnamenode:60010/master-status) I clicked the table in question and split its regions into multiple by clicking Split, ending up with 3 regions, and then ran major_compact on the same table. Once that was done I submitted the job, and it spawned 4 maps based on the regions.

If the scan output spans multiple regions, a mapper is triggered for each region's output.

Powerful Hadoop command-line operations for managing files on HDFS.

Some useful Hadoop Commands
Hadoop replication factor
Commands to increase the Hadoop replication factor from 3 to 4.
Either of the commands below will re-replicate the file f1:

$ hadoop fs -setrep 4 f1
$ hadoop fs -Ddfs.replication=4 -cp f1 f1.tmp; hadoop fs -rm f1; hadoop fs -mv f1.tmp f1
 
View the number of blocks for a given file
$ hadoop fsck [path] [options]

$ hadoop fsck /path/to/file -files -blocks
 
Create a file in HDFS on the fly (when you are done, type EOF and press Enter):
$ hadoop fs -put - file4.txt << EOF

CDH - Hadoop - Hbase - Adding a host to an existing cluster

1. Install a CentOS version that matches the existing Hadoop cluster nodes.
2. Configure a static IP, DNS, gateway, and hostname.
3. Enable ssh and disable the firewall.
4. Disable SELINUX in the /etc/selinux/config file.
5. Add the distributed cluster machines to the /etc/hosts file.
6. In /etc/fstab, mount the Hadoop partition (ext4) with noatime in addition to the defaults (http://www.howtoforge.com/reducing-disk-io-by-mounting-partitions-with-noatime).
7. Install ntp ("yum install ntp"), start the service, and enable it with "chkconfig ntpd on". Sync the time with the server time pool: "ntpdate pool.ntp.org".
8. Log in to Cloudera Manager and click 'Hosts'.
9. Click 'Add Hosts'.
10. Type the hostname and search.
11. Select the host and click 'Install CDH on machine'.
12. Before choosing which version to install, check the CDH version on the master and install the same one (go to the master and type "hbase shell"; on startup it shows the HBase and CDH versions).
13. Copy the .bashrc settings from another server to the new server.
14. Go to each service (TaskTracker, DataNode, RegionServer) and add the new server. Make sure Master is not checked while adding the RegionServer.
15. Copy the libraries (hadoop_lib_jars from svn into $HADOOP_HOME/lib) and restart MapReduce in CDH.
16. Restart clients such as Azkaban once you have modified the ZooKeeper quorum to add this host to the cluster.

HBASE - RegionServer - Hbase Master failed to reach RegionServer

A RegionServer failed to respond to the HBase Master. Essentially, the RegionServer's ZooKeeper session failed to respond because a GC pause occurred and Java stopped the world.

The blog post below explains very clearly how to get rid of this GC failure.

http://blog.cloudera.com/blog/2011/02/avoiding-full-gcs-in-hbase-with-memstore-local-allocation-buffers-part-1/

Below are some configuration changes we made on our side to avoid the issue, verified under a system load test.

(HBase configuration) We increased the HBase ZooKeeper session timeout from 40 seconds to 90 seconds (the default is 60 seconds; per the HBase guide you can go up to a maximum of 3 minutes). Collecting about 1 GB of garbage takes an average system 8 to 10 seconds; since we have the heap configured at 8 GB and GC may kick in at around 7 GB used, we could end up failing on a connection timeout.

Pass java arguments : -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=60 -XX:PrintFLSStatistics=1 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/usr/lib/hbase/logs/logs/gc-$(hostname)-hbase.log

Enable the MSLAB allocation scheme with the default flush values; in hbase-0.92 it is enabled by default.

The cluster should have an odd number of nodes, because a multi-server ZooKeeper ensemble is recommended to be an odd number.

HBASE - Corrupted Blocks

If you see missing/corrupted block warnings such as the one below on your dfshealth page, run the command and check your HBase health.

http://devnamenode:50070/dfshealth.jsp

WARNING : There are about 1 missing blocks. Please check the log or run fsck. 

Run the command below against HDFS to see the status of the corrupted files:

hadoop fsck /

Note: Check whether the corrupted files belong to HDFS jobs or to HBase blocks; see the samples below:

1.  /user/hdfs/.staging/job_201307121242_42849/job.split: MISSING 1 blocks of total size 81

The file above was corrupted in HDFS while running job job_201307121242_42849 (2013-07-12 @ 12:42).

2. /hbase/.corrupt/ednwavlhd01%3A60020.1349814163628: CORRUPT block blk_-7209961989095415639

/hbase/.corrupt/ednwavlhd01%3A60020.1349814163628:  Under replicated blk_-7209961989095415639_403425. Target Replicas is 3 but found 1 replica(s).

The blocks above were corrupted in HBase during replication. The epoch time before ':CORRUPT' is 1349814163628; check the date the data was corrupted, find out which table these blocks belong to, and delete them if they are not required.

To delete the corrupted blocks, run the command below:

hadoop fsck -delete /

To move the corrupted blocks, run the command below:

hadoop fsck -move /

The -move option moves the files under /lost+found on the HDFS partition. You may delete them once the move command has placed them in that directory.