Tuesday, 22 April 2014

Chaining Jobs in Hadoop MapReduce


There are cases where we need to run more than one MapReduce job in sequence:
Map1--Reduce1--Map2--Reduce2
How do you manage the jobs so that they are executed in order? There are several approaches. Here is one that chains the jobs by configuring each of them in its own block of a single driver's run() method:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Unmesha SreeVeni U.B
 * 
 */
public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  // Delete a stale intermediate directory left over from a previous run, if any.
  fs.delete(new Path(OUTPUT_PATH), true);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // Do not start the second job unless the first one succeeded.
  if (!job.waitForCompletion(true)) {
   return 1;
  }

  /*
   * Job 2
   */
  
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Reads the input and output paths from the command line and runs the
  * chained jobs to completion.
  */
 public static void main(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.println("Usage: ChainJobs <input directory> <output directory>");
   System.exit(1);
  }
  System.exit(ToolRunner.run(new Configuration(), new ChainJobs(), args));
 }
}
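
The driver above references four mapper and reducer classes (MyMapper1, MyReducer1, MyMapper2 and MyReducer2) that are not shown in the post. Purely as an illustrative sketch, the classes below match the key/value types configured in the driver: Job 1 is a word count emitting Text/IntWritable, and Job 2 reads Job 1's tab-separated "word<TAB>count" lines and emits Text/Text, grouping words by their count. They can be added as static nested classes inside ChainJobs, with imports added for java.io.IOException, LongWritable, Mapper and Reducer.

 // Sketch only: Job 1 mapper emits <word, 1> for every whitespace-separated token.
 public static class MyMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   for (String token : value.toString().split("\\s+")) {
    if (!token.isEmpty()) {
     word.set(token);
     context.write(word, ONE);
    }
   }
  }
 }

 // Sketch only: Job 1 reducer sums the counts for each word.
 public static class MyReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable value : values) {
    sum += value.get();
   }
   context.write(key, new IntWritable(sum));
  }
 }

 // Sketch only: Job 2 mapper reads Job 1's "word<TAB>count" lines and emits <count, word>.
 public static class MyMapper2 extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] parts = value.toString().split("\t");
   if (parts.length == 2) {
    context.write(new Text(parts[1]), new Text(parts[0]));
   }
  }
 }

 // Sketch only: Job 2 reducer concatenates all words that share the same count.
 public static class MyReducer2 extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   StringBuilder words = new StringBuilder();
   for (Text value : values) {
    if (words.length() > 0) {
     words.append(",");
    }
    words.append(value.toString());
   }
   context.write(key, new Text(words.toString()));
  }
 }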

The driver above configures two jobs, job and job2.

private static final String OUTPUT_PATH = "intermediate_output";

The constant OUTPUT_PATH names the directory where the first job writes its output.

TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

So the first job reads its input from args[0] and writes its output to OUTPUT_PATH.

First Job Configuration


  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  // Delete a stale intermediate directory left over from a previous run, if any.
  fs.delete(new Path(OUTPUT_PATH), true);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // Do not start the second job unless the first one succeeded.
  if (!job.waitForCompletion(true)) {
   return 1;
  }

Once the first job has completed successfully, OUTPUT_PATH serves as the input to the second job, and the output of job2 is written to args[1].
TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));

Second Job Configuration

  /*
   * Job 2
   */
 
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
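
As mentioned above, there are several approaches to chaining. Another common one is Hadoop's JobControl API, which lets you declare that the second job depends on the first and have both submitted in the right order. The sketch below is only an illustration of that alternative (names such as controlledJob1 are made up): the job and job2 configuration stays exactly as shown, and the waitForCompletion() calls in run() are replaced by the following, with imports added for org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob and JobControl.

  // Wrap the two configured jobs and declare the dependency between them.
  ControlledJob controlledJob1 = new ControlledJob(job.getConfiguration());
  controlledJob1.setJob(job);

  ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
  controlledJob2.setJob(job2);
  controlledJob2.addDependingJob(controlledJob1); // job2 runs only after job succeeds

  JobControl jobControl = new JobControl("chained-jobs");
  jobControl.addJob(controlledJob1);
  jobControl.addJob(controlledJob2);

  // JobControl is a Runnable: run it in a background thread and poll until all jobs finish.
  Thread controllerThread = new Thread(jobControl);
  controllerThread.setDaemon(true);
  controllerThread.start();
  while (!jobControl.allFinished()) {
   Thread.sleep(1000);
  }
  jobControl.stop();

  return jobControl.getFailedJobList().isEmpty() ? 0 : 1;

ControlledJob.addDependingJob() is the call that expresses the dependency, so the second job is not submitted until the first has completed successfully.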

Happy Hadooping . . .

21 comments:

  1. Where is the code for ChainJobs1.java and ChainJobs2.java?

    Replies
    1. There is no ChainJobs2.java. Apologies for the confusion, and thanks for pointing it out. I have updated the post.

  2. Sorry, I am new to Hadoop. Could you please give some examples of how to read the file from the map/reduce function? Do you just do fs.open(), or is there any built-in magic from TextInputFormat.addInputPath()?
    Thanks!

    Replies
    1. You can read files in a MapReduce job using TextInputFormat: supply the file as the job's input and read it line by line in the map function. You can also read files from the distributed cache in the setup function.
      Let me know if you have further doubts.
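
      As a minimal, purely illustrative sketch of the second option (the class name, the emitted values and the use of the old org.apache.hadoop.filecache.DistributedCache API are assumptions, not code from this post), a mapper can load a side file that the driver registered with DistributedCache.addCacheFile(...) in its setup method:

      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;

      import org.apache.hadoop.filecache.DistributedCache;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      // Illustrative only: loads a small side file from the distributed cache in setup()
      // and reads its normal input through TextInputFormat in map().
      public class SideDataMapper extends Mapper<LongWritable, Text, Text, Text> {
       private final List<String> sideData = new ArrayList<String>();

       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
        // Files registered in the driver with DistributedCache.addCacheFile(...) are
        // copied to the local disk of every task; read them once per task here.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null && cacheFiles.length > 0) {
         BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
         try {
          String line;
          while ((line = reader.readLine()) != null) {
           sideData.add(line);
          }
         } finally {
          reader.close();
         }
        }
       }

       @Override
       protected void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
        // The normal job input arrives here line by line; sideData is available for lookups.
        context.write(value, new Text("side file lines: " + sideData.size()));
       }
      }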

    2. Thank you very much!

  3. Thank you very much for such a helpful post.
    Keep posting such stuff on Hadoop.
    Nishit

  4. The second job doesn't seem to run for me. The mapper setup runs, but not the map function within the second mapper. Is it because of format issues? Because otherwise there doesn't seem to be anything wrong in my program.

    Replies
    1. Could you please share your code? Or you can ping me at unmeshabiju@gmail.com

  5. Hi,

    I am running Hadoop chained jobs. With small data sets (i.e. 10-20 files) it works perfectly, but with more than 30 files the second job fails with a connection refused error after the first job finishes. I have already tried it a couple of times. Can you please let me know why I am facing this issue? I have also tried addDependingJob, but with that the output path for job2 does not get validated.

    Thanks,
    Shuvankar

  6. Hi Unmesha Sreeveni, great post! You saved me! :D
    I found some errors, like a FileNotFoundException, and I solved it by appending "/part-r-00000" (the name of the output file).

    In my application I am trying to implement the GIM-V algorithm, which basically multiplies a matrix by a vector, then multiplies again by the resulting vector, and so on.

    Finally I wrote a loop for all the new jobs, something like this:

    Configuration conf = getConf();
    Job job = new Job(conf, "matrix-multiply-vector");
    // See Amareshwari Sri Ramadasu's comment in this thread...
    // http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html
    // you need to do job.getConfiguration() instead of conf.
    DistributedCache.addCacheFile(new Path(args[1]).toUri(),
    job.getConfiguration());
    job.setJarByClass(MatrixMultiplyVector.class);

    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    //setoutputFormat...

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    boolean succ = job.waitForCompletion(true);
    int nroRepeticiones =Integer.parseInt(args[3]);
    String salida = args[2];
    String nuevaSalida=salida;
    for(int i=1;i<nroRepeticiones;i++){
    Configuration conf2 = new Configuration();
    Job job2 = new Job(conf2, "ENCADENADOJOB");
    // See Amareshwari Sri Ramadasu's comment in this thread...
    // http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html
    // you need to do job.getConfiguration() instead of conf.
    DistributedCache.addCacheFile(new Path(nuevaSalida+"/part-r-00000").toUri(),
    job2.getConfiguration());
    job2.setJarByClass(MatrixMultiplyVector.class);

    job2.setMapperClass(Mapper1.class);
    job2.setReducerClass(Reducer1.class);

    job2.setMapOutputKeyClass(LongWritable.class);
    job2.setMapOutputValueClass(DoubleWritable.class);

    job2.setInputFormatClass(TextInputFormat.class);
    //setoutputFormat...
    nuevaSalida = salida+"-"+String.valueOf(i);

    FileInputFormat.addInputPath(job2, new Path(args[0]));
    FileOutputFormat.setOutputPath(job2, new Path(nuevaSalida));
    System.out.println("-----iteracion:"+i);
    succ = job2.waitForCompletion(true);
    }
    return 5;

    Thank you again :D

    Replies
    1. Thanks.
      Yes, for the distributed cache you need to mention the part file as well, but if you are passing it as a MapReduce job input you only need to specify the folder.

  7. Nice work Unmesha. I will try out the code; meanwhile I have a few questions.
    1. As OUTPUT_PATH is intermediate output, where is it stored: HDFS or local disk (like the mappers' output)?
    2. Does it persist, or does it get deleted after the job finishes? If it persists, can we see the file contents (will they be serialized)?

    Replies
    1. The intermediate output is written to HDFS; that is how you can use the output path of the first job as the input for the next.

  8. Thanks for the blog, it's really helpful. The chaining job is a very interesting one. Thanks for the nice blog.

  10. Nice example. But if I need to chain n jobs where n is not predefined, then what should be done? Let's say for an iterative algorithm that terminates only when certain conditions are met.

  11. I am using the same example, but when it executes the second job it says the input file is not found. Also, the output file is not getting created after the first job executes successfully.

    Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:54310/user/output1232
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:321)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385)
    at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:115)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:597)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:614)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:492)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1314)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:418)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:422)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

    Replies
    1. Can you post your driver class code snippet?
