广州网站建设培训学校,wordpress菜单字体大小,电子商务网站功能需求,wordpress 仿糗百Spring boot装载模板代码涉及的子模块及准备省心Clickhouse批量写JSON多层级数据自动映射值模板代码生成及移交控制权给Spring IOC涉及的子模块及准备 
最近比较有空，之前一直好奇，提交到线上考试的代码是如何执行测试的，在实现了基础的demo后…
Spring boot装载模板代码涉及的子模块及准备省心Clickhouse批量写JSON多层级数据自动映射值模板代码生成及移交控制权给Spring IOC涉及的子模块及准备 
最近比较有空，之前一直好奇提交到线上考试的代码是如何执行测试的；在实现了基础的demo后，进一步希望把它加载到Spring上，支持动态执行。经过一段时间琢磨之后，终于完成了基础版本，其中也结合了近来自己封装的小组件：省心Clickhouse批量写、JSON多层级数据自动映射值。
省心Clickhouse批量写 
在刚开始使用Clickhouse大批量写入时，经常会出现Clickhouse机器cpu飙升、导致查询不可用等情况。在研究了Clickhouse JDBC的官网说明文档以及Clickhouse文档（链接: CK官方文档）后，明白Clickhouse支持多种写入方式：直接使用MyBatis-Plus的批量写入方式就会出现cpu问题；还有一种是格式结构化写入，类似于HBase文件块写入，但是需要知道表字段和类型：
// 消耗比较小的批量写入方式
insert into table_name select %s from input(%s);
第三种写入方式是基于JSON格式写入（csv等格式也都支持，参考CK文档的 Formats for Input and Output Data 部分）：Clickhouse会自动匹配表字段和数据的键值对，除非空字段会严格校验之外，其他字段都会根据数据填充对应的值。注意：不能做数据更新，只做插入处理，忽略已存在的排序主键数据；更新数据混杂在新增数据中时，只执行插入。
// 
INSERT INTO afanti_aweme_info_all FORMAT JSONEachRow JSON1\nJSON2目前只实现第二种写入方式基本做到了自动生成input 字段及其格式关键代码如下 public interface MyIServiceT extends IServiceT {void saveBatchRecordsByInput(ListT records) throws Exception;} public class MyServiceImplM extends MyBaseMapperT, T extends ServiceImplM , T implements MyIServiceT {protected Log log  LogFactory.getLog(this.getClass());protected ClassM mapperClass  this.currentMapperClass();protected ClassT entityClass  this.currentModelClass();Autowiredprivate SqlSessionTemplate sqlSessionTemplate;Autowiredprotected M myBaseMapper;protected Logger LOGGER  LoggerFactory.getLogger(this.getClass());private final ConcurrentHashMapString, String sqlMap  new ConcurrentHashMap();private final ConcurrentHashMapString, ListString columnMap  new ConcurrentHashMap();Overridepublic void saveBatchRecordsByInput(ListT records) throws Exception {String key  this.mapperClass.getSimpleName()  .saveBatchRecordsByInput;TableName tableNameAnn  this.entityClass.getAnnotation(TableName.class);// _all结束代表ck的分布式表此处需要获取本地表的字段和类型String tableName  tableNameAnn.value().endsWith(_all) ? 
tableNameAnn.value().substring(0, tableNameAnn.value().length() - 4) : tableNameAnn.value();if (!sqlMap.containsKey(key)) {buildSql(tableName, key);}String mapperSql  sqlMap.get(key);Connection connection  getConnection();PreparedStatement ps  connection.prepareStatement(mapperSql);try {ListString fieldNames  columnMap.get(key);for (T record : records) {int i  1;for (String column : fieldNames) {Field field;try {field  entityClass.getDeclaredField(column);} catch (NoSuchFieldException e) {field  entityClass.getSuperclass().getDeclaredField(column);}field.setAccessible(true);Object val  field.get(record);// 数组类型处理if (val instanceof String[]) {ps.setArray(i, connection.createArrayOf(String, (String[]) val));} else if (val instanceof Long[]) {ps.setArray(i, connection.createArrayOf(Long, (Long[]) val));} else if (val instanceof Integer[]) {ps.setArray(i, connection.createArrayOf(Integer, (Integer[]) val));} else if (val instanceof Date) {// 特殊字段格式处理if (statisticsDay.equals(column)) {ps.setObject(i, DateUtils.format((Date) val, DateUtils.SDF_YYYY_MM_DD));} else {ps.setObject(i, DateUtils.format((Date) val));}} else {ps.setObject(i, val);}i;}ps.addBatch();}ps.executeBatch();ps.clearBatch();} finally {connection.close();}}private synchronized void buildSql(String tableName, String key) throws SQLException {Connection connection  getConnection();PreparedStatement ps  connection.prepareStatement(String.format(query, tableName, rawdata));ResultSet set  ps.executeQuery();StringJoiner columns  new StringJoiner(,, , );StringJoiner columnAndTypes  new StringJoiner(,, , );ListString columsList  new ArrayList();while (set.next()) {String column  set.getString(col_name);String dataType  set.getString(data_type);columns.add(column);columnAndTypes.add(column     dataType);columsList.add(toHumpString(column));}connection.close();// 写入数据时需要写入到分布式表分布式表根据分布式键规则把数据分发到对应机器的本地表上存储String querySt  String.format(querySQL, tableName  _all, columns.toString(), 
columnAndTypes.toString());sqlMap.putIfAbsent(key, querySt);columnMap.putIfAbsent(key, columsList);}private static String toHumpString(String string) {StringBuilder stringBuilder  new StringBuilder();String[] str  string.split(_);for (String string2 : str) {if(stringBuilder.length()  0){stringBuilder.append(string2);}else {stringBuilder.append(string2.substring(0, 1).toUpperCase());stringBuilder.append(string2.substring(1));}}return stringBuilder.toString();}String querySQL  insert into %s select %s from input(%s) ;// 获取连接池中的链接public Connection getConnection() {Connection conn  null;try {SqlSession sqlSession  sqlSessionTemplate.getSqlSessionFactory().openSession();conn  sqlSession.getConfiguration().getEnvironment().getDataSource().getConnection();} catch (Exception e) {LOGGER.error(Clickhouse getConnection:{}, e.getMessage());e.printStackTrace();}return conn;}String query  select name as col_name, type as data_type from  system.columns where table  %s and database  %s order by position asc;
} 
扩展一下Mybatis-plus的模板方法并重写一段生成input的字段和类型字符串逻辑即可。 
JSON多层级数据自动映射值 
封装了Json-path包，主要内容如下：
<dependency>
    <groupId>com.jayway.jsonpath</groupId>
    <artifactId>json-path</artifactId>
    <version>2.6.0</version>
</dependency>
封装未处理格式属性需要自行处理代码如下 public class JsonPathParseUtil {public static Configuration configuration  Configuration.builder().options(Option.DEFAULT_PATH_LEAF_TO_NULL, Option.SUPPRESS_EXCEPTIONS).build();public static Date parseDate(String dateStr) {String parsedDate;if (dateStr.length()  10) {parsedDate  yyyy-MM-dd;} else {parsedDate  yyyy-MM-dd HH:mm:ss;}try {return org.apache.commons.lang3.time.DateUtils.parseDate(dateStr, parsedDate);} catch (ParseException e) {e.printStackTrace();}return null;}public static T T Json2DTO(String msgString, ClassT clazz) {try {T dto  clazz.newInstance();ReadContext ctx  JsonPath.parse(msgString, configuration);Field[] fields  clazz.getDeclaredFields();for (Field field : fields) {JPath path  field.getAnnotation(JPath.class);if (path ! null) {Object obj  ctx.read(path.value());field.setAccessible(true);if (obj ! null) {if (obj instanceof String) {String value  (String) obj;if (StrUtil.isBlank(value)) {field.set(dto, null);} else if (field.getType().equals(Long.class)) {field.set(dto, Long.parseLong(value));} else if (field.getType().equals(Integer.class)) {field.set(dto, Integer.parseInt(value));} else if (field.getType().equals(Date.class)) {field.set(dto, parseDate(value));} else {field.set(dto, value);}} else if (obj instanceof Map) {Map value  (Map) obj;Object v  JSONObject.toJavaObject(new JSONObject(value), field.getType());field.set(dto, v);} else if (obj instanceof Integer) {Integer value  (Integer) obj;if (field.getType().equals(Long.class)) {field.set(dto, value.longValue());} else if (field.getType().equals(Date.class)) {if (value.toString().length()  10) {field.set(dto, new Date(value.longValue() * 1000));} else if (value.toString().length()  13) {field.set(dto, new Date(value.longValue()));}} else if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Float.class)) {field.set(dto, Float.valueOf(value));} else if (field.getType().equals(Double.class)) {field.set(dto, 
Double.valueOf(value));} else {field.set(dto, value);}} else if (obj instanceof Long) {Long value  (Long) obj;if (field.getType().equals(Integer.class)) {field.set(dto, value.intValue());} else if (field.getType().equals(Date.class)) {if (value.toString().length()  10) {field.set(dto, new Date(value * 1000));} else if (value.toString().length()  13) {field.set(dto, new Date(value));}} else if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else {field.set(dto, value);}} else if (obj instanceof Double) {Double value  (Double) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Float.class)) {field.set(dto, value.floatValue());} else {field.set(dto, value);}} else if (obj instanceof Date) {Date value  (Date) obj;field.set(dto, value);} else if (obj instanceof JSONArray) {JSONArray value  (JSONArray) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toJSONString());} else {Type genericType  field.getGenericType();if (genericType instanceof ParameterizedType) {ParameterizedType pt  (ParameterizedType) genericType;// 得到泛型里的class类型对象Class? actualTypeArgument  (Class?) pt.getActualTypeArguments()[0];List values  com.alibaba.fastjson.JSONArray.parseArray(value.toJSONString(), actualTypeArgument);field.set(dto, values);} else {field.set(dto, value);}}} else if (obj instanceof List) {List value  (List) obj;field.set(dto, value);} else if (obj instanceof JSONObject) {JSONObject value  (JSONObject) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toJSONString());} else {field.set(dto, value);}} else if (obj instanceof Boolean) {Boolean value  (Boolean) obj;if (field.getType().equals(Integer.class)) {field.set(dto, value ? 
1 : 0);} else {field.set(dto, value);}} else if (obj instanceof BigDecimal) {BigDecimal value  (BigDecimal) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Double.class)) {field.set(dto, value.doubleValue());} else if (field.getType().equals(BigDecimal.class)) {field.set(dto, value);} else if (field.getType().equals(Float.class)) {field.set(dto, value.floatValue());}}}}}return dto;} catch (IllegalAccessException | InstantiationException iae) {iae.printStackTrace();}return null;}} 
JPath Target({ElementType.FIELD})
Retention(RetentionPolicy.RUNTIME)
Documented
public interface JPath {String value() default ;String format() default ;} 
模板代码生成及移交控制权给Spring IOC 
关键代码如下：用模板代码生成对应处理逻辑的代码字符串，包装成java运行中的内存文件；获取编译器，把内存文件表示的数据加载到编译任务队列，执行编译，返回Class对象。到这一步就是线上编程考试的逻辑：你提交自己的代码到远程服务器上编译，用已经准备好的测试数据反射执行你的方法，验证代码逻辑符不符合编程要求。
接上一步：获取Spring运行环境上下文，用BeanDefinitionBuilder加载Class类，配置初始化设置，设置Bean的名称，注册BeanDefinition，最后通过ApplicationContext以及Bean的名称调用Bean即可。
@Data
@Component
@ConfigurationProperties("rocket-config")
@Slf4j
public class CodeRunner implements CommandLineRunner {private ListMq mq;private final String LISTENER_CODE import com.aliyun.openservices.ons.api.Action;\n import com.aliyun.openservices.ons.api.ConsumeContext;\n import com.aliyun.openservices.ons.api.Message;\n import com.aliyun.openservices.ons.api.MessageListener;\n import com.afanticar.transform.util.JsonPathParseUtil;\n import com.afanti.datastreamline.utils.SpringUtils;\n import java.util.ArrayList;\n import java.util.List;\n import org.slf4j.Logger;\n import org.slf4j.LoggerFactory;\n import java.util.Date;\n import java.sql.SQLException;\n import java.lang.reflect.Field;\n import com.afanti.datastreamline.service.AfantiService;\n \n SuppressWarnings(\unchecked\) public class AfantiDouyinDataMessageListener implements MessageListener {\n \n     private final Logger LOGGER  LoggerFactory.getLogger(this.getClass());\n \n     Override\n     public Action consume(Message message, ConsumeContext context) {\n         String msgString  new String(message.getBody());\n         \n         System.out.println(\接收到消息\  msgString);\n         Object afantiDouyinAwemeInfo  SpringUtils.getBean(\afantiDouyinAwemeInfo\);\n         afantiDouyinAwemeInfo  JsonPathParseUtil.Json2DTO(msgString, afantiDouyinAwemeInfo.getClass());\n         try {\n             Field field  afantiDouyinAwemeInfo.getClass().getDeclaredField(\ctime\);\n             field.setAccessible(true);\n             field.set(afantiDouyinAwemeInfo, new Date());\n         } catch (NoSuchFieldException | IllegalAccessException e) {\n             e.printStackTrace();\n         }         List records  new ArrayList();\n         records.add(afantiDouyinAwemeInfo);\n         AfantiService afantiService  (AfantiService) SpringUtils.getBean(\afantiService\);\n         try {\n             afantiService.saveBatchRecordsByInput(\afanti_aweme_info_all\, records);\n         } catch (SQLException exception) {\n             exception.printStackTrace();\n         }\n         
return Action.CommitMessage;\n     }\n     \n };private final String CONSUMER_CODE import com.aliyun.openservices.ons.api.MessageListener;\n import com.aliyun.openservices.ons.api.PropertyKeyConst;\n import com.aliyun.openservices.ons.api.bean.ConsumerBean;\n import com.aliyun.openservices.ons.api.bean.Subscription;\n import com.afanti.datastreamline.utils.SpringUtils;\n import java.util.HashMap;\n import java.util.Map;\n import java.util.Properties;\n import com.afanti.datastreamline.config.MqProperties;\n \n public class AfantiDouyinDataConsumer extends ConsumerBean {\n \n     public void initConsumer() {\n         //配置文件\n         MqProperties mqConfig  (MqProperties) SpringUtils.getBean(\mqProperties\);\n         Properties properties  mqConfig.getMqPropertie();\n         // System.out.println(mqConfig.print());\n         properties.setProperty(PropertyKeyConst.GROUP_ID, \GID_AFANTI_CHIN_SURVEY\);\n         //将消费者线程数固定为20个 20为默认值\n         properties.setProperty(PropertyKeyConst.ConsumeThreadNums, \15\);\n         properties.setProperty(PropertyKeyConst.MaxCachedMessageAmount,\1000\);\n         this.setProperties(properties);\n         //订阅关系\n         MapSubscription, MessageListener subscriptionTable  new HashMapSubscription, MessageListener();\n         Subscription subscription  new Subscription();\n         subscription.setTopic(\AFANTI_CHIN_SURVEY\);\n         MessageListener afantiDouyinDataMessageListener  (MessageListener) SpringUtils.getBean(\afantiDouyinDataMessageListener\);\n         subscriptionTable.put(subscription, afantiDouyinDataMessageListener);\n         //订阅多个topic如上面设置\n         this.setSubscriptionTable(subscriptionTable);\n         this.start();\n     }\n };private final String KAFKA_TEST  import lombok.extern.slf4j.Slf4j;\n import org.apache.kafka.clients.consumer.ConsumerRecord;\n import org.springframework.beans.factory.annotation.Autowired;\n import org.springframework.kafka.annotation.KafkaListener;\n import 
org.springframework.stereotype.Component;\n import org.springframework.kafka.support.Acknowledgment;\n\n import java.util.List;\n \n /**\n  * author Data\n  */\n Slf4j\n Component\n public class AfantiMessageListener {\n \n     KafkaListener(\n             topics  \AFANTI_CHIN_DEV\,\n             containerFactory  \kafkaListenerContainerFactory\,\n             groupId  \chin-test\)\n     public void kafkaListener(ListConsumerRecordString, String messages, Acknowledgment ack) throws Exception {\n         System.out.println(messages.get(0));\n         ack.acknowledge();\n     }\n \n };private final String COLUMNS  {\afanti_douyin_aweme_info\:{\aweme_id\:{\type\:\String\,\path\:\item_id\},\aweme_title\:{\type\:\String\,\path\:\title\},\cover\:{\type\:\String\,\path\:\aweme_cover\},\ctime\:{\type\:\Date\,\path\:\c\}}};Overridepublic void run(String... args) {try {compilerAndRegister(DtoFreemarkerUtil.buildCode(COLUMNS), AfantiDouyinAwemeInfo, null, true);log.info(DTO、Entity代码构建、装载和初始化完成...);compilerAndRegister(LISTENER_CODE, AfantiDouyinDataMessageListener, null, false);log.info(Listener代码构建、装载完成...);compilerAndRegister(CONSUMER_CODE, AfantiDouyinDataConsumer, initConsumer, false);log.info(RocketMq Consumer代码构建、装载完成...);log.info(RocketMq Consumer初始化完成...);compilerAndRegister(KAFKA_TEST, AfantiMessageListener, null, false);} catch (URISyntaxException | TemplateException | IOException e) {e.printStackTrace();}}Datastatic public class Mq {private String gid;private String topic;private String platform;}private void compilerAndRegister(String code, String clazzName, String initMethod, boolean needObject) throws URISyntaxException {JavaCompiler compiler  ToolProvider.getSystemJavaCompiler();StandardJavaFileManager standardFileManager  compiler.getStandardFileManager(null, null, null);ClassJavaFileManager classJavaFileManager  new ClassJavaFileManager(standardFileManager);StringObject stringObject  new StringObject(new URI( clazzName  .java), JavaFileObject.Kind.SOURCE, 
code);// 加入编译任务队列JavaCompiler.CompilationTask task  compiler.getTask(null, classJavaFileManager, null, null, null,Collections.singletonList(stringObject));Class clazz  null;Object entityObj  null;if (task.call()) {ClassJavaFileObject javaFileObject  classJavaFileManager.getClassJavaFileObject();// 获取AppClassloader加载器ClassLoader classLoader  new MyClassLoader(javaFileObject);try {clazz  classLoader.loadClass(clazzName);if (needObject) {entityObj  clazz.newInstance();}} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {e.printStackTrace();}}ApplicationContext ctx  SpringUtils.getApplicationContext();// Spring Ioc Bean工厂DefaultListableBeanFactory defaultListableBeanFactory  (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();assert clazz ! null;// Bean属性等定义器BeanDefinitionBuilder beanDefinitionBuilder;if (needObject) {assert entityObj ! null;beanDefinitionBuilder  BeanDefinitionBuilder.genericBeanDefinition(entityObj.getClass());} else {beanDefinitionBuilder  BeanDefinitionBuilder.genericBeanDefinition(clazz);}// 设置初始化配置 RocketMQ Consumerif (initMethod ! 
null) {beanDefinitionBuilder.setInitMethodName(initMethod);beanDefinitionBuilder.setDestroyMethodName(shutdown);}beanDefinitionBuilder.setLazyInit(false);String beanName  clazzName.substring(0,1).toLowerCase(Locale.ROOT)  clazzName.substring(1);defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());ctx.getBean(beanName);}static class ClassJavaFileManager extends ForwardingJavaFileManager {private ClassJavaFileObject classJavaFileObject;public ClassJavaFileManager(JavaFileManager fileManager) {super(fileManager);}public ClassJavaFileObject getClassJavaFileObject() {return classJavaFileObject;}/**读取Class文件字节流*/Overridepublic JavaFileObject getJavaFileForOutput(JavaFileManager.Location location, String className,JavaFileObject.Kind kind, FileObject sibling) {return (classJavaFileObject  new ClassJavaFileObject(className,kind));}}/**存储源文件*/static class StringObject extends SimpleJavaFileObject {private final String content;public StringObject(URI uri, Kind kind, String content) {super(uri, kind);this.content  content;}//使JavaCompiler可以从content获取java源码Overridepublic CharSequence getCharContent(boolean ignoreEncodingErrors) {return this.content;}}/**class文件不需要存到文件中*/static class ClassJavaFileObject extends SimpleJavaFileObject {ByteArrayOutputStream outputStream;public ClassJavaFileObject(String className, Kind kind) {super(URI.create(className  kind.extension), kind);this.outputStream  new ByteArrayOutputStream();}Overridepublic OutputStream openOutputStream() {return this.outputStream;}//获取输出流为byte[]数组public byte[] getBytes(){return this.outputStream.toByteArray();}}/**自定义classloader*/static class MyClassLoader extends ClassLoader {private final ClassJavaFileObject stringObject;public MyClassLoader(ClassJavaFileObject stringObject){this.stringObject  stringObject;}Overrideprotected Class? findClass(String name) {byte[] bytes  this.stringObject.getBytes();return defineClass(name,bytes,0,bytes.length);}}} 
以上代码的参数配置以常量形式展示，线上环境跟Demo有点差别，下次会展示最终的线上代码（正在修修补补中…）
PS.参考文档已忘…