校园网站建设的用处,wordpress 育儿主题,dlog4j wordpress,app推广在哪里可以接单Netty优化-rpc 1.3 RPC 框架1#xff09;准备工作 1.3 RPC 框架
1#xff09;准备工作
这些代码可以认为是现成的#xff0c;无需从头编写练习
为了简化起见#xff0c;在原来聊天项目的基础上新增 Rpc 请求和响应消息
Data
public abstract class Message implements … Netty优化-rpc 1.3 RPC 框架1准备工作 1.3 RPC 框架
1准备工作
这些代码可以认为是现成的无需从头编写练习
为了简化起见在原来聊天项目的基础上新增 Rpc 请求和响应消息
Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST 101;public static final int RPC_MESSAGE_TYPE_RESPONSE 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}请求消息
Getter
ToString(callSuper true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class? returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class? returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName interfaceName;this.methodName methodName;this.returnType returnType;this.parameterTypes parameterTypes;this.parameterValue parameterValue;}Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}响应消息
Data
ToString(callSuper true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}服务器架子
Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss new NioEventLoopGroup();NioEventLoopGroup worker new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();// rpc 请求消息处理器待实现RpcRequestMessageHandler RPC_HANDLER new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}服务器 handler
Slf4j
ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandlerRpcRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {// 获取真正的实现对象HelloService service (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));// 获取要调用的方法Method method service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());// 调用方法Object invoke method.invoke(service, message.getParameterValue());// 调用成功response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();// 调用异常response.setExceptionValue(e);}// 返回结果ctx.writeAndFlush(response);}public static void main(String[] args) throws Exception {RpcRequestMessage message new RpcRequestMessage(1,cn.itcast.rpc.service.HelloService,sayHello,String.class,new Class[]{String.class},new Object[]{张三});// 获取真正的实现对象HelloService service (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}客户端架子
Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();// rpc 响应消息处理器待实现RpcResponseMessageHandler RPC_HANDLER new RpcResponseMessageHandler();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel bootstrap.connect(localhost, 8080).sync().channel();ChannelFuture future channel.writeAndFlush(new RpcRequestMessage(1,cn.itcast.rpc.service.HelloService,sayHello,String.class,new Class[]{String.class},new Object[]{张三})).addListener(promise - {if (!promise.isSuccess()) {Throwable cause promise.cause();log.error(error, cause);}});channel.closeFuture().sync();} catch (Exception e) {log.error(client error, e);} finally {group.shutdownGracefully();}}
}客户端handler
Slf4j
ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandlerRpcResponseMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug({}, msg);}
}服务器端的 service 获取
public class ServicesFactory {static Properties properties;static MapClass?, Object map new ConcurrentHashMap();static {try (InputStream in Config.class.getResourceAsStream(/application.properties)) {properties new Properties();properties.load(in);SetString names properties.stringPropertyNames();for (String name : names) {if (name.endsWith(Service)) {Class? interfaceClass Class.forName(name);Class? instanceClass Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static T T getService(ClassT interfaceClass) {return (T) map.get(interfaceClass);}
}相关配置 application.properties
serializer.algorithmJson
cn.itcast.server.service.HelloServicecn.itcast.server.service.HelloServiceImpl业务类
public interface HelloService {String sayHello(String name);
}public class HelloServiceImpl implements HelloService {Overridepublic String sayHello(String msg) {//int i 1 / 0;return 你好, msg;}
}计数器
public abstract class SequenceIdGenerator {private static final AtomicInteger id new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}
客户端最终代码
Slf4j
public class RpcClientManager {public static void main(String[] args) throws IOException {HelloService service getProxyService(HelloService.class);System.out.println(service.sayHello(zhangsan));System.out.println(service.sayHello(lisi));}public static T T getProxyService(ClassT serviceClass) {ClassLoader classLoader serviceClass.getClassLoader();Class?[] interfaces new Class[]{serviceClass};Object o Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) - {// 1. 将方法调用转换为 消息对象int sequenceId SequenceIdGenerator.nextId();RpcRequestMessage msg new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象来接收结果 指定 promise 对象异步接收结果线程DefaultPromiseObject promise new DefaultPromise(channel.eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);log.debug(主线程开始等待...);// 4. 阻塞等待 promise 结果promise.await();log.debug(主线程放行...);if (promise.isSuccess()) {return promise.getNow();} else {throw new RuntimeException(promise.cause());}});return (T) o;}private static Channel channel null;private static final Object LOCK new Object();public static Channel getChannel() {if (channel ! null) {return channel;}synchronized (LOCK) {if (channel ! null) {return channel;}initChannel();return channel;}}//初始化channelprivate static void initChannel() {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER new RpcResponseMessageHandler();Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel bootstrap.connect(localhost, 8080).sync().channel();channel.closeFuture().addListener(future - {group.shutdownGracefully();});} catch (Exception e) {log.error(client error, e);}}
}服务端不变服务端handler
Slf4j
ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandlerRpcRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {HelloService service (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg e.getCause().getMessage();response.setExceptionValue(new Exception(远程调用出错: msg));}log.error(response.toString());ctx.writeAndFlush(response);}
}执行结果
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 1, 228
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(superMessage(sequenceId1, messageType102), returnValue你好, zhangsan, exceptionValuenull)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, zhangsan
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 2, 224
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(superMessage(sequenceId2, messageType102), returnValue你好, lisi, exceptionValuenull)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, lisi