How to run hadoop pipes with C++ program? - Stack Overflow


I am trying to implement a MapReduce job using Hadoop to calculate total revenue over a number of CSV files. I started from an example file I found on GitHub, which is just a word-counting program:

#include <string>
#include <vector>

#include <stdint.h>  // <--- to prevent uint64_t errors!

#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"

class WordCountMapper : public HadoopPipes::Mapper {
 public:
  // Constructor: does nothing
  WordCountMapper(HadoopPipes::TaskContext& context) {}
  // Map function: Receives a line, outputs (word,"1") to reducer.
  void map(HadoopPipes::MapContext& context) {
    // Get line of text
    std::string line = context.getInputValue();
    // Split into words
    std::vector<std::string> words = HadoopUtils::splitString(line, " ");
    // Emit each word tuple (word, "1" )
    for (unsigned int i = 0; i < words.size(); i++) {
      context.emit(words[i], HadoopUtils::toString(1));
    }
  }
};

class WordCountReducer : public HadoopPipes::Reducer {
 public:
  // Constructor: does nothing
  WordCountReducer(HadoopPipes::TaskContext& context) {}
  // Reduce function
  void reduce(HadoopPipes::ReduceContext& context) {
    int count = 0;
    // Get all tuples with the same key, and count their numbers
    while (context.nextValue()) {
      count += HadoopUtils::toInt(context.getInputValue());
    }
    // Emit (word, count)
    context.emit(context.getInputKey(), HadoopUtils::toString(count));
  }
};

int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(
    HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>()
  );
}
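
For context, the revenue version I am aiming for would swap the word splitting and counting for something like the sketch below. It is standalone C++ without the Hadoop headers, and it rests on assumptions that are not part of the example I started from: the revenue is taken to be the last comma-separated field of each line, fields are assumed not to contain quoted commas, and the helper names `parseRevenue` and `sumRevenue` are hypothetical.

```cpp
#include <cstdlib>
#include <string>
#include <vector>

// Hypothetical helper for the map step: pull the revenue out of one CSV line.
// Assumption: revenue is the last comma-separated field, with no quoted commas.
// Returns false when the field does not start with a number (e.g. a header row).
bool parseRevenue(const std::string& line, double& revenue) {
  std::string::size_type pos = line.find_last_of(',');
  std::string field = (pos == std::string::npos) ? line : line.substr(pos + 1);
  const char* begin = field.c_str();
  char* end = 0;
  revenue = std::strtod(begin, &end);
  return end != begin;  // no characters consumed means no number was found
}

// Hypothetical helper for the reduce step: add up the values emitted
// under a single key, each value being a number serialized as a string.
double sumRevenue(const std::vector<std::string>& values) {
  double total = 0.0;
  for (std::vector<std::string>::size_type i = 0; i < values.size(); i++) {
    total += std::strtod(values[i].c_str(), 0);
  }
  return total;
}
```

In the Pipes version, the mapper would presumably call something like `parseRevenue` on `context.getInputValue()` and emit the result under one fixed key, and the reducer would accumulate the values the way `sumRevenue` does.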

After the word count program compiled without any issues, I tried to run it using this:

bin/mapred pipes -D hadoop.pipes.java.recordreader=true \
             -D hadoop.pipes.java.recordwriter=true \
             -input input/wordcount/sotu_2015.txt   \
             -output output                         \
             -program input/wordcount/wordcount

But all I get is:

...
java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.mapred.pipes.Application.<init>(Application.java:109)
    at org.apache.hadoop.mapred.pipes.PipesMapRunner.run(PipesMapRunner.java:72)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2025-01-02 23:31:20,556 INFO mapreduce.Job: Job job_local830471764_0001 running in uber mode : false
2025-01-02 23:31:20,560 INFO mapreduce.Job:  map 0% reduce 0%
2025-01-02 23:31:20,564 INFO mapreduce.Job: Job job_local830471764_0001 failed with state FAILED due to: NA
2025-01-02 23:31:20,567 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:875)
    at org.apache.hadoop.mapred.pipes.Submitter.runJob(Submitter.java:264)
    at org.apache.hadoop.mapred.pipes.Submitter.run(Submitter.java:505)
    at org.apache.hadoop.mapred.pipes.Submitter.main(Submitter.java:520)

I have not been able to find anything related to this error (specifically for C++). I am not using Docker or anything similar; I untarred Hadoop and am working inside that directory. What is the correct way to run this code?
