Thursday 10 December 2015

Faster way to count the number of lines in a file/dir using the MapReduce framework


On this site you can see one way to count the number of lines in a file.
There, the mapper emits a count of one for every record, so if one map task holds 10,000 lines, 10,000 values are passed to the reducer; with more than one mapper, that many intermediate read-writes happen.
Let's reduce the intermediate writes.
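For contrast, the per-record pattern looks roughly like this (a sketch of the approach described there; the class name is my own):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Naive version: emits one ("Total Lines", 1) pair for every input record.
public class NaiveLineCntMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {

 Text keyEmit = new Text("Total Lines");
 IntWritable one = new IntWritable(1);

 @Override
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  context.write(keyEmit, one);
 }
}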

Below is an optimized way to count the number of lines in a file/dir.
Changes are made in:
1. Mapper
Instead of emitting 'one' for each record, we increment a line count in map() and emit it once in the cleanup() phase.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineCntMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {

 Text keyEmit = new Text("Total Lines");
 IntWritable valEmit = new IntWritable();
 int partialSum = 0;

 // Just count locally; nothing is written per record.
 @Override
 public void map(LongWritable key, Text value, Context context) {
  partialSum++;
 }

 // Called once per task after all records: emit the partial sum.
 @Override
 public void cleanup(Context context) throws IOException,
   InterruptedException {
  valEmit.set(partialSum);
  context.write(keyEmit, valEmit);
 }
}
So if we have 5 map tasks, we will emit only 5 intermediate key-value pairs.

2. Driver
In the driver we will also include a combiner:
job.setMapperClass(LineCntMapper.class);
job.setCombinerClass(LineCntReducer.class);
job.setReducerClass(LineCntReducer.class);
The combiner does nothing more than the reducer, so we can use the reducer itself as the combiner.
The reducer doesn't need any change, as the sketch below shows.
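For reference, a minimal reducer that sums the partial counts might look like this (only the class name LineCntReducer is taken from the driver snippet above; the body is a sketch):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LineCntReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {

 IntWritable valEmit = new IntWritable();

 @Override
 public void reduce(Text key, Iterable<IntWritable> values, Context context)
   throws IOException, InterruptedException {
  // Sum the partial counts coming from the map tasks (or the combiner).
  int total = 0;
  for (IntWritable v : values) {
   total += v.get();
  }
  valEmit.set(total);
  context.write(key, valEmit);
 }
}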

If you run this code, you will get the results faster than with the previously mentioned code on this site.

Working code is here

Happy Hadooping........

DoubleArrayWritable in Hadoop


Let's see how to emit double arrays from a mapper and process them in a reducer.

DoubleArrayWritable class
package mywritable;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;

public class DoubleArrayWritable extends ArrayWritable {
 public DoubleArrayWritable() {
  super(DoubleWritable.class);
 }
}
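Before wiring it into a job, we can sanity-check the wrapper with a quick local round trip (a standalone sketch, not part of the job; the test class name is my own):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import mywritable.DoubleArrayWritable;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;

public class DoubleArrayWritableCheck {
 public static void main(String[] args) throws IOException {
  DoubleArrayWritable w = new DoubleArrayWritable();
  w.set(new DoubleWritable[] { new DoubleWritable(1.5),
    new DoubleWritable(2.5) });

  // Serialize to a byte buffer, then read back into a fresh instance.
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  w.write(new DataOutputStream(bytes));
  DoubleArrayWritable r = new DoubleArrayWritable();
  r.readFields(new DataInputStream(
    new ByteArrayInputStream(bytes.toByteArray())));

  for (Writable d : r.get()) {
   System.out.println(((DoubleWritable) d).get()); // prints 1.5 then 2.5
  }
 }
}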

Driver()
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DoubleArrayWritable.class);

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

map()
import java.io.IOException;

import mywritable.DoubleArrayWritable;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends
  Mapper<Object, Text, IntWritable, DoubleArrayWritable> {

 public void map(Object key, Text value, Context context)
   throws IOException, InterruptedException {
  // Do something............ ('size' and the key 'mykey' come from here)
  double[] arr = new double[size];

  // Wrap each primitive double in a DoubleWritable before emitting.
  DoubleArrayWritable arrWritable = new DoubleArrayWritable();
  DoubleWritable[] data = new DoubleWritable[size];
  for (int k = 0; k < size; k++) {
   data[k] = new DoubleWritable(arr[k]);
  }
  arrWritable.set(data);

  context.write(mykey, arrWritable);
 }
}

reduce()
import java.io.IOException;

import mywritable.DoubleArrayWritable;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends
  Reducer<IntWritable, DoubleArrayWritable, NullWritable, Text> {

 public void reduce(IntWritable key,
   Iterable<DoubleArrayWritable> values, Context context)
   throws IOException, InterruptedException {

  // Element-wise sum of all the arrays received for this key.
  double[] sum = new double[size];
  for (DoubleArrayWritable c : values) {
   Writable[] temp = c.get();
   for (int i = 0; i < size; i++) {
    sum[i] += ((DoubleWritable) temp[i]).get();
   }
  }

  // Do something with 'sum' and emit values ..................
  context.write(out, new Text(emit));
 }
}

Happy Hadooping.......

Wednesday 9 December 2015

Partitioning Data Using Hadoop MultipleOutputs

There may be cases where we need to partition our data based on a certain condition.
For example, consider this Employee data:
EmpId,EmpName,Age,Gender,Salary
1201,gopal,45,Male,50000
1202,manisha,40,Female,51000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1213,lavanya,18,Female,8000
Let's assume one condition:
we need to separate the above data based on Gender (there can be more scenarios).
The expected outcome will be like this:
Female

1213,lavanya,18,Female,8000
1202,manisha,40,Female,51000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
Male

1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1201,gopal,45,Male,50000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
This can be achieved by using MultipleOutputs in Hadoop.
The name itself gives an idea of what MultipleOutputs does - it writes output data to multiple outputs.

Let's see how to implement this.
Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerDriver extends Configured implements Tool {

 public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(), new PartitionerDriver(), args);
  System.exit(res);
 }

 public int run(String[] args) throws Exception {
  System.out.println("Partitioning File Based on Gender...........");
  if (args.length != 3) {
   System.err
     .println("Usage: File Partitioner <input> <output> <delimiter>");
   return -1;
  }
  Configuration conf = getConf();

  /*
   * Arguments
   */
  String source = args[0];
  String dest = args[1];
  String delimiter = args[2];

  // Make the delimiter available to the mapper via the configuration.
  conf.set("delimiter", delimiter);

  FileSystem fs = FileSystem.get(conf);
  Path in = new Path(source);
  Path out = new Path(dest);

  Job job0 = Job.getInstance(conf, "Partition Records");
  job0.setJarByClass(PartitionerDriver.class);
  job0.setMapperClass(PartitionMapper.class);
  job0.setReducerClass(PartitionReducer.class);
  job0.setMapOutputKeyClass(Text.class);
  job0.setMapOutputValueClass(Text.class);
  job0.setOutputKeyClass(Text.class);
  job0.setOutputValueClass(Text.class);
  TextInputFormat.addInputPath(job0, in);

  /*
   * Delete the output dir if it already exists
   */
  if (fs.exists(out)) {
   fs.delete(out, true);
  }

  TextOutputFormat.setOutputPath(job0, out);
  job0.waitForCompletion(true);

  System.out.println("Successfully partitioned data based on Gender!");
  return 0;
 }
}


Mapper Class
The mapper gets each record and splits it using the delimiter.
The key will be Gender (the 4th field) and the value will be the whole record.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

 Text keyEmit = new Text();

 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  // The delimiter was set in the driver via conf.set("delimiter", ...).
  Configuration conf = context.getConfiguration();
  String delim = conf.get("delimiter");
  String line = value.toString();

  // Gender is the 4th field; use it as the key, the whole record as value.
  String[] record = line.split(delim);
  keyEmit.set(record[3]);
  context.write(keyEmit, value);
 }
}
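One caveat (my addition, not in the original post): if the input file still contains the header row EmpId,EmpName,Age,Gender,Salary, the word Gender itself becomes a key and ends up in its own output file. A simple guard at the top of map() avoids that:

// Skip the header row so "Gender" does not become an output key.
if (line.startsWith("EmpId")) {
 return;
}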

Reducer Class
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionReducer extends Reducer<Text, Text, NullWritable, Text> {

 MultipleOutputs<NullWritable, Text> mos;
 NullWritable out = NullWritable.get();

 @Override
 protected void setup(Context context) {
  mos = new MultipleOutputs<NullWritable, Text>(context);
 }

 @Override
 public void reduce(Text key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
  for (Text value : values) {
   // Write each record to a file named after its key (Male/Female).
   mos.write(out, value, key.toString());
  }
 }

 @Override
 protected void cleanup(Context context) throws IOException,
   InterruptedException {
  // Close MultipleOutputs, or the output files may not be flushed.
  mos.close();
 }
}

Here we define our MultipleOutputs as
MultipleOutputs<NullWritable, Text> mos;
Our file does not need a key; we are only interested in the data, so the key will be NullWritable and the value will be Text.
We have a setup() method where we initialize our MultipleOutputs:
mos = new MultipleOutputs<NullWritable, Text>(context);
Let's look at reduce().
As we know, the reducer aggregates values by key. The key from the mapper was Gender, and there are 2 genders, Male and Female, so we will receive 2 keys, each followed by its values.
for (Text value : values) {
 mos.write(out, value, key.toString());
}
mos.write() takes 3 arguments:
1. key
2. value and
3. file name.
Here our key is NullWritable, the value is the record for each key, and we name the output file after the key.
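One optional tweak, not part of the original code: because nothing is written through the normal context.write() path, the job typically still creates empty part-r-00000 files alongside the named outputs. Registering the output format lazily in the driver suppresses them:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// Create the default part-r-* files only if something is written to them.
LazyOutputFormat.setOutputFormatClass(job0, TextOutputFormat.class);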

Once you run this code, you will see two output files:
1. Female-r-00000
2. Male-r-00000
Each contains the records for that gender, matching the expected output shown above.
You can find the code here.

Happy Hadooping..............