使用Python语言写Hadoop MapReduce程序

Python部落(python.freelycode.com)组织翻译,禁止转载,欢迎转发。

在本教程中,我将描述如何使用Python语言为Hadoop编写一个简单的MapReduce程序。

目的

尽管Hadoop框架是用Java编写的,但是为Hadoop编写的程序不必非要Java写,还可以使用其他语言开发,比如Python或C++(Haoop在0.14.1版本提供C++ API)。然而,Hadoop的官方文档和在官网上最著名的Python的例子会让你觉得你必须使用Jython(译者注:用Java语言写的一个Python解释器,原生的Python解释器是用C语言写的。)将Python代码转换成一个Java的jar文件。很显然,这非常不方便甚至会出问题,如果你依赖的Python特性不被Jython提供。另一个Jython方式的问题是:编写与Haoop交互的Python程序存在着间接开销——你只要看一下这个例子$HADOOP_HOME/src/examples/python/WordCount.py,你就可以明白我的意思了。

也就是说,本教程的目的是:以Python风格,编写一个Hadoop MapReduce程序,即你应该熟悉的方式。

我们想要做什么

我们将用Python写一个简单的MapReduce程序(可以看下维基的MapReduce条目)但是不用Jython将我们的代码转换成Java的jar文件。

我们的程序会模仿单词计数,即读取文本文件并计算单词出现的次数。输入是文本文件,输出也是文本文件,其中每一行包含一个单词及它出现的次数,以Tab分隔。

注意:你也可以使用本教程描述的“技术”,而使用Python之外的编程语言比如Perl或Ruby。

前提条件

你应该有一个Hadoop的集群在运行,因为我们将用到。如果你还没有一个集群,我下面的教程可能会帮助你搭建一个。本教程是在Ubuntu系统下测试的,但应该也适用于其他Linux/Unix系统。

在Ubuntu上运行Hadoop(单节点集群)——如何搭建一个伪分布式,单节点Hadoop集群,后台存储是Hadoop分布式文件系统(HDFS)。

在Ubuntu上运行Hadoop(多节点集群)——如何搭建一个分布式,多节点的Hadoop集群,后台存储是Hadoop分布式文件系统(HDFS)。

MapReduce的Python代码

下面Python代码的一个“窍门”是我们将使用Hadoop流API(可以看下相关的维基条目)来帮助我们通过STDIN(标准输入)和STDOUT(标准输出)在Map和Reduce代码间传递数据。我们只是使用Python的sys.stdin读取输入数据和打印输出到sys.stdout。这就是我们需要做的,因为Hadoop流将处理好一切。

Map部分代码:mapper.py

将下面的代码保存在文件 /home/hduser/mapper.py 中。它将从STDIN读取数据,拆分为单词并输出一组映射单词和它们数量(中间值)的行到STDOUT。尽管这个Map脚本不会计算出单词出现次数的总和(中间值)。相反,它会立即输出(<word> 1)元组的形式——即使某个特定的单词可能会在输入中出现多次。在我们的例子中,我们让后续的Reduce做最终的总和计数。当然,你可以按照你的想法在你自己的脚本中修改这段代码,但是,由于教学原因,我们在本教程中就先这样做。:-)

请确保该文件具有可执行权限(chmod +x /home/hduser/mapper.py ),否则你会遇到问题。

Selection_001.png

Reduce部分代码:reducer.py

将下面的代码保存在文件 /home/hduser/reducer.py 中。它将从STDIN读取mapper.py的结果(因此mapper.py的输出格式和reducer.py预期的输入格式必须匹配),然后统计每个单词出现的次数,最后将结果输出到STDOUT中。

请确保该文件具有可执行权限(chmod +x /home/hduser/reducer.py ),否则你会遇到问题。

Selection_002.png

代码测试(cat data | map | sort | reduce)

在MapReduce作业中使用它们之前,我建议先在本地测试你的mapper.py和reducer.py脚本。否则,你的作业可能成功完成了但没有作业结果数据或得到了不是你想要的结果。如果发生这种情况,很有可能是你(或我)搞砸了。

这里有一些想法,关于如何测试这个Map和Reduce脚本的功能。

Selection_003.png

在Hadoop上运行Python代码

下载示例输入数据

对于这个示例,我们将使用来自Gutenberg项目的三个文本:


下载每个文件为纯文本文件,以UTF-8编译并且将这些文件存储在一个临时目录中,如/tmp/gutenberg。

***说明:你将需要在你的Cloudera虚拟机中打开浏览器。选择适当的文件下载(UTF-8 版本),它将显示在你的浏览器中。点击鼠标右键按钮来保存该文件。给它一个合适的名称(如"Ulysses"),并注意它将保存在下载目录中。***


Selection_004.png

将本地示例数据拷贝到HDFS

在我们运行实际的MapReduce作业前,我们首先必须从我们本地文件系统中拷贝文件到Hadoop的HDFS内。

***说明:
我们假设你是在你的下载目录中。我们必须在HDFS中创建一个子目录,然后拷贝文件过来。最后,我们验证拷贝文件成功。

首先,我们在HDFS中创建子目录MyFirst:

    [cloudera@quickstart Downloads]$ hadoop fs -mkdir  MyFirst

然后,我们拷贝文件。注意,三个文件以.txt结尾:

    [cloudera@quickstart Downloads]$ hadoop fs -copyFromLocal *.txt MyFirst

最后,我们验证拷贝成功: 

    [cloudera@quickstart Downloads]$ hadoop fs -ls MyFirst

   
    Found 3 items

    -rw-r--r--   1 cloudera cloudera    1423803 2014-11-30 08:02 MyFirst/Leonardo.txt
    -rw-r--r--   1 cloudera cloudera     674570 2014-11-30 08:02 MyFirst/OutlineOfScience.txt
    -rw-r--r--   1 cloudera cloudera    1573150 2014-11-30 08:02 MyFirst/Ulysses.txt


Selection_005.png

运行MapReduce作业

***说明:

运行MapReduce作业,敲入如下命令:

    [cloudera@quickstart ~]$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar  -file mapper.py    -mapper mapper.py \
 -file reducer.py   -reducer reducer.py -input MyFirst/* -output MyFirst4-output

你会收到有关文件被弃用的警告,不用担心。重要的是:当你发出这条命令时,输出目录(在这个示例中是MyFirst-output)不存在。

验证这个程序工作正常。首先,输入命令:hadoop fs -ls MyFirst4-output

[cloudera@quickstart ~]$ hadoop fs -ls MyFirst4-output

Found 2 items

-rw-r--r--   1 cloudera cloudera          0 2014-11-30 09:23 MyFirst4-output/_SUCCESS

-rw-r--r--   1 cloudera cloudera     880829 2014-11-30 09:23 MyFirst4-output/part-00000

然后,查看输出文件:

[cloudera@quickstart ~]$ hadoop fs -cat MyFirst4-output/part-00000

将文件从HDFS中拷入到你本地文件系统中:

[cloudera@quickstart ~]$ hadoop fs -copyToLocal MyFirst4-output/part-00000

MyFirstOutputLocal.txt

现在,一切都准备好了,我们终于可以在Hadoop集群上运行我们的Python MapReduce作业了。如上所述,我们使用Hadoop流API通过STDIN和STDOUT在Map和Reduce间传递数据。

Selection_006.png

如果你想要在运行的时候修改Hadoop参数,如增加Reduce任务的数量,你可以使用-D选项:

    hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...


关于mapred.map.tasks说明:Hadoop does not honor mapred.map.tasks beyond considering it a hint 。但是,Hadoop接受用户指定mapred.reduce.tasks并且不操作。你不能强制指定mapred.map.tasks,但可以指定mapred.reduce.tasks。

这个任务将读取HDFS目录/user/hduser/gutenberg中的所有文件,处理它们,并将结果存储在HDFS目录/user/hduser/gutenberg-output中。一般情况下,Hadoop对每个reducer产生一个输出文件;在我们的示例中,然而它将只创建单个文件因为输入的文件都很小。

在终端中前一个命令的输出示例︰

Selection_007.png

**译者说明:截图中的命令不完整,完整命令如下:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output


在上面的输出中你可以看到,Hadoop也为统计和信息提供一个基本的web界面。Hadoop集群运行时,在浏览器中打开 http://localhost:50030/  ,浏览下界面。这里有一张我们刚才运行任务的Hadoop Web界面的截图。


Hadoop-web-interface-screenshot.png

图1:Hadoop的JobTracker界面截图,详细显示了我们刚运行的MapReduce作业。

检查结果是否成功存储在了HDFS目录/user/hduser/gutenberg-output中:

Selection_008.png

然后,你可以使用dfs -cat命令查看文件的内容:

Selection_009.png

请注意,上面特定的输出中被双引号引用的单词没有被Hadoop插入。他们是我们的Python代码拆分单词的结果,在这种情况下它匹配了文件中引用的开头。进一步查看下part-00000这个文件。

改进Mapper和Reducer代码:使用Python的迭代器(iterator)和生成器(generator)

上面的Mapper和Reducer例子应该给你提供了一种思路,关于如何创建第一个MapReduce程序。重点是代码简洁和易于理解,特别是对于Python语言的初学者。在现实程序中,你可能想要通过Python的迭代器和生成器一个更好的介绍PDF文档)优化你的代码。

一般来说,迭代器和生成器(生成迭代器的函数,举例Python中yield语句)有一个优点:序列中的元素在你确时需要它的时候才会生成。这会很有用,当你手头上计算资源昂贵或内存紧缺。

注意:下面的Map和Reduce脚本只有运行在Hadoop环境中才会正常工作,即在 MapReduce任务中作为Mapper和Reducer。这表示在本地运行的测试命令"cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py"不会正常工作,因为一些功能是由Hadoop来完成的。

准确地说,我们计算了一个单词出现的次数,例如("foo", 4),只有恰巧相同的单词(foo)相继出现多次。然而,在大多数情况下,我们让Hadoop在Map和Reduce过程时自动分组(key, value)对这样的形式,因为Hadoop在这方面比我们简单的Python脚本效率更高。

mapper.py

Selection_010.png

reducer.py

Selection_011.png

英文原文:http://www.emunix.emich.edu/~sverdlik/COSC472/WritingAnHadoopMapReduceProgramInPython-MichaelG.Noll.html
译者:leisants