INTRODUCTION TO HADOOP
Q. What is big data?
Big data refers to data so large that it is beyond the storage and processing capabilities of a single physical machine.
Characteristics of Big Data:
Data that has very large volume, comes from a variety of sources and formats, and flows into an organization with great velocity is normally referred to as Big Data.
Figure 1 Characteristics of Big Data
Q. What are the sources generating big data?
Employees → Users → Machines
Employees: Historically, employees of organizations generated data.
Users: Then a shift occurred where users started generating data. For example, email, social media, photos, videos, audio and e-Commerce.
Machines: Smart phones, intelligent kitchen appliances, CCTV cameras, smart meters, global satellites, and traffic flow sensors
Q. Why do we need a new framework for handling big data?
Most of the traditional data was organized neatly in relational databases. Data sets now are so large and complex that they are beyond the capabilities of traditional storage and processing systems.
The following challenges demand cost-effective and innovative forms of handling big data at scale:
Lots of data
Organizations are increasingly required to store more and more data to survive in today’s highly competitive environment. The sheer volume of the data demands lower storage costs as compared to the expensive commercial relational database options.
Complex nature of data
The relational data model has great properties for structured data, but much modern data does not fit well into a row-column format. Data is now generated by diverse sources in various formats like multimedia, images, text, real-time feeds, and sensor streams. For storage, such data is usually transformed and aggregated to fit a structured format, resulting in the loss of the original raw data.
New analysis techniques
Previously, simple analysis (like averages and sums) was sufficient to predict customer behavior. Now, complex analysis must be performed to gain an insightful understanding of the data collected. For example, prediction models for effective micro-segmentation need to analyse a customer's purchase history, browsing behavior, and likes and reviews on social media websites. These advanced analytic techniques need a framework to run on.
Hadoop to the rescue: a framework that provides low-cost storage and complex analytic processing capabilities.
Q. Why do we need Hadoop framework, shouldn’t DFS be able to handle large volumes of data already?
Yes, it is true that when datasets cannot fit in a single physical machine, a Distributed File System (DFS) partitions the data and stores and manages it across different machines. But DFS lacks the following features, which is why we need the Hadoop framework:
Fault tolerance:
When a lot of machines are involved, the chance of data loss increases. So automatic fault tolerance and failure recovery become a prime concern.
Move data to computation:
If huge amounts of data must be moved from storage to the computation machines, the processing speed is limited by network bandwidth. Hadoop instead moves the computation to the nodes where the data is stored.
Q. What is the difference between traditional RDBMS and Hadoop?
RDBMS | Hadoop
Schema on write | Schema on read
Scale up approach | Scale out approach
Relational tables | Key-value format
Structured queries | Functional programming
Online transactions | Batch processing
Table 1 Difference between RDBMS and Hadoop
Q. What is Hadoop?
Hadoop is a framework that allows distributed processing of large data sets across clusters of computers using a simple, fault-tolerant programming model. It is designed to scale out from a few machines to thousands, each providing local computation and storage. The Hadoop software library itself is designed to detect and handle failures at the application layer.
Core components of Hadoop:
HDFS (Storage) + MapReduce/YARN (Processing)
Figure 2 Hadoop Ecosystem
PROGRAM: Installing Cloudera QuickStart VM in Oracle VirtualBox
Introduction
This section describes how to get started with Hadoop by importing the CDH virtual machine into Oracle VirtualBox. This 64-bit VM consists of a single-node Apache Hadoop cluster with lots of examples, which makes it a good starting point for experimenting with Hadoop.
Terminology Used:
Host: The machine where VirtualBox or other virtualization software is installed.
Guest: The virtual machine supported by the virtualization software is called the guest.
Note:
The CDH 5 guest works well with as little as 4 GB of RAM, but running Cloudera Manager pushes the memory requirement up to 8 GB.
Software Versions
The software versions used are:
VirtualBox: 4.3.20
CDH: 5.3.x
Steps
1. Oracle VM VirtualBox Installation on Host
Download the platform dependent version of the VirtualBox from the Oracle VirtualBox website.
2. Cloudera CDH 5 VM Guest
Download the virtualization software dependent version of the QuickStart VM from Cloudera's website.
3. Importing the CDH VM
Start VirtualBox and click on ‘Import Appliance’ in the menu:
Figure 3 Import the CDH VM in VirtualBox
Select the .ovf file you downloaded in the previous step. The guest’s settings will be displayed. Check the ‘Reinitialize the MAC address of all network cards’ checkbox:
Figure 4 Appliance Settings
4. VM Startup
On startup, Hue will be started automatically in a browser.
Enter the following login details:
Username: cloudera
Password: cloudera
Figure 5 Hue Login
5. View Tutorials
Click on ‘Get Started’ to view the tutorials on various Hadoop topics:
Figure 6 Getting Started Tutorials
Chapter 2: HDFS
Q. What is HDFS?
The Hadoop Distributed File System (HDFS) is one of the core components of the Hadoop framework. It is a distributed file system for Hadoop and runs on top of an existing native file system (ext2, ext3, etc.).
Goals: automatic recovery from failures, and moving computation to the data rather than moving data to the computation.
HDFS features:
· Supports storage of very large datasets
· Write once read many access model
· Streaming data access
· Replication using commodity hardware
Q. What is difference between regular file system and HDFS?
Regular File Systems | HDFS
Small block size (e.g. 512 bytes) | Large block size (64 MB by default, often 128 MB)
Multiple disk seeks for large files | Data read sequentially after a single seek
Table 2 Regular File Systems Vs. HDFS
Q. What is HDFS not meant for?
HDFS is not good for:
· Applications that require low-latency access to data (in the order of milliseconds)
· A large number of small files
· Multiple writers and arbitrary file modifications
Q. What is the HDFS block size, and what did you choose in your project?
By default, the HDFS block size is 64MB. It can be set to higher values such as 128MB or 256MB; 128MB is a widely accepted industry standard.
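As a hedged illustration (dfs.blocksize is the Hadoop 2.x property name; older releases use dfs.block.size), the block size can also be overridden programmatically for files written through a given Configuration:
import org.apache.hadoop.conf.Configuration;
public class BlockSizeConfig {
public static void main(String[] args) {
// Minimal sketch: request a 128 MB block size for files written
// through this Configuration (Hadoop 2.x property name assumed).
Configuration conf = new Configuration();
conf.setLong("dfs.blocksize", 128L * 1024 * 1024);
System.out.println("dfs.blocksize = " + conf.getLong("dfs.blocksize", 0));
}
}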
Q. What is the default replication factor?
Default replication factor is 3
Q. What are the different HDFS shell commands to perform a copy operation?
$ hadoop fs -copyToLocal <hdfs-src> <local-dst>
$ hadoop fs -copyFromLocal <local-src> <hdfs-dst>
$ hadoop fs -put <local-src> <hdfs-dst>
Q. What are the problems with Hadoop 1.0?
· NameNode: No Horizontal Scalability and No High Availability
· JobTracker: Overburdened, since it handles both resource management and job scheduling/monitoring.
· MRv1: It can only understand Map and Reduce tasks
Q. What comes in Hadoop 2.0 and MapReduce V2 (YARN)?
NameNode: High Availability (HA) and Federation
JobTracker: its responsibilities are split between the ResourceManager (cluster resource management) and per-application ApplicationMasters (application scheduling/monitoring)
Q. What are the different types of schedulers, and which one did you use?
Capacity Scheduler
It is designed to run Hadoop applications as a shared, multi-tenant cluster while maximizing the throughput and the utilization of the cluster.
Fair Scheduler
Fair scheduling is a method of assigning resources to applications such that all apps get, on average, an equal share of resources over time.
Q. Steps involved in decommissioning (removing) the nodes in the Hadoop cluster?
· Update the network addresses in the dfs.exclude and mapred.exclude files
· Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes
· Check the Web UI; it will show “Decommissioning in Progress”
· Remove the nodes from the include file, and then run the refreshNodes commands from step 2 again
· Remove the nodes from the slaves file
Q. Steps involved in commissioning (adding) the nodes in the Hadoop cluster?
· Update the network addresses in the dfs.include and mapred.include files
· Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes
· Update the slaves file
· Start the DataNode and NodeManager on the added node
Q. How to keep HDFS cluster balanced?
The balancer is a tool that redistributes blocks by moving them from over-utilized DataNodes to under-utilized ones, until the block distribution across the cluster is within a given threshold.
Q. What is distcp?
· distcp is a utility that comes with Hadoop for copying large amounts of data to and from Hadoop file systems in parallel.
· It is implemented as a MapReduce job where the copying is done by maps that run in parallel across the cluster.
· There are no reducers.
Q. What are the daemons of HDFS?
· NameNode
· DataNode
· Secondary NameNode.
Q. Command to format the NameNode?
$ hdfs namenode -format <cluster_name>
Q. What are the functions of NameNode?
The NameNode is mainly responsible for:
Namespace
Maintains the file system namespace and metadata about the data.
Block Management
Processes block reports and maintains the location of blocks.
Supports block-related operations.
Manages replica placement.
Q. What is HDFS Federation?
· HDFS federation allows scaling the name service horizontally; it uses multiple independent NameNodes for different namespaces.
· All the NameNodes use the DataNodes as common storage for blocks.
· Each DataNode registers with all the NameNodes in the cluster.
· DataNodes send periodic heartbeats and block reports and handle commands from the NameNodes
Q. What is HDFS High Availability?
· In an HDFS High Availability (HA) cluster, two separate machines are configured as NameNodes.
· Only one of the NameNodes is in the Active state; the other is in the Standby state.
· The Active NameNode is responsible for all client operations in the cluster, while the Standby simply acts as a slave, maintaining enough state to provide a fast failover if necessary.
· They share common storage for the namespace edits, and all DataNodes connect to both NameNodes.
Q. How client application interacts with the NameNode?
· Client applications interact with the NameNode through the Hadoop HDFS API whenever they need to locate, add, copy, move, or delete a file.
· The NameNode responds to successful requests by returning a list of the relevant DataNode servers where the data resides.
· Clients can then talk directly to a DataNode once the NameNode has provided the location of the data.
Q. What is a DataNode?
· A DataNode is a slave node that stores data in the Hadoop Distributed File System (HDFS).
· On startup, a DataNode connects to the NameNode.
· DataNode instances can talk to each other mostly during replication.
Q. What is rack-aware replica placement policy?
· Rack-awareness is used to take a node's physical location into account while scheduling tasks and allocating storage.
· The default replication factor for data blocks in HDFS is 3.
· With the default placement policy, the first replica is stored on the writer's node (or a random node), the second on a node in a different rack, and the third on a different node in that same remote rack, so two replicas share one rack and the third sits on another.
Figure 7 HDFS Block Replication
Q. What is the main purpose of HDFS fsck command?
fsck is a utility to check the health of the file system and to find missing files, over-replicated, under-replicated, and corrupted blocks.
Command for finding the blocks for a file:
$ hadoop fsck <name of file> -files -blocks -racks
Q. What is the purpose of DataNode block scanner?
· The block scanner runs on every DataNode and periodically verifies all the blocks stored on that DataNode.
· If bad blocks are detected, they are fixed before any client reads them.
Q. What is the purpose of dfsadmin tool?
· It is used to find information about the state of HDFS
· It performs administrative tasks on HDFS
· Invoked by hadoop dfsadmin command as superuser
Q. What is the command for printing the topology?
$ hdfs dfsadmin -printTopology
It displays a tree of racks and the DataNodes attached to them, as viewed by the NameNode.
Q. What is RAID?
RAID is a way of combining multiple disk drives into a single entity to improve performance and/or reliability. There are a variety of different RAID levels.
For example, in RAID level 1 (mirroring), a copy of the same data is kept on two disks, which increases read performance because reads can alternate between the disks in the mirror.
Q. Does Hadoop requires RAID?
· DataNode storage does not use RAID, as redundancy is achieved by replication between the nodes.
· RAID is recommended for the NameNode's disks.
Q. What are the site-specific configuration files in Hadoop?
· conf/core-site.xml
· conf/hdfs-site.xml
· conf/yarn-site.xml
· conf/mapred-site.xml.
· conf/hadoop-env.sh
· conf/yarn-env.sh
Chapter 3: MAP REDUCE
Q. What is MapReduce?
MapReduce is a programming model for processing large distributed datasets on clusters of computers.
MapReduce Features:
· Distributed programming complexity is hidden
· Built-in fault tolerance
· The programming model is language independent
· Parallelization and distribution are automatic
· Enables data-local processing
Q. What is the fundamental idea behind YARN?
In YARN (Yet Another Resource Negotiator), the JobTracker's responsibility is split into:
· Resource management
· Job scheduling/monitoring, each handled by a separate daemon
YARN supports additional processing models and implements a more flexible execution engine.
Q. What MapReduce framework consists of?
Figure 8 MapReduce Yarn
ResourceManager (RM)
· Global resource scheduler
· One master RM
NodeManager (NM)
· One slave NM per cluster-node.
Container
· RM creates Containers upon request by AM
· Application runs in one or more containers
ApplicationMaster (AM)
· One AM per application
· Runs in Container
Q. What are different daemons in YARN?
· ResourceManager: Global resource manager.
· NodeManager: One per node; it manages and monitors the usage of containers (resources in terms of memory and CPU).
· ApplicationMaster: One per application; its tasks are started by the NodeManagers on behalf of the application.
Q. What are the two main components of ResourceManager?
Scheduler
It allocates resources (containers) to the various running applications; a container bundles elements such as memory, CPU, disk, etc.
ApplicationManager
It accepts job submissions, negotiates the first container for executing the application-specific ApplicationMaster, and provides the service for restarting the ApplicationMaster container on failure.
Q. What is the function of NodeManager?
The NodeManager is the per-machine agent responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting it to the ResourceManager.
Q. What is the function of ApplicationMaster?
The ApplicationMaster is per-application; it negotiates appropriate resource containers from the Scheduler, tracks their status, and monitors progress.
Q. What are the minimum configuration requirements for a MapReduce application?
The job configuration requires the
· input location
· output location
· map() function
· reduce() functions and
· job parameters.
Q. What are the steps to submit a Hadoop job?
Steps involved in Hadoop job submission:
· Hadoop job client submits the job jar/executable and configuration to the ResourceManager.
· ResourceManager then distributes the software/configuration to the slaves.
· The ResourceManager then schedules tasks and monitors them.
· Finally, job status and diagnostic information is provided to the client.
Q. How does MapReduce framework view its input internally?
It views the input as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job.
Q. Assuming default configurations, how is a file of the size 1 GB (uncompressed) stored in HDFS?
The default block size is 64MB, so a file of 1 GB is stored as 16 blocks. The MapReduce job will create 16 input splits, each processed by a separate map task, i.e. 16 mappers.
Q. What are Hadoop Writables?
Hadoop Writables allow Hadoop to read and write data in a serialized form for transmission as compact binary files. This helps with straightforward random access and higher performance. Hadoop provides built-in classes which implement Writable: Text, IntWritable, LongWritable, FloatWritable, and BooleanWritable.
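As a hedged sketch (a hypothetical value type, not part of Hadoop itself), a custom Writable can compose the built-in classes and delegate serialization to them:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
// Hypothetical value type combining a name and a score, serialized by
// delegating to the built-in Text and IntWritable classes.
public class NameScoreWritable implements Writable {
private final Text name = new Text();
private final IntWritable score = new IntWritable();
public void set(String n, int s) { name.set(n); score.set(s); }
@Override
public void write(DataOutput out) throws IOException {
name.write(out);
score.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
name.readFields(in);
score.readFields(in);
}
}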
Q. Why comparison of types is important for MapReduce?
Comparison is important because keys are compared with one another in the sorting phase. For comparison, the WritableComparable interface is implemented.
Q. What is the purpose of RawComparator interface?
RawComparator allows implementors to compare records read from a stream without deserializing them into objects, which is faster because there is no overhead of object creation.
Q. What is a NullWritable?
It is a special type of Writable that has a zero-length serialization. In MapReduce, a key or a value can be declared as NullWritable if that position is not needed, effectively storing a constant empty value.
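A minimal sketch of typical NullWritable usage, here in a hypothetical mapper that cares only about the keys:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Sketch of a mapper that only cares about the keys, so it emits
// NullWritable (zero bytes on the wire) in the value position.
public class KeysOnlyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}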
Q. What is Avro Serialization System?
Avro is a language-neutral data serialization system. It has data formats that work with different languages. Avro data is described using a language independent schema (usually written in JSON). Avro data files support compression and are splittable.
Avro provides AvroMapper and AvroReducer to run MapReduce programs.
Q. Explain use cases where SequenceFile class can be a good fit?
When the data is binary, SequenceFile provides a persistent structure for binary key-value pairs. SequenceFiles also work well as containers for many smaller files, since HDFS and MapReduce are optimized for large files.
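A short sketch of writing a SequenceFile of Text/IntWritable pairs, assuming the Hadoop 2.x SequenceFile.Writer option API and a hypothetical output path:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class SequenceFileWriteExample {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path path = new Path("pairs.seq"); // hypothetical output path
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(IntWritable.class));
try {
// Append a few binary key-value pairs
writer.append(new Text("Hare"), new IntWritable(8));
writer.append(new Text("Krishna"), new IntWritable(5));
} finally {
writer.close();
}
}
}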
Q. What is MapFile?
A MapFile is an indexed SequenceFile and it is used for look-ups by key.
Q. What is the core of the job in MapReduce framework?
Core of a job:
Mapper interface: map() method
Reducer interface: reduce() method
Q. What are the steps involved in MapReduce framework?
· First, the mapper maps input key/value pairs to a set of intermediate key/value pairs.
· Maps are the individual tasks that transform input records into intermediate records.
· The transformed intermediate records do not need to be of the same type as the input records.
· A given input pair maps to zero or many output pairs.
· The Hadoop MapReduce framework creates one map task for each InputSplit generated by the InputFormat for the job.
· It then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task.
· All intermediate values associated with a given output key are grouped and passed to the Reducers.
Q. Where is the Mapper Output stored?
The mapper output is stored on the local file system of each individual mapper node. The intermediate data is cleaned up after the Hadoop job completes.
Q. What is a partitioner and how the user can control which key will go to which reducer?
The Partitioner controls the partitioning of the keys of the intermediate map outputs. By default, a hash function on the key decides the partition; the default partitioner is HashPartitioner.
A custom partitioner is implemented to control which keys go to which Reducer, for example:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class SamplePartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
// Mirror HashPartitioner: spread keys evenly across the reducers
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
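To plug the partitioner into a job, the driver registers it with job.setPartitionerClass(SamplePartitioner.class) and sets a matching number of reducers with job.setNumReduceTasks(int).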
Q. What are combiners and its purpose?
· Combiners are used to increase the efficiency of a MapReduce program. A combiner aggregates the intermediate map output locally on each mapper node.
· Combiners can reduce the amount of data that needs to be transferred across the network to the reducers.
· The reducer code can be reused as a combiner if the operation performed is commutative and associative (see the driver sketch below).
· Hadoop may or may not execute a combiner, so the job must still produce correct results without it.
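A minimal driver sketch showing where the combiner is registered, reusing the WordMapper and WordReducer classes from Program 1 below; the class name and job name here are illustrative:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class CombinerSetup {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration(), "wordcount-with-combiner");
job.setJarByClass(CombinerSetup.class);
job.setMapperClass(WordMapper.class);
// Reuse the reducer as a combiner: summing counts is commutative
// and associative, so partial sums on the map side are safe.
job.setCombinerClass(WordReducer.class);
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
}
}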
Q. How number of partitioners and reducers are related?
The total numbers of partitions are the same as the number of reduce tasks for the job.
Q. What is IdentityMapper?
IdentityMapper maps inputs directly to outputs (the identity function). IdentityMapper.class is used as the default when JobConf.setMapperClass is not set.
Q. What is IdentityReducer?
IdentityReducer performs no reduction, writing all input values directly to the output. IdentityReducer.class is used as the default when JobConf.setReducerClass is not set.
Q. What is the reducer and its phases?
The Reducer reduces a set of intermediate values which share the same key to a smaller set of values. The framework calls the reduce() method.
Syntax:
reduce(WritableComparable, Iterable<Writable>, Context) method for each <key, (list of values)> pair in the grouped inputs.
Reducer has three primary phases:
· Shuffle
· Sort
· Reduce
Q. How to set the number of reducers?
The user sets the number of reduces for the job via:
· Job.setNumReduceTasks(int)
· -D mapreduce.job.reduces=<number> on the command line
Q. Detail description of the Reducer phases?
Shuffle:
Sorted output (Mapper) → Input (Reducer). The framework fetches the relevant partition of the output of all the mappers.
Sort:
The framework groups Reducer inputs by keys. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.
Secondary Sort:
If the keys used to group the intermediate values before reduction are required to be different from those used for sorting, a custom comparator can be specified via Job.setSortComparatorClass(Class) (and grouping via Job.setGroupingComparatorClass(Class)).
Reduce:
reduce(WritableComparable, Iterable<Writable>, Context) method is called for each <key, (list of values)> pair in the grouped inputs.
The output of the reduce task is typically written using Context.write(WritableComparable, Writable).
Q. Can there be no Reducer?
Yes, the number of reducers can be zero if no reduction of values is required; the map outputs are then written directly to the output path.
Q. What can be optimum value for Reducer?
Value of Reducers can be: 0.95
1.75 multiplied by (<number of nodes> * < number of maximum containers per node>)
Increasing number of reducers
· Increases the framework overhead
· Increases load balancing
· Lowers the cost of failures
Q. What is a Counter and its purpose?
A Counter is a facility for MapReduce applications to report their statistics. Counters can be used to track job progress in an easy and flexible manner. They are defined either by the MapReduce framework or by applications. Each Counter can be of any Enum type. Applications can define counters of an Enum type and update them (via Context.getCounter(...).increment() in the new API, or Reporter.incrCounter() in the old API) in the map and/or reduce methods.
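A hedged sketch of a hypothetical mapper that uses a custom enum counter to track malformed records; the enum, class name, and record format are illustrative:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Sketch of a mapper that tracks malformed CSV records with a custom
// enum counter; the framework aggregates the counts across all tasks.
public class RecordQualityMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public enum RecordQuality { GOOD, MALFORMED }
private static final IntWritable ONE = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length != 3) {
context.getCounter(RecordQuality.MALFORMED).increment(1);
return; // skip the bad record
}
context.getCounter(RecordQuality.GOOD).increment(1);
context.write(new Text(fields[0]), ONE);
}
}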
Q. Define different types of Counters?
Built in Counters:
· Map Reduce Task Counters
· Job Counters
Custom Java Counters:
MapReduce allows users to specify their own counters (using Java enums) for performing their own counting operation.
Q. Why Counter values are shared by all map and reduce tasks across the MapReduce framework?
Counters are global so shared across the MapReduce framework and aggregated at the end of the job across all the tasks.
Q. Explain speculative execution.
· Speculative execution is a way of dealing with variation in individual machines' performance. As there are lots of machines in the cluster, some machines can perform poorly, which holds back the whole job.
· With speculative execution, Hadoop can run multiple copies of the same map or reduce task on different nodes; the result from the first copy to finish is used and the remaining copies are killed.
Q. What is DistributedCache and its purpose?
DistributedCache is a facility provided by the MapReduce framework to cache files (text, archives, jars etc.) needed by applications. It distributes application-specific, large, read-only files efficiently. The user needs to use DistributedCache to distribute and symlink the script file.
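A minimal sketch using the Hadoop 2.x Job.addCacheFile() API; the HDFS path and symlink name are purely illustrative:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
public class CacheFileSetup {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration(), "job-with-cache-file");
// Ship a read-only lookup file to every task node; the fragment after
// '#' becomes a symlink name in each task's working directory.
// The HDFS path below is purely illustrative.
job.addCacheFile(new URI("/user/cloudera/lookup/countries.txt#countries"));
}
}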
Q. What is the Job interface in MapReduce framework?
Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for execution. Some basic parameters are configured for example:
· Job.setNumReduceTasks(int)
· Configuration.set(JobContext.NUM_MAPS, int)
· Mapper
· Combiner (if any)
· Partitioner
· Reducer
· InputFormat
· OutputFormat implementations
· setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)
· Maximum number of attempts per task (setMaxMapAttempts(int)/ setMaxReduceAttempts(int)) etc.
· DistributedCache for large amounts of (read-only) data.
Q. What is the default value of map and reduce max attempts?
Framework will try to execute a map task or reduce task by default 4 times before giving up on it.
Q. Explain InputFormat?
InputFormat describes the input-specification for a MapReduce job. The MapReduce framework depends on the InputFormat of the job to:
· Check the input-specification of the job.
· Split the input file(s) into logical InputSplit instances, each of which is then assigned to an individual Mapper.
· Provide the RecordReader implementation used to extract input records from the logical InputSplit for processing by the Mapper.
Default: TextInputFormat
Q. What is InputSplit and RecordReader?
· InputSplit specifies the data to be processed by an individual Mapper.
· In general, InputSplit presents a byte-oriented view of the input.
Default: FileSplit
· RecordReader reads <key, value> pairs from an InputSplit and presents a record-oriented view of the data to the Mapper.
Q. Explain the Job OutputFormat?
OutputFormat describes details of the output for a MapReduce job.
The MapReduce framework depends on the OutputFormat of the job to:
· Check the job output-specification.
· Provide the RecordWriter implementation used to write the output files of the job as <key, value> pairs.
Default: TextOutputFormat
Q. What is the option in Hadoop to skip bad records?
Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. This feature can be controlled through the SkipBadRecords class.
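A hedged sketch of enabling skipping through the classic org.apache.hadoop.mapred.SkipBadRecords helpers; the specific thresholds here are illustrative:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.SkipBadRecords;
public class SkipBadRecordsSetup {
public static void main(String[] args) {
Configuration conf = new Configuration();
// Start skipping after two failed attempts of the same task, and
// tolerate at most one unprocessable record around each failure.
SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
}
}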
Q. Different ways of debugging a job in MapReduce?
· Add debug statements that log to standard error, along with messages that update the task's status; the Web UI makes these easy to view.
· Create a custom counter; it gives valuable information for dealing with a problem dataset.
· Task page and task detailed page
· Hadoop Logs
· MRUnit testing
PROGRAM 1: Counting the number of words in an input file
Introduction
This section describes how to get the word count of a sample input file.
Software Versions
The software versions used are:
VirtualBox: 4.3.20
CDH 5.3: Default MapReduce Version
hadoop-core-2.5.0
hadoop-yarn-common-2.5.0
Steps
1. Create the input file
Create the input.txt file with sample text.
$ vi input.txt
Thanks Lord Krishna for helping us write this book
Hare Krishna Hare Krishna Krishna Krishna Hare Hare
Hare Rama Hare Rama Rama Rama Hare Hare
2. Move the input file into HDFS
Use the -put or -copyFromLocal command to move the file into HDFS:
$ hadoop fs -put input.txt
3. Code for the MapReduce program
Java files:
WordCountProgram.java // Driver Program
WordMapper.java // Mapper Program
WordReducer.java // Reducer Program
--------------------------------------------------
WordCountProgram.java File: Driver Program
--------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountProgram extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcountprogram");
job.setJarByClass(getClass());
// Configure output and input source
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordReducer.class);
// Configure output
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new WordCountProgram(), args);
System.exit(exitCode);
}
}
--------------------------------------------------
WordMapper.java File: Mapper Program
--------------------------------------------------
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable count = new IntWritable(1);
private final Text nameText = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString()," ");
while (tokenizer.hasMoreTokens()) {
nameText.set(tokenizer.nextToken());
context.write(nameText, count);
}
}
}
--------------------------------------------------
WordReducer.java file: Reducer Program
--------------------------------------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text t, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(t, new IntWritable(sum));
}
}
4. Run the MapReduce program
Create a jar from the code in Step 3 and use the following command to run the MapReduce program:
$ hadoop jar WordCount.jar WordCountProgram input.txt output1
Here,
WordCount.jar: Name of the exported jar containing all the classes
WordCountProgram: Driver program containing the job configuration
input.txt: Input file
output1: Output folder where the output file will be stored
5. View the Output
View the output in the output1 folder
$ hadoop fs -cat /user/cloudera/output1/part-r-00000
Hare 8
Krishna 5
Lord 1
Rama 4
Thanks 1
book 1
for 1
helping 1
this 1
us 1
write 1
PROGRAM 2: Simple Custom Partitioner Program
Introduction
We will use a custom partitioner in a MapReduce program to display the records in two age categories: less than age 15 and greater than 15. In this program, the data is partitioned into 2 partitions according to the age field.
Software Versions
The software versions used are:
VirtualBox: 4.3.20
CDH 5.3: Default MapReduce Version
hadoop-core-2.5.0
hadoop-yarn-common-2.5.0
Steps
1. Create the input file
Create the grades.txt file, where each record is comma-separated Name,Age,Marks text.
$ vi grades.txt
Shiva,10,90
Mahi,5,95
Shivi,15,99
Hari,10,100
Alex,20,85
2. Move the input file into HDFS
Use the -put or -copyFromLocal command to move the file into HDFS:
$ hadoop fs -put grades.txt
3. Code for the MapReduce program
Java files:
CustomParitionDriver.java //Driver program
PartitionMapper.java //Mapper program
MarksPartitioner.java //partitions the data according to age (see the sketch below)
PartitionReducer.java //displaying data in 2 partitions
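The full listings are not reproduced here, but as a hedged sketch (not the program's actual code), an age-based partitioner could look like the following, assuming the mapper emits the student name as the key and "age,marks" as the value:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Minimal sketch of an age-based partitioner: records with age up to 15 go
// to reducer 0, the rest to reducer 1. The value format ("age,marks") and
// the exact boundary are assumptions for illustration only.
public class MarksPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
int age = Integer.parseInt(value.toString().split(",")[0].trim());
if (numReduceTasks < 2) {
return 0; // single reducer: everything lands in one partition
}
return (age <= 15) ? 0 : 1;
}
}
In the driver, this would be registered with job.setPartitionerClass(MarksPartitioner.class) and job.setNumReduceTasks(2) so that each age category lands in its own output file.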
"You found the information helpful and want to say thanks? Your donation is enough to inspire us to do more. Thanks a bunch!"
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.