I am trying to implement a MapReduce job using Hadoop to calculate total revenue over a number of CSV files. I started from an example file I found on GitHub, which is just a word-counting program.
#include <string>
#include <vector> // needed for std::vector used in the mapper
#include "stdint.h" // <--- to prevent uint64_t errors!
#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"
class WordCountMapper : public HadoopPipes::Mapper {
public:
  // Trivial constructor: the mapper keeps no per-task state.
  WordCountMapper(HadoopPipes::TaskContext& context) {}

  // For each input line, split it on single spaces and emit one
  // (word, "1") pair per token for the reducer to count.
  void map(HadoopPipes::MapContext& context) {
    const std::string line = context.getInputValue();
    const std::vector<std::string> tokens = HadoopUtils::splitString(line, " ");
    for (const std::string& token : tokens) {
      context.emit(token, HadoopUtils::toString(1));
    }
  }
};
class WordCountReducer : public HadoopPipes::Reducer {
public:
  // Trivial constructor: the reducer keeps no per-task state.
  WordCountReducer(HadoopPipes::TaskContext& context) {}

  // All values sharing the current key arrive here; sum them up and
  // emit a single (word, total) pair.
  void reduce(HadoopPipes::ReduceContext& context) {
    int total = 0;
    while (context.nextValue()) {
      total += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(total));
  }
};
int main(int argc, char *argv[]) {
return HadoopPipes::runTask(
HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>()
);
}
After it compiled without any issues, I tried to run it using this:
bin/mapred pipes -D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input input/wordcount/sotu_2015.txt \
-output output \
-program input/wordcount/wordcount
But all I get is:
...
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapred.pipes.Application.<init>(Application.java:109)
at org.apache.hadoop.mapred.pipes.PipesMapRunner.run(PipesMapRunner.java:72)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2025-01-02 23:31:20,556 INFO mapreduce.Job: Job job_local830471764_0001 running in uber mode : false
2025-01-02 23:31:20,560 INFO mapreduce.Job: map 0% reduce 0%
2025-01-02 23:31:20,564 INFO mapreduce.Job: Job job_local830471764_0001 failed with state FAILED due to: NA
2025-01-02 23:31:20,567 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:875)
at org.apache.hadoop.mapred.pipes.Submitter.runJob(Submitter.java:264)
at org.apache.hadoop.mapred.pipes.Submitter.run(Submitter.java:505)
at org.apache.hadoop.mapred.pipes.Submitter.main(Submitter.java:520)
I have not been able to find anything related to this error (specifically for C++). I am not using Docker or anything; I untarred Hadoop and am working inside that directory. What is the correct way to run this code?