Map Reduce Framework
In the HBase Master web UI (http://devnamenode:60010/master-status) you can click a table to see how many regions its data is split into. You can split the regions as required to increase the number of map tasks.
- The framework calls the InputFormat's isSplitable method for each input file. Depending on whether it returns true or false, the file is divided into several input splits (one map task each) or handled as a single split. You can write a custom FileInputFormat that overrides isSplitable to return false so that a single mapper processes all the storage blocks of a given file.
- Keys and values in MapReduce must use Hadoop's own serialization so they can be written to the context: value types implement the Writable interface and key types implement WritableComparable (e.g. Text, IntWritable, NullWritable).
- While a MapReduce job is running, disk I/O and network I/O increase, because intermediate data is written to local disk, final output is written to HDFS, and map output is transferred over the network to reducers on different nodes.
- The MapReduce programming model runs tasks in isolation; the only data exchange is the one-time shuffle and sort that copies map output to the reducers. Reduce tasks do not communicate with each other. If tasks must communicate, you need an external mechanism such as RMI or messaging (Apache Kafka).
- You can pass one file, multiple files or directories as input to a job. The number of map tasks is driven by the number of input splits, which by default follows the number of HDFS blocks of the input files (default block size 64 MB). Each map task processes one input split. If the block size is large, a block can sometimes be divided into several input splits for better parallelism.
- If there are zero reducers, each map task writes its output directly to the configured output directory. N map tasks generate N output files in HDFS.
- Otherwise, each map task writes its output as a single file on the local disk of its node (not to HDFS). The reducers later fetch that data from disk to process it.
- (Where the mappers' intermediate K/V output lives) Intermediate key-value pairs are written to the local disk of the machine running the map task and later copied to the machines running the reduce tasks.
- If the number of reducers is set to 1, that single reducer takes all the mapper output and writes its result to a single file in HDFS.
- The reducer collects all the values for a given key and passes the key with its values to the reduce() method. The reduce() method starts only after the map tasks reach 100% completion. If the reducer shows 10% while the map phase is at 85%, the reduce task is still copying map output files, not reducing.
- The reducer writes its output files to HDFS. The mapper does the same when the number of reducers is set to 0 (a map-only job). In both cases the number of output files equals the number of tasks that wrote them (map task count or reduce task count).
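The relationship between file size, split size and the number of map tasks described in the notes above can be sketched in plain Java. This is a simplified illustration that does not use the Hadoop API: the class and method names (SplitCountSketch, computeSplitSize, countSplits) are made up for this example, the file is assumed to be non-empty, and the real FileInputFormat applies further adjustments that are ignored here.

```java
// Simplified, stdlib-only model of how input splits (and so map tasks) are counted.
// All names here are hypothetical; this is not Hadoop's FileInputFormat.
class SplitCountSketch {

    // The split size is the block size clamped between a minimum and a maximum.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits for one non-empty file. A file that is not splittable
    // is always handled by a single map task, however many blocks it spans.
    static long countSplits(long fileSize, long splitSize, boolean splittable) {
        if (!splittable) {
            return 1;
        }
        return (fileSize + splitSize - 1) / splitSize; // ceiling division
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(64 * mb, 1, Long.MAX_VALUE);

        // A 200 MB file with 64 MB splits needs 4 map tasks.
        System.out.println(countSplits(200 * mb, splitSize, true));
        // The same file with isSplitable returning false needs 1 map task.
        System.out.println(countSplits(200 * mb, splitSize, false));
    }
}
```

With a 64 MB split size, a 200 MB file yields 4 splits, while marking the file as not splittable collapses that to a single map task.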
1) Say you have 10 nodes (node1..node10), each running a DataNode and a TaskTracker. If a single reducer runs on node4 and produces one block of output, the block placement policy writes the first replica to node4, and the remaining replicas are written to other nodes according to the replication factor. The output always lands first on the node where the reduce task runs and is then replicated.
2) Hadoop re-executes failed tasks: if a TaskTracker or node fails after a task has been assigned, the JobTracker reassigns the task to an available TaskTracker. Separately, the "speculative execution" feature launches a duplicate attempt of a task that is running much slower than its peers on another node and uses whichever attempt finishes first.
3) If a file's block is stored on 3 DataNodes and 2 of them fail, Hadoop can still read the data to process it as long as 1 replica remains available.
Hadoop Job
If your Linux machine is not configured with a Hadoop configuration pointing to the NameNode, DataNodes, etc., a Hadoop job runs in LocalJobRunner mode using your local file system.
NameNode
The NameNode's role is to return the block locations for a given file name. To make this lookup fast, the NameNode keeps file names, block locations and other file-related metadata in RAM.
Combiner
Use a combiner in your MapReduce jobs when the reduce operation is associative and commutative. In the job driver, call job.setCombinerClass(WordCountCombiner.class). It can improve the efficiency of your job: the combiner runs on the output of each map task, on the machine where that task ran, aggregates it and stores the result for the reducer. In word count, combiners perform local aggregation of the counts, reducing the number of key-value pairs that must be shuffled across the network to the reducers. Hadoop treats the combiner as an optimization and may run it zero or more times, so the job must produce correct results without it.
Ex: Say a word count program is split into multiple map tasks. Without a combiner, a map task emits tuples such as [the,1][the,1] for the reducer. With a combiner, these are combined into [the,2] right after the map task produces them and kept for the reducer. If several map tasks run, the reducer receives already-combined tuples such as [the,2] and [the,3], and can easily produce [the,5] from much less data because the combiner did most of the work. A combiner is not a replacement for the reducer; it only helps it. The combiner takes its input from the mapper and hands its output to the reducer, while the reducer's output is written to the output file through the context.
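The word count example can be traced with a small plain-Java simulation. It is only a sketch of the idea and does not use the Hadoop API: the class CombinerSketch and its methods are hypothetical names, and the two input lines are invented so that the map tasks produce [the,2] and [the,3] as in the example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of combiner behaviour; it does not use the Hadoop API.
class CombinerSketch {

    // Emits one (word, 1) pair per word, as a word count mapper would.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Sums the counts per word. The same logic can serve as combiner and as
    // reducer because addition is associative and commutative.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Two map tasks, each combined locally before the shuffle.
        Map<String, Integer> combined1 = sumByKey(map("the cat the"));      // the=2
        Map<String, Integer> combined2 = sumByKey(map("the dog the the"));  // the=3

        // Only the combined pairs travel to the reducer: 4 pairs instead of 7.
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        shuffled.addAll(combined1.entrySet());
        shuffled.addAll(combined2.entrySet());

        System.out.println(sumByKey(shuffled)); // prints {cat=1, dog=1, the=5}
    }
}
```

The reducer sees 4 pairs instead of the 7 the mappers emitted, and the final count for "the" is still 5.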
Partitioner
A partitioner decides which reducer receives each intermediate key, so you can route specific keys to a specific reducer (partition). By default Hadoop partitions by the hash of the key. A custom partitioner that distributes keys unevenly can overload some reducers and slow the job down, so make sure your use case justifies it.
Ex: You have one huge customer sales report file with 12 months of data, and the customer wants 12 separate files, one per month. The mapper can emit the date as the key, and a custom partitioner with 12 partitions, together with 12 reducers (job.setNumReduceTasks(12)), passes each month's data to its own reducer. Each reducer then produces 1 file (12 files in total) containing that month's records.
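The month-based routing in this example could look roughly like the following plain-Java sketch. It assumes the mapper emits keys in yyyy-MM-dd form, which the notes do not specify, and MonthPartitionerSketch is a hypothetical name. In a real job the same logic would go into a subclass of org.apache.hadoop.mapreduce.Partitioner (overriding getPartition) registered with job.setPartitionerClass.

```java
import java.time.LocalDate;

// Stdlib-only sketch of the partitioning decision; not a Hadoop Partitioner.
class MonthPartitionerSketch {

    // Maps a date key (assumed format: yyyy-MM-dd) to a partition number.
    // With 12 partitions, January goes to reducer 0 and December to reducer 11.
    static int getPartition(String dateKey, int numPartitions) {
        int month = LocalDate.parse(dateKey).getMonthValue(); // 1..12
        return (month - 1) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"2013-01-15", "2013-07-04", "2013-12-31"};
        for (String key : keys) {
            System.out.println(key + " -> reducer " + getPartition(key, 12));
        }
    }
}
```

Taking the month modulo the partition count keeps the result in range even if the job is run with fewer than 12 reducers, at the cost of several months sharing one output file.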
TaskTracker
After a TaskTracker has been assigned a task, it creates a TaskRunner instance to manage the task. The TaskRunner prepares the environment and launches a new JVM for the task. This happens for every task assigned to the TaskTracker.
From reading about partitioning a Map/Reduce job: the number of maps is usually driven by the number of DFS blocks in the input files. In our case the input is a scan of the HBase table DeviceMaster, and the table has only 1 region, so only 1 map task was created.
Every TaskTracker periodically sends a heartbeat to the JobTracker. When the JobTracker has a task to hand out, it assigns it to the first TaskTracker whose heartbeat reports a free slot. Pending tasks are submitted to free slots; if there are fewer pending tasks than free slots, all of them can go to that TaskTracker. Ideally, a task goes to a TaskTracker that holds the task's input data locally.
In particular, if a submitted job processes a single input split, it contains a single map task, and the JobTracker assigns that task to a TaskTracker that sends a heartbeat.
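The heartbeat-driven assignment described above can be modelled with a toy scheduler. This is only an illustration of the idea (free slots are filled on heartbeat, data-local tasks first); it is not the real JobTracker scheduler, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Toy model of heartbeat-driven task assignment; not the real JobTracker.
class HeartbeatSchedulerSketch {

    // A pending map task and the hosts that store a replica of its input split.
    static final class PendingTask {
        final String id;
        final Set<String> hostsWithData;

        PendingTask(String id, String... hosts) {
            this.id = id;
            this.hostsWithData = new HashSet<>(Arrays.asList(hosts));
        }
    }

    private final List<PendingTask> pending = new ArrayList<>();

    void submit(PendingTask task) {
        pending.add(task);
    }

    // Called when the TaskTracker on `host` reports `freeSlots` free slots.
    // Returns the ids of the tasks assigned in response to this heartbeat.
    List<String> onHeartbeat(String host, int freeSlots) {
        List<String> assigned = new ArrayList<>();
        take(host, freeSlots, assigned, true);  // first, tasks with local data
        take(host, freeSlots, assigned, false); // then, any remaining task
        return assigned;
    }

    private void take(String host, int freeSlots, List<String> assigned, boolean localOnly) {
        Iterator<PendingTask> it = pending.iterator();
        while (it.hasNext() && assigned.size() < freeSlots) {
            PendingTask task = it.next();
            if (!localOnly || task.hostsWithData.contains(host)) {
                assigned.add(task.id);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatSchedulerSketch scheduler = new HeartbeatSchedulerSketch();
        scheduler.submit(new PendingTask("t1", "node1", "node2"));
        scheduler.submit(new PendingTask("t2", "node3"));
        scheduler.submit(new PendingTask("t3", "node4"));

        // node3 holds the data for t2, so t2 is assigned before t1.
        System.out.println(scheduler.onHeartbeat("node3", 2)); // prints [t2, t1]
        // Fewer pending tasks than free slots: the tracker gets all of them.
        System.out.println(scheduler.onHeartbeat("node9", 5)); // prints [t3]
    }
}
```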
HDFS
HDFS is optimized for write-once, read-many streaming access to relatively large files. You can append to a file if required, but the best usage is to write a file once and read it many times.
Block Data orphan
If a record is split across blocks, so that part of it is in one block and the rest is in another block on another node, the task processing the first block requests the remaining bytes from the node holding the next block. Say block 1 on DN1 holds the start of a record and block 2 on DN2 holds the rest of the same file's record: the request goes to DN2 for the remaining part of the record, and that data is transferred to DN1 so the complete record can be processed there.
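How a record that straddles a block boundary ends up being processed exactly once can be sketched in plain Java. The sketch assumes newline-delimited records and uses a byte array in place of a file in HDFS; SplitLineReaderSketch and readSplit are hypothetical names. The convention it uses is that a reader skips the partial first line of its split (unless the split starts at offset 0) and reads past the end of its split to finish its last line. In a cluster, that read past the end is the data fetched from the DataNode holding the next block.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of reading line records across split boundaries.
class SplitLineReaderSketch {

    // Returns the records owned by the split [start, end) of `data`.
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Skip the first line: it belongs to the reader of the previous split.
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step over the newline
        }
        List<String> records = new ArrayList<>();
        // A line that starts at or before `end` belongs to this split, even if
        // its remaining bytes lie beyond `end` (that is, in the next block).
        while (pos <= end && pos < data.length) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            records.add(new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8));
            pos++; // step over the newline
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes(StandardCharsets.UTF_8);
        // Three "blocks" of 10, 10 and 6 bytes; "bravo" and "charlie" straddle boundaries.
        System.out.println(readSplit(data, 0, 10));  // prints [alpha, bravo]
        System.out.println(readSplit(data, 10, 20)); // prints [charlie, delta]
        System.out.println(readSplit(data, 20, 26)); // prints []
    }
}
```

Every record is read by exactly one split reader, so no record is lost or processed twice even though the block boundaries fall in the middle of records.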
SequenceFile
A SequenceFile stores the names of the classes used for its keys and values in the file header. You can read a SequenceFile with the hadoop fs command: -text converts the content to strings (via toString) and displays it, whereas -cat displays the raw data of the SequenceFile, which is not human readable.
$ hadoop fs -text sequenceFile
Note: You can also use the JobConf class (Hadoop library) to set the number of map tasks and reduce tasks. In general, though, leave it to HBase, which will choose efficiently.
Ref : http://wiki.apache.org/hadoop/HowManyMapsAndReduces
Increasing the map count when using HBase.
In our system we use HBase as the database and run a MapReduce job on top of it; the scan results are the input to the map. In the dev environment the job triggered only 1 map task because there was little data. On the HBase Master page (http://devnamenode:60010/master-status) I clicked the particular table and split its regions by clicking Split. It split into 3 regions. I then ran major_compact on the same table. Once that was done, I submitted the job and it spawned 4 maps based on the regions.
If the scan output spans multiple regions, a mapper is triggered for each region's output.
Powerful Hadoop CommandLine to manage files on HDFS.
Some useful Hadoop Commands
Hadoop replication factor
Commands to increase the Hadoop replication factor from 3 to 4.
Either of the commands below works if you want to change the replication of file f1:
$ hadoop fs -setrep 4 f1
$ hadoop fs -Ddfs.replication=4 -cp f1 f1.tmp; hadoop fs -rm f1; hadoop fs -mv f1.tmp f1
View the number of blocks for a given file
$ hadoop fsck [path] [options]
$ hadoop fsck /path/to/file -files -blocks
Create a file in HDFS on the fly (when you are done, type EOF and press Enter)
$ hadoop fs -put - file4.txt << EOF