1. Raw purchase data: buy.txt
zhangsan 5676 2765 887
lisi 6754 3234 1232
wangwu 3214 6654 388
lisi 1123 4534 2121
zhangsan 982 3421 5566
zhangsan 1219 36 45
2. Approach: first use one MapReduce job to aggregate each customer's spending, then use a second MapReduce job to sort the customers by amount.
3. Define an entity class whose compareTo method implements the sort order.
package cn.edu.tju;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Buy implements WritableComparable<Buy> {
    private double jingdong;
    private double taobao;
    private double duodian;

    public Buy() {
    }

    public Buy(double jingdong, double taobao, double duodian) {
        this.jingdong = jingdong;
        this.taobao = taobao;
        this.duodian = duodian;
    }

    public double getJingdong() {
        return jingdong;
    }

    public void setJingdong(double jingdong) {
        this.jingdong = jingdong;
    }

    public double getTaobao() {
        return taobao;
    }

    public void setTaobao(double taobao) {
        this.taobao = taobao;
    }

    public double getDuodian() {
        return duodian;
    }

    public void setDuodian(double duodian) {
        this.duodian = duodian;
    }

    @Override
    public String toString() {
        return jingdong + "\t" + taobao + "\t" + duodian;
    }

    // Serialization used by Hadoop when Buy objects move between map and reduce.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(jingdong);
        out.writeDouble(taobao);
        out.writeDouble(duodian);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.jingdong = in.readDouble();
        this.taobao = in.readDouble();
        this.duodian = in.readDouble();
    }

    // Sort order used when Buy is the map output key: by jingdong, then by taobao.
    // Ascending order is assumed here; flip the operators for descending order.
    @Override
    public int compareTo(Buy o) {
        if (this.jingdong > o.getJingdong()) {
            return 1;
        } else if (this.getJingdong() < o.getJingdong()) {
            return -1;
        } else {
            if (this.getTaobao() > o.getTaobao()) {
                return 1;
            } else if (this.getTaobao() < o.getTaobao()) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}
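As a quick sanity check of the sort order, the compareTo method can be exercised on two in-memory objects. This is a hypothetical helper class, not part of the original post, and it assumes the ascending reconstruction of compareTo above:

package cn.edu.tju;

// Hypothetical demo class for checking Buy.compareTo; not used by the MapReduce jobs.
public class BuyCompareDemo {
    public static void main(String[] args) {
        Buy a = new Buy(3214, 6654, 388);
        Buy b = new Buy(7877, 6222, 6498);
        // With the assumed ascending order, a has the smaller jingdong value, so this prints -1.
        System.out.println(a.compareTo(b));
    }
}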
4. Define the first Mapper and Reducer pair
package cn.edu.tju;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyBuyMapper1 extends Mapper<LongWritable, Text, Text, Buy> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line: name jingdong taobao duodian (space separated)
        String str = value.toString();
        String[] fieldList = str.split(" ");
        double jingdong = Double.parseDouble(fieldList[1]);
        double taobao = Double.parseDouble(fieldList[2]);
        double duodian = Double.parseDouble(fieldList[3]);
        String person = fieldList[0];
        // Key by customer name so the reducer can sum per customer.
        context.write(new Text(person), new Buy(jingdong, taobao, duodian));
    }
}
package cn.edu.tju;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class MyBuyReducer1 extends Reducer<Text, Buy, Text, Buy> {
    @Override
    protected void reduce(Text key, Iterable<Buy> values, Reducer<Text, Buy, Text, Buy>.Context context) throws IOException, InterruptedException {
        double sum1 = 0;
        double sum2 = 0;
        double sum3 = 0;
        // Accumulate the three columns across all records of the same customer.
        Iterator<Buy> iterator = values.iterator();
        while (iterator.hasNext()) {
            Buy next = iterator.next();
            sum1 += next.getJingdong();
            sum2 += next.getTaobao();
            sum3 += next.getDuodian();
        }
        context.write(key, new Buy(sum1, sum2, sum3));
    }
}
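For reference (worked out by hand from the sample buy.txt above, not copied from an actual run), the first job's output in count_1/part-r-00000 should hold one tab-separated line of totals per customer, in key order; for example, zhangsan's jingdong column is 5676 + 982 + 1219 = 7877:

lisi	7877.0	7768.0	3353.0
wangwu	3214.0	6654.0	388.0
zhangsan	7877.0	6222.0	6498.0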
5. Define the second Mapper and Reducer pair
package cn.edu.tju;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyBuyMapper2 extends Mapper<LongWritable, Text, Buy, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line comes from the first job's output: name \t jingdong \t taobao \t duodian
        String str = value.toString();
        String[] fieldList = str.split("\t");
        double jingdong = Double.parseDouble(fieldList[1]);
        double taobao = Double.parseDouble(fieldList[2]);
        double duodian = Double.parseDouble(fieldList[3]);
        String person = fieldList[0];
        // Emit Buy as the key so the shuffle sorts records by Buy.compareTo.
        context.write(new Buy(jingdong, taobao, duodian), new Text(person));
    }
}
package cn.edu.tju;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class MyBuyReducer2 extends Reducer<Buy, Text, Text, Buy> {
    @Override
    protected void reduce(Buy key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Swap key and value back so the output reads: name \t totals (already in sorted order).
        Iterator<Text> iterator = values.iterator();
        while (iterator.hasNext()) {
            Text next = iterator.next();
            context.write(next, key);
        }
    }
}
6. Define the main class, which configures two Jobs; the second Job starts only after the first job has finished
package cn.edu.tju;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyBuyMain2 {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        configuration.set("mapreduce.framework.name", "local");

        // First job: sum each customer's spending.
        Job job = Job.getInstance(configuration);
        //job.setJarByClass(MyBuyMain.class);
        // job name
        job.setJobName("buy-" + System.currentTimeMillis());
        // number of reduce tasks
        //job.setNumReduceTasks(3);
        // input path
        FileInputFormat.setInputPaths(job, new Path("D:\\tool\\TestHadoop3\\buy.txt"));
        // output path; it must not exist yet
        FileOutputFormat.setOutputPath(job, new Path("count_1"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Buy.class);
        job.setMapperClass(MyBuyMapper1.class);
        job.setReducerClass(MyBuyReducer1.class);
        // wait for the first job to finish
        job.waitForCompletion(true);

        // Second job: sort the totals produced by the first job.
        Job job2 = Job.getInstance(configuration);
        job2.setJarByClass(MyBuyMain2.class);
        job2.setJobName("buy2-" + System.currentTimeMillis());
        FileInputFormat.setInputPaths(job2, new Path("D:\\tool\\TestHadoop3\\count_1\\part-r-00000"));
        // output path; it must not exist yet
        FileOutputFormat.setOutputPath(job2, new Path("count_2"));
        job2.setMapOutputKeyClass(Buy.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setMapperClass(MyBuyMapper2.class);
        job2.setReducerClass(MyBuyReducer2.class);
        // wait for the second job to finish
        job2.waitForCompletion(true);
    }
}
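One optional hardening step, not part of the original code: Job.waitForCompletion returns a boolean indicating success, so the program can skip the second job when the first one fails. A minimal sketch, reusing the job variable from the main method above:

// Hypothetical variant of the chaining logic: only configure and run job2 if job succeeded.
boolean firstJobSucceeded = job.waitForCompletion(true);
if (!firstJobSucceeded) {
    System.exit(1);
}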
7. Run results