Contents
1. Execution Process
    1.1 Splitting
    1.2 Map
    1.3 Combine
    1.4 Reduce
2. Code and Results
    2.1 Dependency Configuration in pom.xml
    2.2 Utility Class util
    2.3 WordCount
    2.4 Results
References

1. Execution Process

Suppose the two input files for WordCount, text1.txt and text2.txt, contain the following.

text1.txt:
Hello World
Bye World

text2.txt:
Hello Hadoop
Bye Hadoop
1.1 Splitting

Each input file is divided into splits. Because the test files are small, each file becomes a single split. Each split is then broken into key-value pairs line by line, as illustrated below. MapReduce performs this step automatically; the key is the byte offset of the line, computed by MapReduce itself and including the characters occupied by the line break.
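To make this concrete, here are the pairs the two splits would produce, assuming a one-byte line terminator (so the second line of text1.txt starts at byte offset 12 and the second line of text2.txt at offset 13):

text1.txt split: (0, "Hello World"), (12, "Bye World")
text2.txt split: (0, "Hello Hadoop"), (13, "Bye Hadoop")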
1.2 Map

The key-value pairs produced by splitting are handed to the user-defined map method, which emits new key-value pairs. Each line of text is split on whitespace into words, and every occurrence of a word is given an initial count of 1; the key is the word and the value is 1, as illustrated below.
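For the two splits above, the map phase would emit:

text1.txt split: (Hello, 1), (World, 1), (Bye, 1), (World, 1)
text2.txt split: (Hello, 1), (Hadoop, 1), (Bye, 1), (Hadoop, 1)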
1.3 Combine

After the map output is produced, each Mapper sorts its key-value pairs by key in ascending order and runs the Combine step, which adds up the values that share the same key. The result is the Mapper's final output, which is written to disk, as illustrated below.
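Continuing the example, after sorting and combining, each Mapper's final output would be:

Mapper for text1.txt: (Bye, 1), (Hello, 1), (World, 2)
Mapper for text2.txt: (Bye, 1), (Hadoop, 2), (Hello, 1)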
1.4 Reduce

The Reducer first sorts the data received from the Mappers and merges the values that share a key into a single list. The user-defined reduce method then aggregates each list into a new key-value pair, and the results are written to HDFS as the final WordCount output, as illustrated below.
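For this example, the Reducer would therefore receive the merged lists on the left and emit the pairs on the right:

(Bye, [1, 1])    ->  (Bye, 2)
(Hadoop, [2])    ->  (Hadoop, 2)
(Hello, [1, 1])  ->  (Hello, 2)
(World, [2])     ->  (World, 2)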
2. Code and Results
2.1 Dependency Configuration in pom.xml

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.6</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.3.6</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>3.3.6</version>
    </dependency>
</dependencies>

2.2 Utility Class util

util.removeALL deletes the specified output path on HDFS if it already exists, and util.showResult prints the WordCount results.
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class util {
    // Obtain a FileSystem handle for the given URI (e.g. hdfs://localhost:9000).
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    // Recursively delete the given path if it exists, so the job always starts with a clean output directory.
    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    // Print the contents of every part-r-* file under the given output path.
    public static void showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);
        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}
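As a quick usage sketch (the HDFS URI and output path simply mirror the ones used later in the article), the two helpers are called like this:

Configuration conf = new Configuration();
// Clear any previous output before the job runs.
util.removeALL("hdfs://localhost:9000", conf, "hdfs://localhost:9000/user/developer/wordcount/output");
// ... submit and wait for the job ...
// Then print whatever the job wrote to the output directory.
util.showResult("hdfs://localhost:9000", conf, "hdfs://localhost:9000/user/developer/wordcount/output");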
2.3 WordCount

Normally, a MapReduce program is packaged into a jar and run with: hadoop jar <jar file> <main class> <input path> <output path>. The code below hard-codes the input and output paths instead, so it can be run directly.
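For reference, the jar-based invocation mentioned above would look something like the following (WordCount.jar and the two paths are only placeholders; as noted, the main method below hard-codes its own paths, so it ignores command-line arguments):

hadoop jar WordCount.jar App /user/developer/wordcount/input /user/developer/wordcount/output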
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {
    // Mapper: split each line into words and emit (word, 1).
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println(key + " " + value);
            Text keyOut;
            IntWritable valueOut = new IntWritable(1);
            StringTokenizer token = new StringTokenizer(value.toString());
            while (token.hasMoreTokens()) {
                keyOut = new Text(token.nextToken());
                context.write(keyOut, valueOut);
            }
        }
    }

    // Reducer: sum the counts for each word.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] myArgs = {
            "file:///home/developer/CodeArtsProjects/WordCount/text1.txt",
            "file:///home/developer/CodeArtsProjects/WordCount/text2.txt",
            "hdfs://localhost:9000/user/developer/wordcount/output"
        };
        // Remove the output directory if it already exists.
        util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(App.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // MyReducer also serves as the combiner: summing counts is associative and commutative.
        job.setCombinerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < myArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(myArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));
        int res = job.waitForCompletion(true) ? 0 : 1;
        if (res == 0) {
            System.out.println("WordCount结果:");
            util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        }
        System.exit(res);
    }
}
2.4 Results
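Given the two input files above, the expected word counts printed by util.showResult are (key and count tab-separated, as produced by the default TextOutputFormat):

Bye	2
Hadoop	2
Hello	2
World	2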
References

吴章勇, 杨强. 《大数据Hadoop3.X分布式处理实战》.