Fundamentals of MapReduce (New to MapReduce?)

People have been asking me to go into more detail on the MapReduce concept, and it is an interesting topic to write about. If you have read my previous post, you will have seen my introduction to Big Data and Hadoop. Now I am going to talk about MapReduce as the heart of Hadoop. Some of you might be new to this, but do not worry; it will be described in a way you can quickly understand. Java developers may find it easier, but even without Java experience you can pick up some basic Java and still master MapReduce.

MapReduce is a programming framework for distributed, parallel processing of large data sets across a Hadoop cluster. We are talking about massive scalability across hundreds or thousands of servers. Just imagine that for a second.

The overall flow is Input -> Map task -> Reduce task -> Output. As the name "MapReduce" suggests, it consists of two major tasks: the Map task and the Reduce task. Both work on key-value pairs. The output of the map job, a set of intermediate key-value pairs, is the input to the Reducer. The Reducer receives key-value pairs from multiple map jobs and aggregates those intermediate tuples into a smaller set of key-value pairs, which forms the final output.

In the MapReduce program there is one more piece, called the Driver. In the driver we set up the configuration our MapReduce job needs to run on Hadoop: the name of the job, the input/output data types of the mapper and reducer, the paths of the input and output folders, and the names of the mapper and reducer classes. You can see all of this in the main method of the full program further below.

Let me explain the logic behind the key-value pairs a little further, using the common word count example. Imagine a file containing the single line: "the Map task and the Reduce task".

MapReduce word count example:

Input/Source -> "the Map task and the Reduce task"
Mapper -> (the, 1) (Map, 1) (task, 1) (and, 1) (the, 1) (Reduce, 1) (task, 1)
Shuffle/Sort -> (the, [1, 1]) (Map, [1]) (task, [1, 1]) (and, [1]) (Reduce, [1])
Reducer -> (the, 2) (Map, 1) (task, 2) (and, 1) (Reduce, 1)
Output/Destination ->
the = 2
Map = 1
task = 2
and = 1
Reduce = 1
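
Before we get to the actual Hadoop code, here is a small plain-Java sketch (no Hadoop involved; the class and variable names are mine, purely for illustration) that simulates the same map -> shuffle -> reduce flow on that one line of input, just to make the key-value idea concrete:

import java.util.*;

public class KeyValueFlowDemo {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("the Map task and the Reduce task");

        // "Map" step: emit an intermediate (word, 1) pair for every word
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : input) {
            for (String word : line.split("\\s+")) {
                intermediate.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // "Shuffle" step: group all the values that belong to the same key
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : intermediate) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // "Reduce" step: aggregate each key's values into one (word, count) pair
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int v : entry.getValue()) {
                sum += v;
            }
            System.out.println(entry.getKey() + " = " + sum);
        }
    }
}

Running it prints the same counts as the trace above. The real MapReduce program below does exactly this, except the map and reduce steps run in parallel across the cluster and Hadoop performs the shuffle and sort for you.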

MapReduce code for word count:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class WordCount {

    // Mapper code: with TextInputFormat, the input key is the byte offset of the
    // line (LongWritable) and the input value is the line itself (Text).
    // For every word in the line we emit the intermediate pair (word, 1).
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer code: receives each word together with all of its 1s and sums them
    // into the final (word, count) pair.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver code: configures the job and submits it to the cluster.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "My Word Count Program");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Configuring the input/output paths from the filesystem into the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the output path from HDFS if it already exists, so we don't
        // have to delete it explicitly before every run
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Exit with status 0 if the job completed successfully, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The command for running the MapReduce job is:

hadoop jar hadoop-mapreduce-test.jar WordCount /sample/input /sample/output
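
The jar name and the /sample/input and /sample/output paths above are just examples; substitute your own. Once the job finishes, you can list and read the result directly from HDFS (part-r-00000 is the default name Hadoop gives the output file of the first reducer):

hdfs dfs -ls /sample/output
hdfs dfs -cat /sample/output/part-r-00000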
