当前位置: 首页 > news >正文

手机端网站开发视频武城网站建设

手机端网站开发视频,武城网站建设,电子商务网站的建站流程,建造师求职网文章目录 #xff08;118#xff09;MapJoin案例需求分析ReduceJoin的问题如何解决ReduceJoin的问题如何将一个文件主动缓存到集群的内存里 #xff08;119#xff09;MapJoin案例代码实现参考文献 #xff08;118#xff09;MapJoin案例需求分析 ReduceJoin的问题 在R… 文章目录 118MapJoin案例需求分析ReduceJoin的问题如何解决ReduceJoin的问题如何将一个文件主动缓存到集群的内存里 119MapJoin案例代码实现参考文献 118MapJoin案例需求分析 ReduceJoin的问题 在ReduceJoin中合并的操作是在Reduce阶段进行的所以相比Map阶段Reduce阶段的处理压力过大。另外相同的产品ID的数据会进入同一个Reducer中如果这个产品ID下数据过多其他产品ID的数据很少那么会导致前面那个Reducer压力过大这就是数据倾斜问题。 如何解决ReduceJoin的问题 那如何解决这种问题呢 比较好的方法是不使用ReduceJoin使用MapJoin即在Map阶段实现拼接。 思路简单来说就是将产品码表放进内存orders.txt正常切片进入mapper然后mapper处理的时候就逐行对orders.txt里的数据进行产品码值的替换。 基于这种方式MapJoin的适用场景也就很明显了MapJoin适用于一张或多张表特别小不能把内存撑爆了一张表特别大的场景。 如何将一个文件主动缓存到集群的内存里 那问题来了在Hadoop里怎么把一张表主动缓存到内存当中且还能在map()里调用呢 首先我们需要在驱动类里指定将文件加载到缓存 //缓存普通文件到Task运行节点。 job.addCacheFile(new URI(file:///e:/cache/pd.txt)); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI(hdfs://hadoop102:8020/cache/pd.txt));// MapJoin的话就不需要Reduce阶段了 job.setNumReduceTasks(0);然后在自定义Mapper类的setup()里按以下流程编写代码以读取缓存的文件数据 //1. 获取缓存的文件 // 2.循环读取缓存文件中每一行 // 3. 切割 // 4. 缓存数据到集合setup()执行完成后才会执行map()。 所以我们最后在map()里获取一行后截取到pid从内存中码表拿到产品中文名拼接给出就可以。 119MapJoin案例代码实现 过了一遍教程其实就是对上一小节的代码实现。 总的来说就是只有一个Map阶段在Map阶段中在map()处理之前先把码表读进内存中然后map()在一行一行读取后直接使用内存中的码表对指定字段进行替换即可。 对我来讲用处不大所以这里直接跳过但还是补充一下代码 在MapJoinDriver驱动类中添加缓存文件: package com.atguigu.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {// 1 获取job信息Configuration conf new Configuration();Job job Job.getInstance(conf);// 2 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);// 3 关联mapperjob.setMapperClass(MapJoinMapper.class);// 4 设置Map输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI(file:///D:/input/tablecache/pd.txt));// Map端Join的逻辑不需要Reduce阶段设置reduceTask数量为0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(D:\\input));FileOutputFormat.setOutputPath(job, new Path(D:\\output));// 7 提交boolean b job.waitForCompletion(true);System.exit(b ? 0 : 1);} }在MapJoinMapper类中的setup方法中读取缓存文件并在map()里进行替换: package com.atguigu.mapreduce.mapjoin;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map;public class MapJoinMapper extends MapperLongWritable, Text, Text, NullWritable {private MapString, String pdMap new HashMap();private Text text new Text();//任务开始前将pd数据缓存进pdMapOverrideprotected void setup(Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据pd.txtURI[] cacheFiles context.getCacheFiles();Path path new Path(cacheFiles[0]);//获取文件系统对象,并开流FileSystem fs FileSystem.get(context.getConfiguration());FSDataInputStream fis fs.open(path);//通过包装流转换为reader,方便按行读取BufferedReader reader new BufferedReader(new InputStreamReader(fis, UTF-8));//逐行读取按行处理String line;while (StringUtils.isNotEmpty(line reader.readLine())) {//切割一行 //01 小米String[] split line.split(\t);pdMap.put(split[0], split[1]);}//关流IOUtils.closeStream(reader);}Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//读取大表数据 //1001 01 1String[] fields value.toString().split(\t);//通过大表每行数据的pid,去pdMap里面取出pnameString pname pdMap.get(fields[1]);//将大表每行数据的pid替换为pnametext.set(fields[0] \t pname \t fields[2]);//写出context.write(text,NullWritable.get());} }参考文献 【尚硅谷大数据Hadoop教程hadoop3.x搭建到集群调优百万播放】
http://www.dnsts.com.cn/news/9468.html

相关文章:

  • 重庆网站改版有哪些企业网站
  • 网站图片上浮动文字淄博的大型网站建设
  • 环保网站建设开发建筑公司企业发展建议
  • 襄阳做网站哪家好连云港网站制作公司口碑好
  • 网站做一样的算侵权么泰安房产最新出售信息
  • 泸县手机网站建设推广型网站开发
  • 做电影网站赚钱wordpress仿wiki
  • 连云港企业做网站cn域名做犯法网站
  • 做软件挣钱的网站门户信息类网站建设
  • 请人做个网站多少钱中国信息网官网查询系统
  • 外包网站价格网页制作平台的是
  • 阿里云做影视网站wordpress网站服务时间
  • 网站备案视频title (网站开发)
  • 网站开发项目建设规范广州企业网站制作
  • 网站备案和前置审批重庆网站制作长沙
  • 马蜂窝网站做的重点西安网站seo技术厂家
  • 网站缓存优化怎么做wordpress二级联动
  • 郑州睿网站建设可信网站认证 技术支持单位
  • 视频网站怎么做网站引流海外推广运营
  • 南平企业网站建设平面设计培训多少钱 贵吗
  • 如何查看网站的更新频率女生做网站推广
  • 12306网站的建设历程知名企业网站人才招聘情况
  • 网站建设优化方案广东公司网站建设哪家好
  • 局域网内网站建设wordpress 评论者邮箱
  • 社交网站先做pc站可以吗share poine 户做网站
  • 烟台网站制作效果服务器下载安装
  • 石家庄建站工具网站制作学什么
  • 网站管理助手 mysqlwordpress 微博 插件
  • 把手机做网站服务器雅思培训
  • 公司网站未备案吗wordpress编辑留言板