Contents

- Requirements workflow
- Requirements analysis and specification
- Assignment 1
- Assignment 2

Requirements workflow

Full extraction / incremental extraction - DataX, Kettle, Sqoop, ...

Scenario: a colleague from the business side, or someone on the client's staff, brings a new requirement to your department manager and to you.

Process: make contact -> meet and discuss -> confirm the requirement -> deliver. Two documents come out of this: a requirements document (exactly what is needed) and a prototype document (a mock-up of the report, on paper or in a drawing tool).

Report 1 - the executive summary report (for the big boss). A summary table; each run computes a single day's record:
- total members, new members
- active members, active-member ratio
- churned members: members with no purchase record within the past year (inclusive), counted retrospectively
- net increase in active members
- member counts by consumption level (A >= 2000, B 1000-2000, C 500-1000, D 100-500, E < 100) - a classification sketch follows this section
- total member consumption amount
- total number of member purchases
- average spend per member purchase
- churned members, newly churned members
- 60-day member repurchase rate
- 180-day member repurchase rate
- 365-day member repurchase rate

Report 2 - a detail (plain) report for marketing, used by the marketing team for phone campaigns:
- target users: members with more than 30 purchases, or with total spend above 1500
- fields: name, phone number, total spend, purchase count, city, store, preferred payment method (mobile, card, cash, ...), diseases of interest
- the member's monthly order count and spend for the most recent 3 months: m1_total, m1_sale, m2_total, m2_sale, ...

Report 3 - also for marketing (the marketing manager):
- the top-20 members by spend for every month from 2022-01 to 2023-12, i.e. 24 x 20 = 480 rows
- T+1 (monthly), month formatted as yyyy-mm
- member name, contact details, ..., consumption level, order count and total for the most recent 30 days
- the member's consumption over the previous 30 / 60 / 90 days of that month (the hard part)
- sort order: by total spend descending by default, optionally by purchase count descending
- the report defaults to showing January 2021 and any month within the last two years can be selected
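The A-E boundaries above are easy to mis-read once the punctuation is stripped, so here is the intended mapping as a minimal Python sketch. The lower-bound-inclusive interval edges are my assumption, and classify_level is a hypothetical name, not part of the project code:

#!/bin/python3
def classify_level(total_amount: float) -> str:
    """Map a member's total consumption to the A-E level used by report 1.
    Assumption: each band includes its lower bound (e.g. exactly 1000 is B)."""
    if total_amount >= 2000:
        return "A"
    if total_amount >= 1000:
        return "B"
    if total_amount >= 500:
        return "C"
    if total_amount >= 100:
        return "D"
    return "E"

# quick check of the boundary behaviour
assert classify_level(2000) == "A" and classify_level(999.5) == "C"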
Requirements analysis and specification

The manager distills this into something like a wide-table document - it drives the later detail queries and metric calculations, and it determines which tables we need to extract:

- crm.user_base_info_his        customer information
- erp.u_memcard_reg             membership card registration
- erp.u_sale_m                  sales orders, ~19M rows
- erp.u_sale_pay                order payments, ~12M rows
- erp.c_memcard_class_group     member grouping
- erp.u_memcard_reg_c           disease-interest table
- his.chronic_patient_info_new  screening/test table
- erp.c_org_busi                store table
- plus the code-value table, processed from a file: erp.c_code_value

ODS naming convention: <system prefix>_<table name>_(full|inc).

# 7 full tables
crm.user_base_info_his        full                ods_lijinquan.crm_user_base_info_his_full
erp.u_memcard_reg             full                ods_lijinquan.erp_u_memcard_reg_full
erp.c_memcard_class_group     full                ods_lijinquan.erp_c_memcard_class_group_full
erp.u_memcard_reg_c           full                ods_lijinquan.erp_u_memcard_reg_c_full
his.chronic_patient_info_new  full                ods_lijinquan.his_chronic_patient_info_new_full
erp.c_org_busi                full                ods_lijinquan.erp_c_org_busi_full
erp.c_code_value              full (file-based)   ods_lijinquan.c_code_value_full

# 2 incremental tables
erp.u_sale_m    full once, then incremental daily   ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay  same as above, incremental          ods_lijinquan.erp_u_sale_pay_inc

Assignment 1

Extract the 7 full tables and deploy them to the scheduling platform - 7 scheduled jobs in total. At the end, every table must be compared against the source table's total row count, and the counts must match (a count-check sketch follows).
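The count comparison can be done in one shot with a small helper like the sketch below - my own convenience script, not part of the course material. It assumes the same MySQL credentials used by the DataX jobs, a local beeline on the PATH, and that beeline prints the count on the last output line:

#!/bin/python3
# compare_counts.py <db> <mysql_table> <hive_table>   (hypothetical helper)
import subprocess
import sys

import pymysql

db, mysql_table, hive_table = sys.argv[1], sys.argv[2], sys.argv[3]

# count on the MySQL side, with the same credentials as the DataX jobs
conn = pymysql.connect(host="zhiyun.pub", port=23306, user="zhiyun",
                       password="zhiyun", database=db)
cur = conn.cursor()
cur.execute(f"select count(1) from {mysql_table}")
mysql_count = cur.fetchone()[0]
conn.close()

# count on the Hive side via beeline, keeping the output machine-readable
out = subprocess.run(
    ["beeline", "-u", "jdbc:hive2://localhost:10000", "-n", "root", "-p", "123",
     "--outputformat=tsv2", "--silent=true",
     "-e", f"select count(1) from {hive_table}"],
    capture_output=True, text=True).stdout
hive_count = int(out.strip().splitlines()[-1])   # last line holds the value

print(f"mysql={mysql_count} hive={hive_count} "
      + ("OK" if mysql_count == hive_count else "MISMATCH"))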
Full-table processing

An upgraded helper script reads a table's column metadata automatically and generates both the DataX json file and the Hive DDL file.

full.py:

#!/bin/python3
import pymysql
import sys

# auto-generate the DataX json config and the Hive DDL for one table
if len(sys.argv) != 3:
    print("usage: python3 full.py <database> <table>")
    sys.exit()

sys_name = sys.argv[1]
table_name = sys.argv[2]

# read the column metadata from information_schema
db = pymysql.connect(
    host="zhiyun.pub",
    port=23306,
    user="zhiyun",
    password="zhiyun",
    database="information_schema"
)
cursor = db.cursor()
cursor.execute(f"select column_name, data_type from information_schema.columns "
               f"where table_schema='{sys_name}' and table_name='{table_name}'")
data = cursor.fetchall()

fields = []
for field in data:
    field_name = field[0]
    field_type = field[1]
    # map the MySQL type to a Hive type
    field_hive_type = "string"
    if field_type == "int" or field_type == "tinyint" or field_type == "bigint":
        field_hive_type = "int"
    if field_type == "float" or field_type == "double":
        field_hive_type = "float"
    fields.append([field_name, field_hive_type])
db.close()

print("==== generating the DataX config ====")
file_path = f"/zhiyun/shihaihong/jobs/{sys_name}_{table_name}_full.json"
template_path = "/zhiyun/shihaihong/jobs/template.json"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# substitute the column list
lines = []
for field in fields:
    lines.append(f'{{"name":"{field[0]}","type":"{field[1]}"}},')
columns = "\n".join(lines).strip(",")
new_content = new_content.replace('"#columns#"', columns)
# write out the new config
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("DataX config written")

print("==== generating the Hive DDL ====")
file_path = f"/zhiyun/shihaihong/sql/{sys_name}_{table_name}_full.sql"
template_path = "/zhiyun/shihaihong/sql/template.sql"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# substitute the column list
lines = []
for field in fields:
    lines.append(f"    {field[0]} {field[1]},")
columns = "\n".join(lines).strip(",")
new_content = new_content.replace("#columns#", columns)
# write out the new DDL file
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("Hive DDL written")

json template, template.json:

{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/#sys_name#?useSSL=false"],
                        "table": ["#table_name#"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": ["#columns#"],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "#sys_name#_#table_name#_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}

sql template file, template.sql:

create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";

-- full table
create external table if not exists ods_shihaihong.#table_name#_full(
#columns#
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full";

Test:

python3 python/full.py crm user_base_info_his

This generates the json and sql files for the table.
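Since full.py takes the schema and table as arguments, the configs for all seven full tables can be generated in one pass. A minimal driver sketch of my own - the table list mirrors the specification above, and it assumes full.py sits in the python/ directory as in the test command:

#!/bin/python3
# generate the DataX json + Hive DDL for all seven full tables in one go
import subprocess

tables = [
    ("crm", "user_base_info_his"),
    ("erp", "u_memcard_reg"),
    ("erp", "c_memcard_class_group"),
    ("erp", "u_memcard_reg_c"),
    ("his", "chronic_patient_info_new"),
    ("erp", "c_org_busi"),
    ("erp", "c_code_value"),
]
for sys_name, table_name in tables:
    subprocess.run(["python3", "python/full.py", sys_name, table_name], check=True)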
From the generated json and sql files we write the shell script that actually pulls the data.

crm_user_base_info_his_full.sh:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/crm?useSSL=false"],
                        "table": ["user_base_info_his"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"user_id","type":"string"},
                        {"name":"user_type","type":"string"}, {"name":"source","type":"string"},
                        {"name":"erp_code","type":"string"}, {"name":"active_time","type":"string"},
                        {"name":"name","type":"string"}, {"name":"sex","type":"string"},
                        {"name":"education","type":"string"}, {"name":"job","type":"string"},
                        {"name":"email","type":"string"}, {"name":"wechat","type":"string"},
                        {"name":"webo","type":"string"}, {"name":"birthday","type":"string"},
                        {"name":"age","type":"int"}, {"name":"id_card_no","type":"string"},
                        {"name":"social_insurance_no","type":"string"}, {"name":"address","type":"string"},
                        {"name":"last_subscribe_time","type":"int"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "crm_user_base_info_his_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/crm_user_base_info_his_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/crm_user_base_info_his_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.user_base_info_his_full(
    id int, user_id string, user_type string, source string, erp_code string,
    active_time string, name string, sex string, education string, job string,
    email string, wechat string, webo string, birthday string, age int,
    id_card_no string, social_insurance_no string, address string,
    last_subscribe_time int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/crm_user_base_info_his_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/crm_user_base_info_his_full/*' overwrite into table ods_shihaihong.crm_user_base_info_his_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.user_base_info_his_full;"

echo "==== extraction finished ===="

Run the script and test it, then check the result in your own schema. Create the task in the production scheduling centre, paste the sh file into the GLUE IDE, and execute it once - it runs successfully. (screenshots omitted)

The other six tables are handled the same way. Their shell scripts follow.

erp_u_memcard_reg_full.sh:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["u_memcard_reg"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"memcardno","type":"string"},
                        {"name":"busno","type":"string"}, {"name":"introducer","type":"string"},
                        {"name":"cardtype","type":"int"}, {"name":"cardlevel","type":"int"},
                        {"name":"cardpass","type":"string"}, {"name":"cardstatus","type":"int"},
                        {"name":"saleamount","type":"string"}, {"name":"realamount","type":"string"},
                        {"name":"puramount","type":"string"}, {"name":"integral","type":"string"},
                        {"name":"integrala","type":"string"}, {"name":"integralflag","type":"int"},
                        {"name":"cardholder","type":"string"}, {"name":"cardaddress","type":"string"},
                        {"name":"sex","type":"string"}, {"name":"tel","type":"string"},
                        {"name":"handset","type":"string"}, {"name":"fax","type":"string"},
                        {"name":"createuser","type":"string"}, {"name":"createtime","type":"string"},
                        {"name":"tstatus","type":"int"}, {"name":"notes","type":"string"},
                        {"name":"stamp","type":"string"}, {"name":"idcard","type":"string"},
                        {"name":"birthday","type":"string"}, {"name":"allowintegral","type":"int"},
                        {"name":"apptype","type":"string"}, {"name":"applytime","type":"string"},
                        {"name":"invalidate","type":"string"}, {"name":"lastdate","type":"string"},
                        {"name":"bak1","type":"string"}, {"name":"scrm_userid","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_memcard_reg_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.u_memcard_reg_full(
    id int, memcardno string, busno string, introducer string, cardtype int,
    cardlevel int, cardpass string, cardstatus int, saleamount string,
    realamount string, puramount string, integral string, integrala string,
    integralflag int, cardholder string, cardaddress string, sex string,
    tel string, handset string, fax string, createuser string, createtime string,
    tstatus int, notes string, stamp string, idcard string, birthday string,
    allowintegral int, apptype string, applytime string, invalidate string,
    lastdate string, bak1 string, scrm_userid string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_full/*' overwrite into table ods_shihaihong.erp_u_memcard_reg_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_memcard_reg_full;"

echo "==== extraction finished ===="

erp.c_memcard_class_group:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["c_memcard_class_group"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"createtime","type":"string"}, {"name":"createuser","type":"string"},
                        {"name":"groupid","type":"int"}, {"name":"groupname","type":"string"},
                        {"name":"notes","type":"string"}, {"name":"stamp","type":"int"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_c_memcard_class_group_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_memcard_class_group_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.c_memcard_class_group_full(
    createtime string, createuser string, groupid int,
    groupname string, notes string, stamp int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_c_memcard_class_group_full/*' overwrite into table ods_shihaihong.erp_c_memcard_class_group_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.c_memcard_class_group_full;"

echo "==== extraction finished ===="

erp.u_memcard_reg_c:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["u_memcard_reg_c"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"memcardno","type":"string"},
                        {"name":"sickness","type":"string"}, {"name":"status","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_memcard_reg_c_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.u_memcard_reg_c_full(
    id int, memcardno string, sickness string, status string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_c_full/*' overwrite into table ods_shihaihong.erp_u_memcard_reg_c_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_memcard_reg_c_full;"

echo "==== extraction finished ===="
his.chronic_patient_info_new:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/his?useSSL=false"],
                        "table": ["chronic_patient_info_new"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"member_id","type":"string"},
                        {"name":"erp_code","type":"string"}, {"name":"extend","type":"string"},
                        {"name":"detect_time","type":"string"}, {"name":"bec_chr_mbr_date","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "his_chronic_patient_info_new_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/his_chronic_patient_info_new_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.chronic_patient_info_new_full(
    id int, member_id string, erp_code string, extend string,
    detect_time string, bec_chr_mbr_date string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/his_chronic_patient_info_new_full/*' overwrite into table ods_shihaihong.his_chronic_patient_info_new_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.chronic_patient_info_new_full;"

echo "==== extraction finished ===="

erp.c_org_busi:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["c_org_busi"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"busno","type":"string"},
                        {"name":"orgname","type":"string"}, {"name":"orgsubno","type":"string"},
                        {"name":"orgtype","type":"string"}, {"name":"salegroup","type":"string"},
                        {"name":"org_tran_code","type":"string"}, {"name":"accno","type":"string"},
                        {"name":"sendtype","type":"string"}, {"name":"sendday","type":"string"},
                        {"name":"maxday","type":"string"}, {"name":"minday","type":"string"},
                        {"name":"notes","type":"string"}, {"name":"stamp","type":"string"},
                        {"name":"status","type":"string"}, {"name":"customid","type":"string"},
                        {"name":"whl_vendorno","type":"string"}, {"name":"whlgroup","type":"string"},
                        {"name":"rate","type":"string"}, {"name":"creditamt","type":"string"},
                        {"name":"creditday","type":"string"}, {"name":"peoples","type":"string"},
                        {"name":"area","type":"string"}, {"name":"abc","type":"string"},
                        {"name":"address","type":"string"}, {"name":"tel","type":"string"},
                        {"name":"principal","type":"string"}, {"name":"identity_card","type":"string"},
                        {"name":"mobil","type":"string"}, {"name":"corporation","type":"string"},
                        {"name":"saler","type":"string"}, {"name":"createtime","type":"string"},
                        {"name":"bank","type":"string"}, {"name":"bankno","type":"string"},
                        {"name":"bak1","type":"string"}, {"name":"bak2","type":"string"},
                        {"name":"a_bak1","type":"string"}, {"name":"aa_bak1","type":"string"},
                        {"name":"b_bak1","type":"string"}, {"name":"bb_bak1","type":"string"},
                        {"name":"y_bak1","type":"string"}, {"name":"t_bak1","type":"string"},
                        {"name":"ym_bak1","type":"string"}, {"name":"tm_bak1","type":"string"},
                        {"name":"supervise_code","type":"string"}, {"name":"monthrent","type":"string"},
                        {"name":"wms_warehid","type":"string"}, {"name":"settlement_cycle","type":"string"},
                        {"name":"apply_cycle","type":"string"}, {"name":"applydate","type":"string"},
                        {"name":"accounttype","type":"string"}, {"name":"applydate_last","type":"string"},
                        {"name":"paymode","type":"string"}, {"name":"yaolian_flag","type":"string"},
                        {"name":"org_longitude","type":"string"}, {"name":"org_latitude","type":"string"},
                        {"name":"org_province","type":"string"}, {"name":"org_city","type":"string"},
                        {"name":"org_area","type":"string"}, {"name":"business_time","type":"string"},
                        {"name":"yaolian_group","type":"string"}, {"name":"pacard_storeid","type":"string"},
                        {"name":"opening_time","type":"string"}, {"name":"ret_ent_id","type":"string"},
                        {"name":"ent_id","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_c_org_busi_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_c_org_busi_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_org_busi_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.c_org_busi_full(
    id int, busno string, orgname string, orgsubno string, orgtype string,
    salegroup string, org_tran_code string, accno string, sendtype string,
    sendday string, maxday string, minday string, notes string, stamp string,
    status string, customid string, whl_vendorno string, whlgroup string,
    rate string, creditamt string, creditday string, peoples string,
    area string, abc string, address string, tel string, principal string,
    identity_card string, mobil string, corporation string, saler string,
    createtime string, bank string, bankno string, bak1 string, bak2 string,
    a_bak1 string, aa_bak1 string, b_bak1 string, bb_bak1 string,
    y_bak1 string, t_bak1 string, ym_bak1 string, tm_bak1 string,
    supervise_code string, monthrent string, wms_warehid string,
    settlement_cycle string, apply_cycle string, applydate string,
    accounttype string, applydate_last string, paymode string,
    yaolian_flag string, org_longitude string, org_latitude string,
    org_province string, org_city string, org_area string,
    business_time string, yaolian_group string, pacard_storeid string,
    opening_time string, ret_ent_id string, ent_id string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_org_busi_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_c_org_busi_full/*' overwrite into table ods_shihaihong.erp_c_org_busi_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.c_org_busi_full;"

echo "==== extraction finished ===="
erp.c_code_value:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["c_code_value"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"cat_name","type":"string"},
                        {"name":"cat_code","type":"string"}, {"name":"val_name","type":"string"},
                        {"name":"var_desc","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_c_code_value_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_c_code_value_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_code_value_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.c_code_value_full(
    id int, cat_name string, cat_code string, val_name string, var_desc string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_code_value_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_c_code_value_full/*' overwrite into table ods_shihaihong.erp_c_code_value_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.c_code_value_full;"

echo "==== extraction finished ===="

Task scheduling: the scheduler jobs for erp.u_memcard_reg, erp.c_memcard_class_group, erp.u_memcard_reg_c, his.chronic_patient_info_new, erp.c_org_busi and erp.c_code_value are set up the same way as the first one. (screenshots omitted)
The code-value table also ships as a spreadsheet, so upload the file to the data directory and write a Python data-cleaning script:

#!/bin/python3
import os
import pandas as pd
from openpyxl import load_workbook

lst = []
# read every sheet of the Excel file: sheet_name=None loads all worksheets,
# header=2 means the header row is the 3rd row (0-indexed)
dfs = pd.read_excel("/zhiyun/shihaihong/data/12.码值表.xlsx", sheet_name=None, header=2)
sheets = list(dfs.keys())
wb = load_workbook(filename="/zhiyun/shihaihong/data/12.码值表.xlsx")
for i in range(len(sheets)):
    if i >= 1:   # skip the first (cover) sheet
        # cell A2 holds the "category_code-category_name" header of the sheet
        str_head = wb[sheets[i]]["A2"].value
        data = dfs[sheets[i]]
        # collect the non-empty cells of the remaining rows
        lst1 = []
        for col in data.columns:
            for j in range(len(data)):
                if pd.notna(data[col][j]):
                    lst1.append(str(data[col][j]))
        # the first half of lst1 holds the values, the second half the descriptions
        n = int(len(lst1) / 2)
        for k in range(n):
            ss = f"{str_head.split('-')[0]}|{str_head.split('-')[1]}|{lst1[k]}|{lst1[k+n]}"
            lst.append(ss)

print("writing the cleaned data to data/")
template_path = "/zhiyun/shihaihong/data/code_value.txt"
with open(template_path, "w", encoding="utf-8") as f:
    content = "\n".join(lst)
    f.write(content)

print("uploading the data file to HDFS")
os.system("hdfs dfs -mkdir -p /zhiyun/shihaihong/filetxt/")
os.system(f"hdfs dfs -put {template_path} /zhiyun/shihaihong/filetxt/")
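Before the external table is created over /zhiyun/shihaihong/filetxt, it is worth confirming that every cleaned line really has the four pipe-separated fields the DDL below expects. A small sanity check of my own, not part of the course material:

#!/bin/python3
# sanity-check the cleaned code-value file: 4 fields per line, none empty
with open("/zhiyun/shihaihong/data/code_value.txt", encoding="utf-8") as f:
    for no, line in enumerate(f, 1):
        parts = line.rstrip("\n").split("|")
        if len(parts) != 4 or "" in parts:
            print(f"bad line {no}: {line!r}")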
After the cleaning script has run, a shell script finishes off the extraction:

#!/bin/bash
# Purpose: run the whole flow, from writing the config files to verifying the data
# Must be runnable on any node

# create my working directories
mkdir -p /zhiyun/shihaihong/data /zhiyun/shihaihong/jobs /zhiyun/shihaihong/python /zhiyun/shihaihong/shell /zhiyun/shihaihong/sql

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_code_value_full(
    cat_name string, cat_code string, val_name string, var_desc string
)
row format delimited fields terminated by "|"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/filetxt";'
echo "==== Hive table created ===="

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.c_code_value_full;"
echo "==== verification done ===="

Run it, then schedule it. (scheduler screenshots omitted)

Assignment 2

Process the 2 incremental tables: a historical-data (one-off full) job plus a daily incremental job for each - 4 scheduled jobs in total. As before, every table must be compared against the source table's total row count, and the counts must match.

The incremental tables:

erp.u_sale_m    full once, then incremental daily   ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay  same as above, incremental          ods_lijinquan.erp_u_sale_pay_inc

The first extraction is a full one. erp_u_sale_m_full.sh:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["u_sale_m"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
                        {"name":"busno","type":"string"}, {"name":"posno","type":"string"},
                        {"name":"extno","type":"string"}, {"name":"extsource","type":"string"},
                        {"name":"o2o_trade_from","type":"string"}, {"name":"channel","type":"int"},
                        {"name":"starttime","type":"string"}, {"name":"finaltime","type":"string"},
                        {"name":"payee","type":"string"}, {"name":"discounter","type":"string"},
                        {"name":"crediter","type":"string"}, {"name":"returner","type":"string"},
                        {"name":"warranter1","type":"string"}, {"name":"warranter2","type":"string"},
                        {"name":"stdsum","type":"string"}, {"name":"netsum","type":"string"},
                        {"name":"loss","type":"string"}, {"name":"discount","type":"float"},
                        {"name":"member","type":"string"}, {"name":"precash","type":"string"},
                        {"name":"stamp","type":"string"}, {"name":"shiftid","type":"string"},
                        {"name":"shiftdate","type":"string"}, {"name":"yb_saleno","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_sale_m_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_u_sale_m_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_m_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.u_sale_m_full(
    id int, saleno string, busno string, posno string, extno string,
    extsource string, o2o_trade_from string, channel int, starttime string,
    finaltime string, payee string, discounter string, crediter string,
    returner string, warranter1 string, warranter2 string, stdsum string,
    netsum string, loss string, discount float, member string, precash string,
    stamp string, shiftid string, shiftdate string, yb_saleno string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_full/*' overwrite into table ods_shihaihong.erp_u_sale_m_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_sale_m_full;"

echo "==== extraction finished ===="
Register the full job on the scheduling platform, paste the script into the GLUE IDE, and execute it once. (screenshots omitted)

All subsequent runs use incremental extraction:

#!/bin/bash

# by default extract yesterday's data; if the scheduler passes a date as $1,
# extract the day before that date
day=$(date -d "yesterday" +"%Y-%m-%d")
if [ "$1" != "" ]; then
    day=$(date -d "$1 -1 day" +"%Y-%m-%d")
fi
echo "extraction date: $day"

echo "==== generating the incremental config ===="
mkdir -p /zhiyun/shihaihong/jobs
cat > /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json <<EOF
{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "querySql": [
                            "select * from u_sale_m where stamp between '$day 00:00:00' and '$day 23:59:59' and id > 0"
                        ]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
                        {"name":"busno","type":"string"}, {"name":"posno","type":"string"},
                        {"name":"extno","type":"string"}, {"name":"extsource","type":"string"},
                        {"name":"o2o_trade_from","type":"string"}, {"name":"channel","type":"int"},
                        {"name":"starttime","type":"string"}, {"name":"finaltime","type":"string"},
                        {"name":"payee","type":"string"}, {"name":"discounter","type":"string"},
                        {"name":"crediter","type":"string"}, {"name":"returner","type":"string"},
                        {"name":"warranter1","type":"string"}, {"name":"warranter2","type":"string"},
                        {"name":"stdsum","type":"string"}, {"name":"netsum","type":"string"},
                        {"name":"loss","type":"string"}, {"name":"discount","type":"float"},
                        {"name":"member","type":"string"}, {"name":"precash","type":"string"},
                        {"name":"stamp","type":"string"}, {"name":"shiftid","type":"string"},
                        {"name":"shiftdate","type":"string"}, {"name":"yb_saleno","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_sale_m_inc.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/tmp/erp_u_sale_m_inc",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 2 } }
    }
}
EOF

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_m_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_m_inc(
    id int, saleno string, busno string, posno string, extno string,
    extsource string, o2o_trade_from string, channel int, starttime string,
    finaltime string, payee string, discounter string, crediter string,
    returner string, warranter1 string, warranter2 string, stdsum string,
    netsum string, loss string, discount float, member string, precash string,
    shiftid string, shiftdate string, yb_saleno string
)
partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_inc";'

echo "==== loading data ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_inc/*' overwrite into table ods_shihaihong.erp_u_sale_m_inc partition(stamp='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_m_inc;
select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp='$day';
select * from ods_shihaihong.erp_u_sale_m_inc where stamp='$day' limit 5;"

echo "==== extraction finished ===="

Schedule the incremental job; for the one-off backfill run, execute it once with the date passed as the input parameter. (screenshots omitted)
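The date arithmetic at the top of these incremental scripts is easy to get wrong: with no argument the job extracts yesterday's data, and when the scheduler passes a date it extracts the day before that date. Here is the same logic in Python for reference - a sketch of my own, not part of the deployed job:

#!/bin/python3
# mirror of the shell date logic used by the incremental jobs
import sys
from datetime import date, datetime, timedelta

if len(sys.argv) > 1:
    # a schedule date was passed in: extract the day before it
    base = datetime.strptime(sys.argv[1], "%Y-%m-%d").date()
    day = base - timedelta(days=1)
else:
    # default: extract yesterday
    day = date.today() - timedelta(days=1)
print(day.strftime("%Y-%m-%d"))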
erp.u_sale_pay is handled the same way - a one-off full extraction first. erp_u_sale_pay_full.sh:

#!/bin/bash

echo "==== generating the full-extraction config ===="
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "table": ["u_sale_pay"]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
                        {"name":"cardno","type":"string"}, {"name":"netsum","type":"string"},
                        {"name":"paytype","type":"string"}, {"name":"bak1","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_sale_pay_full.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/ods/erp_u_sale_pay_full",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_pay_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full table
create external table if not exists ods_shihaihong.u_sale_pay_full(
    id int, saleno string, cardno string, netsum string, paytype string, bak1 string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_full";'

# echo "==== loading data ===="
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_full/*' overwrite into table ods_shihaihong.erp_u_sale_pay_full partition(createtime='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_sale_pay_full;"

echo "==== extraction finished ===="

Register the job, paste the script into the GLUE IDE, and execute it once. (screenshots omitted)
Then the daily incremental extraction. u_sale_pay has no timestamp column of its own, so the query joins u_sale_m and filters on its stamp:

#!/bin/bash

# by default extract yesterday's data; if the scheduler passes a date as $1,
# extract the day before that date
day=$(date -d "yesterday" +"%Y-%m-%d")
if [ "$1" != "" ]; then
    day=$(date -d "$1 -1 day" +"%Y-%m-%d")
fi
echo "extraction date: $day"

echo "==== generating the incremental config ===="
mkdir -p /zhiyun/shihaihong/jobs
cat > /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json <<EOF
{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                        "querySql": [
                            "select u_sale_pay.*, stamp from u_sale_pay left join u_sale_m on u_sale_pay.saleno = u_sale_m.saleno where stamp between '$day 00:00:00' and '$day 23:59:59' and u_sale_pay.id > 0"
                        ]
                    }],
                    "password": "zhiyun",
                    "username": "zhiyun"
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "column": [
                        {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
                        {"name":"cardno","type":"string"}, {"name":"netsum","type":"string"},
                        {"name":"paytype","type":"string"}, {"name":"bak1","type":"string"},
                        {"name":"stamp","type":"string"}
                    ],
                    "defaultFS": "hdfs://cdh02:8020",
                    "fieldDelimiter": "\t",
                    "fileName": "erp_u_sale_pay_inc.data",
                    "fileType": "orc",
                    "path": "/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc",
                    "writeMode": "truncate"
                }
            }
        }],
        "setting": { "speed": { "channel": 3 } }
    }
}
EOF

echo "==== extracting ===="
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_pay_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json

echo "==== creating the Hive table ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_pay_inc(
    id int, saleno string, cardno string, netsum string, paytype string, bak1 string
)
partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_inc";'

echo "==== loading data ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc/*' overwrite into table ods_shihaihong.erp_u_sale_pay_inc partition(stamp='$day');"

echo "==== verifying ===="
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_pay_inc;
select count(1) from ods_shihaihong.erp_u_sale_pay_inc where stamp='$day';
select * from ods_shihaihong.erp_u_sale_pay_inc where stamp='$day' limit 5;"

echo "==== extraction finished ===="

Schedule the job and execute it once. (screenshots omitted)