蚌埠专业制作网站的公司,网站开发工具介绍,做网站需要会语言吗,做网站主要学什么文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的
⚪ 掌握网站流量项目的SparkStreaming代码;
⚪ 掌握网站流量项目的HBaseUtil代码;
⚪ 掌握网站流量项目的MysqlUtil代码;
⚪ 掌握网站流量项目的LogBean代码;
⚪ 掌握网站流量项目的To…文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的
⚪掌握网站流量项目的SparkStreaming代码;
⚪掌握网站流量项目的HBaseUtil代码;
⚪掌握网站流量项目的MysqlUtil代码;
⚪掌握网站流量项目的LogBean代码;
⚪掌握网站流量项目的TongjiBean代码; 一、SparkStreaming代码 package cn.tedu.kafkasource import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkContext import cn.tedu.pojo.LogBean import java.util.Calendar import cn.tedu.dao.HBaseUtil import cn.tedu.pojo.TongjiBean import cn.tedu.dao.MysqlUtil object SparkStreaming { def main(args: Array[String]): Unit = { val conf= new SparkConf().setMaster("local[3]").setAppName("test01") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") val sc=new SparkContext(conf) val ssc=new StreamingContext(sc, Seconds(5)) val kafkaParams: Map[String, Object] = Map[String, Object]( "bootstrap.servers" - "hadoop01:9092,hadoop02:9092,hadoop03:9092", "key.deserializer" - classOf[StringDeserializer], "value.deserializer" - classOf[StringDeserializer], "group.id" - "gp2" ) val topics = Array("logdata") val kafkaSource=KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(x=x.value()) kafkaSource.foreachRDD{rdd= //lines里存储了当前批次内的所有数据 val lines=rdd.toLocalIterator //遍历迭代器,对每条数据进行处理 while(lines.hasNext){ val line=lines.next() //第一步:清洗出所需要的业务字段。url,urlname,uvid,ssid,sscount,sstime,cip val info=line.split("\\|") val url=info(0) val urlname=info(1) val uvid=info(13) val ssid=info(14).split("_")(0) val sscount=info(14).split("_")(1) val sstime=info(14).split("_")(2)