How Do You Run a MapReduce Job (with Python)?

Problem scenario
You have studied what a MapReduce job is conceptually, but now you want to try one out.  You have a Linux server with the Hadoop namenode (aka master node) installed on it.  How do you run a MapReduce job to understand what it is all about?

Solution
This is just one example, and we tried to make it as simple as possible.  It assumes that Python has been installed on the Hadoop namenode (aka master node) running on Linux.

1.  Deploy Hadoop if it has not been installed.  See this link to deploy Hadoop to a single server.  If you want to set up a multi-node cluster of Hadoop, see these directions.  These directions can work with any distribution of Linux, but we found that with CentOS it was unreliable.  (With CentOS we saw the ResourceManager stopping inexplicably, excessive CPU consumption, or other undefined problems that made the MapReduce jobs take too long.  We do not know why, because CentOS is a great distribution of Linux.)
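
If Hadoop is already deployed, a quick sanity check like the one below can confirm the installation before you continue.  (This assumes the hadoop binary is on your PATH and that the JDK's jps tool is available.)

# Print the installed Hadoop version.
hadoop version

# List the running Java daemons; once HDFS and YARN have been started you would
# expect to see NameNode, DataNode, ResourceManager and NodeManager here.
jps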

2.  Log into the Hadoop namenode (aka master node), and then run this Bash program (it downloads sample text files and creates the two Python programs).

#!/bin/bash
mkdir -p /tmp/contint/

# The -L flag lets curl follow redirects in case the site redirects these URLs.
curl -L http://www.gutenberg.org/files/2701/2701.txt > /tmp/contint/2701.txt
curl -L http://www.gutenberg.org/files/2694/2694.txt > /tmp/contint/2694.txt
curl -L https://www.gutenberg.org/files/74/74-0.txt > /tmp/contint/74-0.txt

echo "#!/usr/bin/env python3
# This file is mapper.py
# This was taken from https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
# There were slight modifications to it.

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words
        words = line.split()
        # increase counters
        for word in words:
                # write the results to STDOUT (standard output);
                # what we output here will be the input for the
                # Reduce step, i.e. the input for reducer.py
                #
                # tab-delimited; the trivial word count is 1
                print('%s\t%s' % (word, 1))
" > /tmp/mapper.py

chmod +x /tmp/mapper.py

echo "#!/usr/bin/env python3
# This file is reducer.py
# This was mostly taken from https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()

        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)

        # convert count (currently a string) to int
        try:
                count = int(count)
        except ValueError:
                # count was not a number, so silently
                # ignore/discard this line
                continue

        # this IF-switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if current_word == word:
                current_count += count
        else:
                if current_word:
                        # write result to STDOUT
                        print('%s\t%s' % (current_word, current_count))
                current_count = count
                current_word = word

# do not forget to output the last word if needed!
if current_word == word:
        print('%s\t%s' % (current_word, current_count))
" > /tmp/reducer.py

chmod +x /tmp/reducer.py

echo "Script finished."
echo "***********************************************************************************"
echo ""
# Scripting the below is problematic on Ubuntu/Debian if the script is run via a "sudo" command.
# Plus the below is just an example. Some people may want to call the directory something else.
echo " Run these commands (or commands like these) to prepare a directory to run the hadoop commands:

su hduser
cd ~
mkdir placetowork
cp /tmp/reducer.py placetowork/
cp /tmp/mapper.py placetowork/
chown -R hduser:hadoop placetowork

# Then run these commands manually as the hduser in the placetowork subdirectory:
bash /usr/local/hadoop/sbin/start-dfs.sh
bash /usr/local/hadoop/sbin/start-yarn.sh
# Run the above two commands again if needed.  (Sometimes it is necessary; you can use 'jps' to verify whether they need to be run again.)
bash /usr/local/hadoop/sbin/start-dfs.sh
bash /usr/local/hadoop/sbin/start-yarn.sh

hdfs dfs -mkdir -p /usr/hduser/contint
hdfs dfs -copyFromLocal /tmp/contint/* /usr/hduser/contint/

# Depending on how HDFS was set up, the paths below may need to be under /user/hduser instead of /usr/hduser; use an HDFS directory your user can write to.
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -file /tmp/mapper.py -mapper /tmp/mapper.py -file /tmp/reducer.py -reducer /tmp/reducer.py -input /usr/hduser/contint/* -output /usr/hduser/contint-output -verbose
*************************************************************************************
"

3.  Find the hadoop-streaming*.jar file.  Run this command if you need to find it:

sudo find / -name "hadoop-streaming-*.jar"
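
If you followed single-server directions that install Hadoop under /usr/local/hadoop, the streaming .jar file is usually in the tools library directory, so a quicker check could be this:

ls /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar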

4.  Optional step.  Run this command after you replace "/user/hduser" with the directory in your HDFS that you will write to:   hdfs dfs -ls /user/hduser

# This just tests that you are able to use HDFS as the user you are logged in as.
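
If the command fails because the directory does not exist yet, something like the commands below can create it.  (This assumes the hduser account and hadoop group from the installation directions; creating directories under /user normally requires the HDFS superuser, i.e., the account that runs the NameNode.)

hdfs dfs -mkdir -p /user/hduser
hdfs dfs -chown hduser:hadoop /user/hduser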

5.  Run the command below after you replace "/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar" with the path and file name of the hadoop-streaming*.jar file you found in step 3, and after you replace "/user/hduser/" with an HDFS path your user has permission to write to:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -file /tmp/mapper.py -mapper /tmp/mapper.py -file /tmp/reducer.py -reducer /tmp/reducer.py -input /user/hduser/* -output /user/hduser/contint-output -verbose
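
When the job finishes successfully, Hadoop Streaming writes the results to the output directory as one or more part-* files.  You can inspect them like this (adjust the path if you used a different output directory):

# A _SUCCESS marker and at least one part-* file indicate the job completed.
hdfs dfs -ls /user/hduser/contint-output

# Show the first 20 word/count pairs.
hdfs dfs -cat /user/hduser/contint-output/part-* | head -n 20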

6. Optional step. You can run this command to see the status of your job:

mapred job -list all
# If you have an old version of Hadoop, try this command instead: hadoop job -list all
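
Assuming you are on a YARN-based version of Hadoop (2.x or newer), the job is also visible from the yarn CLI:

# List applications the ResourceManager currently knows about.
yarn application -list

# Include finished and failed applications too.
yarn application -list -appStates ALL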

FYI
The above directions were tested on Ubuntu 20.x, SUSE 15.x, and CentOS 8.x Linux servers (as well as other distributions, including different versions of RHEL).  With CentOS we saw some reliability problems (see the note in step 1).  We do think CentOS is a great OS, however.
