当前位置: 首页 > news >正文

简单建站我的电脑做网站服务器吗

简单建站,我的电脑做网站服务器吗,苏州嘉盛建设工程有限公司网站,网站导航字体文章目录 一、使用 Java API 和 JavaRDDRow 在 Spark SQL 中向数据帧添加新列二、foreachPartition 遍历 Dataset三、Dataset 自定义 Partitioner四、Dataset 重分区并且获取分区数 一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列 在应用 mapPartition… 文章目录 一、使用 Java API 和 JavaRDDRow 在 Spark SQL 中向数据帧添加新列二、foreachPartition 遍历 Dataset三、Dataset 自定义 Partitioner四、Dataset 重分区并且获取分区数 一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列 在应用 mapPartition 函数后创建一个新的数据框 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType;import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List;public class Handler implements Serializable {public void handler(DatasetRow sourceData) {DatasetRow rowDataset sourceData.where(rowKey abcdefg_123).selectExpr(split(rowKey, _)[0] as id,name,time).where(name 小强).orderBy(functions.col(id).asc(), functions.col(time).desc());FlatMapFunctionIteratorRow,Row mapPartitonstoTime rows-{Int count 0; // 只能在每个分区内自增不能保证全局自增String startTime ;String endTime ;ListRow mappedRowsnew ArrayListRow();while(rows.hasNext()){count;Row next rows.next();String id next.getAs(id);if (count 2) {startTime next.getAs(time);endTime next.getAs(time);}Row mappedRow RowFactory.create(next.getString(0), next.getString(1), next.getString(2), endTime, startTime);mappedRows.add(mappedRow);}return mappedRows.iterator();};JavaRDDRow sensorDataDoubleRDDrowDataset.toJavaRDD().mapPartitions(mapPartitonstoTime);StructType oldSchemarowDataset.schema();StructType newSchema oldSchema.add(startTime,DataTypes.StringType,false).add(endTime,DataTypes.StringType,false);System.out.println(The new schema is: );newSchema.printTreeString();System.out.println(The old schema is: );oldSchema.printTreeString();DatasetRow sensorDataDoubleDFspark.createDataFrame(sensorDataDoubleRDD, newSchema);sensorDataDoubleDF.show(100, false);} }打印结果 The new schema is: root|-- id: string (nullable true)|-- name: string (nullable true)|-- time: string (nullable true)The old schema is: root|-- id: string (nullable true)|-- name: string (nullable true)|-- time: string (nullable true)|-- startTime: string (nullable true)|-- endTime: string (nullable true)-------------------------------------------------- |id |name |time |startTime |endTime | -------------------------------------------------- |abcdefg_123|xiaoqiang|1693462023|1693462023|1693462023| |abcdefg_321|xiaoliu |1693462028|1693462028|1693462028| --------------------------------------------------参考 java - 使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列 java.util.Arrays$ArrayList cannot be cast to java.util.Iterator 二、foreachPartition 遍历 Dataset import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession;import java.io.IOException; import java.io.Serializable; import java.util.Iterator;public class Handler implements Serializable {public void handler(DatasetRow sourceData) {JavaRDDRow dataRDD rowDataset.toJavaRDD();dataRDD.foreachPartition(new VoidFunctionIteratorRow() {Overridepublic void call(IteratorRow rowIterator) throws Exception {while (rowIterator.hasNext()) {Row next rowIterator.next();String id next.getAs(id);if (id.equals(123)) {String startTime next.getAs(time);// 其他业务逻辑}}}});// 转换为 lambda 表达式dataRDD.foreachPartition((VoidFunctionIteratorRow) rowIterator - {while (rowIterator.hasNext()) {Row next rowIterator.next();String id next.getAs(id);if (id.equals(123)) {String startTime next.getAs(time);// 其他业务逻辑}}});} }三、Dataset 自定义 Partitioner 参考spark 自定义 partitioner 分区 java 版 import org.apache.commons.collections.CollectionUtils; import org.apache.spark.Partitioner; import org.junit.Assert;import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;/*** Created by lesly.lai on 2018/7/25.*/ public class CuxGroupPartitioner extends Partitioner {private int partitions;/*** mapkey, partitionIndex* 主要为了区分不同分区*/private MapObject, Integer hashCodePartitionIndexMap new ConcurrentHashMap();public CuxGroupPartitioner(ListObject groupList) {int size groupList.size();this.partitions size;initMap(partitions, groupList);}private void initMap(int size, ListObject groupList) {Assert.assertTrue(CollectionUtils.isNotEmpty(groupList));for (int i0; isize; i) {hashCodePartitionIndexMap.put(groupList.get(i), i);}}Overridepublic int numPartitions() {return partitions;}Overridepublic int getPartition(Object key) {return hashCodePartitionIndexMap.get(key);}public boolean equals(Object obj) {if (obj instanceof CuxGroupPartitioner) {return ((CuxGroupPartitioner) obj).partitions partitions;}return false;} }查看分区分布情况工具类 1Scala import org.apache.spark.sql.{Dataset, Row}/*** Created by lesly.lai on 2017/12FeeTask/25.*/ class SparkRddTaskInfo {def getTask(dataSet: Dataset[Row]) {val size dataSet.rdd.partitions.lengthprintln(s partition size: $size )import scala.collection.Iteratorval showElements (it: Iterator[Row]) {val ns it.toSeqimport org.apache.spark.TaskContextval pid TaskContext.get.partitionIdprintln(s[partition: $pid][size: ${ns.size}] ${ns.mkString( )})}dataSet.foreachPartition(showElements)} }2Java import org.apache.spark.TaskContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;import java.util.ArrayList; import java.util.Iterator; import java.util.List;public class SparkRddTaskInfo {public static void getTask(DatasetRow dataSet) {int size dataSet.rdd().partitions().length;System.out.println( partition size: size);JavaRDDRow dataRDD dataSet.toJavaRDD();dataRDD.foreachPartition((VoidFunctionIteratorRow) rowIterator - {ListString mappedRows new ArrayListString();int count 0;while (rowIterator.hasNext()) {Row next rowIterator.next();String id next.getAs(id);String partitionKey next.getAs(partition_key);String name next.getAs(name);mappedRows.add(id / partitionKey / name);}int pid TaskContext.get().partitionId();System.out.println([partition: pid ][size: mappedRows.size() ] mappedRows);});} }调用方式 import com.vip.spark.db.ConnectionInfos; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2;import java.util.List; import java.util.stream.Collectors;/*** Created by lesly.lai on 2018/7/23.*/ public class SparkSimpleTestPartition {public static void main(String[] args) throws InterruptedException {SparkSession sparkSession SparkSession.builder().appName(Java Spark SQL basic example).getOrCreate();// 原始数据集DatasetRow originSet sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, people, ConnectionInfos.getTestUserAndPasswordProperties());originSet.selectExpr(split(rowKey, _)[0] as id,concat(split(rowKey, _)[0],_,split(rowKey, _)[1]) as partition_key,split(rowKey, _)[1] as name.createOrReplaceTempView(people);// 获取分区分布情况工具类SparkRddTaskInfo taskInfo new SparkRddTaskInfo();DatasetRow groupSet sparkSession.sql( select partition_key from people group by partition_key);ListObject groupList groupSet.javaRDD().collect().stream().map(row - row.getAs(partition_key)).collect(Collectors.toList());// 创建pairRDD 目前只有pairRdd支持自定义partitioner所以需要先转成pairRddJavaPairRDD pairRDD originSet.javaRDD().mapToPair(row - {return new Tuple2(row.getAs(partition_key), row);});// 指定自定义partitionerJavaRDD javaRdd pairRDD.partitionBy(new CuxGroupPartitioner(groupList)).map(new FunctionTuple2String, Row, Row(){Overridepublic Row call(Tuple2String, Row v1) throws Exception {return v1._2;}});DatasetRow result sparkSession.createDataFrame(javaRdd, originSet.schema());// 打印分区分布情况taskInfo.getTask(result);} }四、Dataset 重分区并且获取分区数 System.out.println(1--rowDataset.rdd().partitions().length);System.out.println(1--rowDataset.rdd().getNumPartitions());DatasetRow hehe rowDataset.coalesce(1);System.out.println(2--hehe.rdd().partitions().length);System.out.println(2--hehe.rdd().getNumPartitions());运行结果 1--29 1--29 2--2 2--2注意在使用 repartition() 时两次打印的结果相同 print(rdd.getNumPartitions()) rdd.repartition(100) print(rdd.getNumPartitions())产生上述问题的原因有两个   首先 repartition() 是惰性求值操作需要执行一个 action 操作才可以使其执行。   其次repartition() 操作会返回一个新的 rdd并且新的 rdd 的分区已经修改为新的分区数因此必须使用返回的 rdd否则将仍在使用旧的分区。   修改为rdd2 rdd.repartition(100) 参考repartition() is not affecting RDD partition size
http://www.dnsts.com.cn/news/37300.html

相关文章:

  • 就业专项资金网站建设100到300万企业所得税
  • 网站如何判断做的好不好帮一个公司做网站多少钱
  • 北京工程建设交易中心网站如何做网站写手
  • 黑龙江龙采做网站如何网站建设后台管理
  • 广西一站网网络技术集团有限公司厦门网站个人制作
  • 海南城乡建设厅网站凡科和有赞哪个好用
  • 用dw建设个人网站视频wordpress 访客文章
  • 东莞网络推广网站郑州百度推广网站建设
  • 贵阳观山湖区网站建设豆角网是哪个网站开发的
  • 南阳网站制作哪家好crawling wordpress
  • 门户网站建设的请示有什么有趣的网站
  • 北京金创网站建设做 爱 网站视频
  • wordpress标签分类河池网站优化
  • 丹徒网站建设咨询长沙营销网站建设公司
  • 禹州市城乡建设局网站怎么可以创建网站
  • ftp网站上传之后怎么办小程序网站开发太原
  • 8图片这样的网站怎么做wordpress 主机推荐
  • 做网站排名有用吗域名是网址吗
  • 如何做淘客网站爱城市网官方下载
  • 企业进行网站建设的方式有( )做地方旅游网站目的意义
  • 西安网站seo方法网站建设的钱计入什么科目
  • 网站建设名头浅谈中兴电子商务网站建设
  • 代人做网站动画设计制作
  • 网站关键词优化怎么弄怎么做seo网站推广
  • 长沙手机网站建设哪些WordPress设置模块间距
  • 电子商务等于做网站吗天津制作网站公司
  • 唐山哪里建设网站好青岛做网站找什么公司
  • 长沙优质营销网站建设设计枣庄做网站
  • 杭州网站推广大全网站开发方向c语言
  • 国外平面设计师网站如何引导企业老板做网站