当前位置: 首页 > news >正文

要想学做网站盘锦门户网站制作

要想学做网站,盘锦门户网站制作,企业做网站还是做平台好,网站建设的岗位叫什么kafka动态认证 自定义认证 安全认证-亲测成功 背景 Kafka默认是没有安全机制的#xff0c;一直在裸奔。用户认证功能#xff0c;是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的#xff08;或者说只有SSL#xff09;#xff0c;好在kafka0.9版本…kafka动态认证 自定义认证 安全认证-亲测成功 背景 Kafka默认是没有安全机制的一直在裸奔。用户认证功能是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的或者说只有SSL好在kafka0.9版本以后逐渐发布了多种用户认证功能弥补了这一缺陷这里仅介绍SASL认证机制是SASL/PLAIN。 kafka下载安装 我这里用windows做的测试部署到Linux上也是一样 官方下载地址https://kafka.apache.org/downloads 我这里下载的kafka版本是kafka_2.12-3.5.0.tgz 直接解压如下图 启动zookeeper 这里的zookeeper配置其实没有做任何修改zookeeper这里不做认证控制。 zookeeper配置文件在kafka_2.12-3.5.0\config\zookeeper.properties下不用做任何修改 # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the License); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an AS IS BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # the directory where the snapshot is stored. dataDirD:\kafka_2.12-3.5.0\zookeeper# the port at which the clients will connect clientPort2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServerfalse # admin.serverPort8080#authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider #requireClientAuthSchemesasl #jaasLoginRenew3600000进入kafka主目录打开cmd #启动zookeeper bin\windows\zookeeper-server-start.bat config\zookeeper.propertieszookeeper-server-start.bat 启动脚本 echo off rem Licensed to the Apache Software Foundation (ASF) under one or more rem contributor license agreements. See the NOTICE file distributed with rem this work for additional information regarding copyright ownership. rem The ASF licenses this file to You under the Apache License, Version 2.0 rem (the License); you may not use this file except in compliance with rem the License. You may obtain a copy of the License at rem rem http://www.apache.org/licenses/LICENSE-2.0 rem rem Unless required by applicable law or agreed to in writing, software rem distributed under the License is distributed on an AS IS BASIS, rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. rem See the License for the specific language governing permissions and rem limitations under the License.IF [%1] EQU [] (echo USAGE: %0 zookeeper.propertiesEXIT /B 1 )rem set KAFKA_OPTS-Djava.security.auth.login.configD:\kafka_2.12-3.5.0\config\kafka_zookeeper_jaas.conf SetLocal IF [%KAFKA_LOG4J_OPTS%] EQU [] (set KAFKA_LOG4J_OPTS-Dlog4j.configurationfile:%~dp0../../config/log4j.properties) IF [%KAFKA_HEAP_OPTS%] EQU [] (set KAFKA_HEAP_OPTS-Xmx512M -Xms512M ) %~dp0kafka-run-class.bat org.apache.zookeeper.server.quorum.QuorumPeerMain %* EndLocal kafka自定义认证配置 kafka的用户认证是基于java的jaas。所以我们需要先添加jaas服务端的配置文件。 在kafka_2.12-3.5.0\config目录下新建kafka_jaas.conf 配置信息如下 KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameadminpasswordadmin-lianguser_adminadmin-123456user_liangliang-123456; };注意最后一个属性后面需要加分号配置是不难理解的第一行指定PlainLoginModule算是声明这是一个SASL/PLAIN的认证类型如果是其他的那么就需要reqired其他的类。username和password则是用于集群内部broker的认证用的。 这里会让人疑惑的应该是user_admin和user_liang这两个属性了。这个其实是用来定义用户名和密码的形式是这样user_userNamepassword。所以这里其实是定义了用户admin和用户liang对应的密码。 这一点可以在源码的PlainServerCallbackHandler类中找到对应的信息kafka源码中显示对用户认证的时候就会到jaas配置文件中通过user_username属性获取对应username用户的密码再进行校验。当然这样也导致了该配置文件只有重启才会生效即无法动态添加用户。 写完配置后需要在kafka的配置中添加jaas文件的路径。在kafka_2.12-3.5.0/bin/kafka-run-class.sh中找到下面的配置修改KAFKA_OPTS到配置信息。如下 rem Generic jvm settings you want to add IF [%KAFKA_OPTS%] EQU [] (set KAFKA_OPTS )将上述到KAFKA_OPTS修改为: rem Generic jvm settings you want to add IF [%KAFKA_OPTS%] EQU [] (set KAFKA_OPTS-Djava.security.auth.login.configD:\kafka_2.12-3.5.0\config\kafka_jaas.conf )修改Kafka配置文件 配置文件在kafka_2.12-3.5.0\config\server.properties 主要增加如下配置 sasl.enabled.mechanisms PLAIN sasl.mechanism.inter.broker.protocol PLAIN security.inter.broker.protocol SASL_PLAINTEXT listeners SASL_PLAINTEXT://localhost:9092其中SASL_PLAINTEXT的意思是明文传输的意思如果是SSL那么应该是SASL_SSL。 这样就算是配置好kafka broker了接下来启动kafka观察输出日志没有错误一般就没问题了。 进入kafka主目录另外打开一个cmd #启动kafka bin\windows\kafka-server-start.bat config\server.properties使用Kafka客户端工具Kafka Tool连接 此时就可以根据上面配置的用户admin和用户liang和相应的密码去连接了 其他用户或错误的密码连接就会提示没有权限用户或密码错误 动态认证 以上的配置方案除了没有使用SSL加密之外还存在一个严重的缺陷用户信息是通过静态配置文件的方式存储的当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群而我们知道作为消息中间件Kafka的上下游与众多组件相连重启可能造成数据丢失或重复Kafka应当尽量避免重启。 如果要动态增加一个用户得修改kafka_jaas.conf的配置新增加一个用户而且还得重启Kafka这样显然不合适。 解决方案 还好Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数如果不希望采用静态配置文件存储用户认证信息的话只需要编写一个实现了 AuthenticateCallbackHandler 接口的类然后在配置文件中指明这个类即可指明的方法为在Kafka配置文件中添加如下内容 listener.name.sasl_plaintext.plain.sasl.server.callback.handler.classcom.liang.kafka.auth.handler.MyPlainServerCallbackHandler引入相关的maven依赖包pom添加如下依赖包 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.13/artifactIdversion2.8.1/version/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-cache/artifactIdversion5.7.21/version/dependency 动态认证的完整代码如下 package com.liang.kafka.auth.handler;import com.alibaba.druid.pool.DruidDataSource; import com.liang.kafka.auth.util.DataSourceUtil; import com.liang.kafka.auth.util.PasswordUtil; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import javax.security.auth.callback.Callback; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects;import static com.liang.kafka.auth.constants.Constants.*;/*** kafka自定义认证 sasl/plain二次开发* liang*/ public class MyPlainServerCallbackHandler implements AuthenticateCallbackHandler {private static final Logger logger LoggerFactory.getLogger(MyPlainServerCallbackHandler.class);/*** 数据源*/private DruidDataSource dataSource null;/*** 是否开启数据库验证开关*/private boolean enableDbAuth;private static final String JAAS_USER_PREFIX user_;private ListAppConfigurationEntry jaasConfigEntries;Overridepublic void configure(MapString, ? configs, String mechanism, ListAppConfigurationEntry jaasConfigEntries) {//jaas配置信息初始化一次这就是为什么plain无法添加用户this.jaasConfigEntries jaasConfigEntries;logger.info(configs:{}, JSON.toJSONString(configs));Object endbAuthObject configs.get(ENABLE_DB_AUTH);if (Objects.isNull(endbAuthObject)) {logger.error(缺少开关配置 enable_db_auth!);enableDbAuth Boolean.FALSE;return;}enableDbAuth TRUE.equalsIgnoreCase(endbAuthObject.toString());if (!enableDbAuth) {return;}dataSource DataSourceUtil.getInstance(configs);}//核心类获取用户密码后调用authenticate方法Overridepublic void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {String username null;for (Callback callback: callbacks) {if (callback instanceof NameCallback)username ((NameCallback) callback).getDefaultName();else if (callback instanceof PlainAuthenticateCallback) {PlainAuthenticateCallback plainCallback (PlainAuthenticateCallback) callback;boolean authenticated authenticate(username, plainCallback.password());plainCallback.authenticated(authenticated);logger.info(认证 username:{},result:{}, username, authenticated);} elsethrow new UnsupportedCallbackException(callback);}}//用户密码是通过获取jaas文件的属性属性名就是JAAS_USER_PREFIX变量当前缀usernameprotected boolean authenticate(String username, char[] password) throws IOException {if (username null || password null) {logger.error(用户名或密码为空!);return false;} else {//先读取配置文件里的用户验证String expectedPassword JaasContext.configEntryOption(jaasConfigEntries,JAAS_USER_PREFIX username,PlainLoginModule.class.getName());logger.info(读取密码 username:{},pwd:{}, username, expectedPassword);boolean jaasUserBool expectedPassword ! null Arrays.equals(password, expectedPassword.toCharArray());if (jaasUserBool) {return true;}//是否开启数据库验证if (enableDbAuth) {return dbAuthenticate(username, password);}return false;}}protected boolean dbAuthenticate(String usernameInfo, char[] passwordCharArray) throws IOException {String password new String(passwordCharArray);logger.info(begin dbAuthenticate usernameInfo:{},password:{}, usernameInfo, password);String username usernameInfo;String userQuery select\n u.username, u.password\n from u_user u \n where u.state1 and u.username?;Connection conn null;try {conn dataSource.getConnection();PreparedStatement statement conn.prepareStatement(userQuery);statement.setString(1, username);ResultSet resultSet statement.executeQuery();if (resultSet.next()) {String dbPassword resultSet.getString(password);Boolean bl PasswordUtil.matches(password, dbPassword);if (Boolean.TRUE.equals(bl)) {logger.info(密码验证成功username:{}, username);} else {logger.error(密码验证失败username:{}, usernameInfo);}return bl;} else {logger.error(认证失败,username:{} 没有找到, usernameInfo);return false;}} catch (Exception e) {logger.error(数据库查询用户异常{}, e);throw new RuntimeException(e);} finally {if (conn ! null) {try {conn.close();} catch (SQLException e) {throw new RuntimeException(e);}}}}Overridepublic void close() throws KafkaException {if (dataSource ! null) {dataSource.close();}}} 获取数据源代码 package com.liang.kafka.auth.util;import com.alibaba.druid.pool.DruidDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.Map; import java.util.Properties;/*** author liang* desc 获取数据源*/ public class DataSourceUtil {private static final Logger LOG LoggerFactory.getLogger(DataSourceUtil.class);/*** 保证 instance 在所有线程中同步*/private static volatile DruidDataSource dataSource null;public static synchronized DruidDataSource getInstance(MapString, ? configs) {if (dataSource null || dataSource.isClosed()) {dataSource initDataSource(configs);}return dataSource;}private static final DruidDataSource initDataSource(final MapString, ? configs) {Properties properties new Properties();for (Map.EntryString, ? entry : configs.entrySet()) {if (entry.getKey().startsWith(druid.)) {String key entry.getKey();String value (String) entry.getValue();LOG.info(datasource connection config: {}:{}, key, value);properties.setProperty(key, value);}}dataSource new DruidDataSource();dataSource.configFromPropety(properties);return dataSource;}} Kafka配置文件中添加数据源的相关配置 enable_db_auth true listener.name.sasl_plaintext.plain.sasl.server.callback.handler.classcom.liang.kafka.auth.handler.MyPlainServerCallbackHandler druid.name mysql_db druid.type com.alibaba.druid.pool.DruidDataSource druid.url jdbc:mysql://127.0.0.1:3306/test?useSSLFALSEuseUnicodetruecharacterEncodingUTF-8allowMultiQueriestrueserverTimezoneAsia/Shanghai druid.username root druid.password root druid.filters stat druid.driverClassName com.mysql.cj.jdbc.Driver druid.initialSize 5 druid.minIdle 2 druid.maxActive 50 druid.maxWait 60000 druid.timeBetweenEvictionRunsMillis 60000 druid.minEvictableIdleTimeMillis 300000 druid.validationQuery SELECT x druid.testWhileIdle true druid.testOnBorrow false druid.poolPreparedStatements false druid.maxPoolPreparedStatementPerConnectionSize 20其中enable_db_auth来控制是否开启动态认证。 编译打成jar包后需要放到kafka_2.12-3.5.0\libs目录还使用了相关的依赖包也要放入 重启Kafka后生效Kafka的连接认证就会从数据库去查询想增加修改删除用户直接在数据库表里操作。 参考链接 https://www.top8488.top/kafka/458.html/ https://zhuanlan.zhihu.com/p/301343840?utm_mediumsocialutm_oi886243404000944128utm_id0 https://www.jianshu.com/p/e4c50e4affb8
http://www.dnsts.com.cn/news/68338.html

相关文章:

  • 手机网站跳出率低免费动画制作app哪个好用
  • 北京网站改版费用游戏币销售网站建设
  • 遵义网站推广货架网站开发
  • 汝州建站公司搭建网站实时访问地图
  • 0基础如何做网站百度推广一级代理商名单
  • wordpress连接微博设置密码seo免费
  • 网站建设 软件 开源如何确定网站建设 栏目
  • 沈阳网站建设哪家好企业公司简介范文
  • 企业网站建设有名 乐云seo小程序开发流程详细
  • 如何确定网站被khtml怎么弄
  • 合肥建设网站制作哪个好工作室是个体户还是公司
  • 如何查看网站页面大小国际旅游网站设计报告
  • 微网站html5模板公司主页网站制作
  • 网站名称及域名wordpress图片加框架
  • 建网站怎么样才能流畅网址缩短在线生成
  • 建设银行查余额网站企业形象设计包括哪些
  • 公司网站服务器租用一个网站备案多个域名吗
  • 和镜像网站做友链做天猫网站要多少钱
  • 手机网站引导页js做网站拉广告
  • 详述电子商务网站的建设东莞电子产品网站建设
  • 网站不用域名解绑如何策划网络推广方案
  • 深圳系统开发高端网站建设wordpress回复提醒
  • 服务 信誉好的网站制作股票软件定制
  • 浙江广厦建设职业技术学院招生网站app网站区别
  • 鸿邑科技 网站建设上海网页制作机构
  • 横岗网站建设公司企业网站必备模块
  • 北京建设局投诉网站深圳建设网站费用
  • 文章网站模板哪个好网站备案登记信息
  • 网站建设原因分析涟源市建设局网站
  • 哪些网站做代理商用PS做的个人网站图片