Hand-Writing an RPC Framework, Chapter 3: RPC Middleware
Author: 付政委 | Follow the account to get the source code
东武望馀杭。云海天涯两杳茫。何日功成名遂了,还乡。醉笑陪公三万场。
不用诉离觞。痛饮从来别有肠。今夜送归灯火冷,河塘。堕泪羊公却姓杨。
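WeChat official account: bugstack虫洞栈
Focused on original, topic-based case studies that distill technical knowledge and share it in the way that is easiest to learn, so that newcomers, beginners, and experts can all take something away. Series completed so far include: Netty4.x in practice, implementing a JVM in Java, full-link monitoring based on JavaAgent, and hand-writing an RPC framework.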
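Case Introduction
Building on the previous two chapters, this chapter implements the basic RPC functionality: an RPC middleware jar that both the provider side and the consumer side depend on.
Key technical points:
1. Registry: when a provider starts, it must publish its local interfaces to the registry. Here redis serves as the registry, and picking a random member simulates weighting.
2. When the application starts, it connects to the registry, i.e. our redis. Once connected, it publishes the configured provider interfaces to the registry under the key {interface + alias}.
3. On the consumer side, once the provider's information is configured, the middleware generates a dynamic proxy class while the XML is loaded. When a method call happens, it is actually the proxy's method that gets invoked, and the proxy exchanges data with the provider through Netty using a future-based communication style.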
Environment Setup
1. JDK 1.8.0
2. IntelliJ IDEA Community Edition 2018.3.1 x64
3. Redis for Windows
Code Example | To get the source code, follow the WeChat official account bugstack虫洞栈 and reply "RPC案例"
itstack-demo-rpc-03
└── src
    ├── main
    │   ├── java
    │   │   └── org.itstack.demo.rpc
    │   │       ├── config
    │   │       ├── domain
    │   │       ├── network
    │   │       │   ├── client
    │   │       │   │   ├── ClientSocket.java
    │   │       │   │   └── MyClientHandler.java
    │   │       │   ├── codec
    │   │       │   │   ├── RpcDecoder.java
    │   │       │   │   └── RpcEncoder.java
    │   │       │   ├── future
    │   │       │   │   ├── SyncWrite.java
    │   │       │   │   ├── SyncWriteFuture.java
    │   │       │   │   ├── SyncWriteMap.java
    │   │       │   │   └── WriteFuture.java
    │   │       │   ├── msg
    │   │       │   │   ├── Request.java
    │   │       │   │   └── Response.java
    │   │       │   ├── server
    │   │       │   │   ├── MyServerHandler.java
    │   │       │   │   └── ServerSocket.java
    │   │       │   └── util
    │   │       │       └── SerializationUtil.java
    │   │       ├── reflect
    │   │       │   ├── JDKInvocationHandler.java
    │   │       │   └── JDKProxy.java
    │   │       ├── registry
    │   │       │   └── RedisRegistryCenter.java
    │   │       └── util
    │   └── resource
    │       └── META-INF
    │           ├── rpc.xsd
    │           ├── spring.handlers
    │           └── spring.schemas
    │
    └── test
        ├── java
        │   └── org.itstack.demo.test
        │       ├── service
        │       │   ├── impl
        │       │   │   └── HelloServiceImpl.java
        │       │   └── HelloService.java
        │       └── ApiTest.java
        └── resource
            ├── itstack-rpc-center.xml
            ├── itstack-rpc-consumer.xml
            ├── itstack-rpc-provider.xml
            └── log4j.xml
ConsumerBean.java
package org.itstack.demo.rpc.config.spring.bean;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.config.ConsumerConfig;
import org.itstack.demo.rpc.domain.RpcProviderConfig;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.reflect.JDKProxy;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.Assert;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean {

    private ChannelFuture channelFuture;

    private RpcProviderConfig rpcProviderConfig;

    @Override
    public Object getObject() throws Exception {

        // Look up the provider's connection info in redis
        if (null == rpcProviderConfig) {
            String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias);
            rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class);
        }
        Assert.isTrue(null != rpcProviderConfig, "No provider registered for " + nozzle + "_" + alias);

        // Obtain the communication channel: check first, then poll up to 100 times, 500 ms apart
        if (null == channelFuture) {
            ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort());
            new Thread(clientSocket).start();
            for (int i = 0; i < 100; i++) {
                channelFuture = clientSocket.getFuture();
                if (null != channelFuture) break;
                Thread.sleep(500);
            }
        }
        Assert.isTrue(null != channelFuture, "Could not connect to the provider");

        // The request template carried by the proxy; method name and arguments are filled in per call
        Request request = new Request();
        request.setChannel(channelFuture.channel());
        request.setNozzle(nozzle);
        request.setRef(rpcProviderConfig.getRef());
        request.setAlias(alias);
        return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request);
    }

    @Override
    public Class<?> getObjectType() {
        try {
            return ClassLoaderUtils.forName(nozzle);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

}
ProviderBean.java
package org.itstack.demo.rpc.config.spring.bean;

import com.alibaba.fastjson.JSON;
import org.itstack.demo.rpc.config.ProviderConfig;
import org.itstack.demo.rpc.domain.LocalServerInfo;
import org.itstack.demo.rpc.domain.RpcProviderConfig;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ProviderBean extends ProviderConfig implements ApplicationContextAware {

    private Logger logger = LoggerFactory.getLogger(ProviderBean.class);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        // Describe this provider: interface, bean reference, alias, and the local host/port it listens on
        RpcProviderConfig rpcProviderConfig = new RpcProviderConfig();
        rpcProviderConfig.setNozzle(nozzle);
        rpcProviderConfig.setRef(ref);
        rpcProviderConfig.setAlias(alias);
        rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST);
        rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT);

        // Register the provider; SADD returns the number of newly added members,
        // so 0 means this exact entry was already in the registry
        long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig));

        logger.info("注册生产者:{} {} {}", nozzle, alias, count);
    }

}
ServerBean.java
package org.itstack.demo.rpc.config.spring.bean;

import org.itstack.demo.rpc.config.ServerConfig;
import org.itstack.demo.rpc.domain.LocalServerInfo;
import org.itstack.demo.rpc.network.server.ServerSocket;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerBean extends ServerConfig implements ApplicationContextAware {

    private Logger logger = LoggerFactory.getLogger(ServerBean.class);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Connect to the registry (redis)
        logger.info("启动注册中心 ...");
        RedisRegistryCenter.init(host, port);
        logger.info("启动注册中心完成 {} {}", host, port);

        // Start the Netty server on its own thread
        logger.info("初始化生产端服务 ...");
        ServerSocket serverSocket = new ServerSocket(applicationContext);
        Thread thread = new Thread(serverSocket);
        thread.start();
        // Poll every 500 ms until the server socket reports that it is active
        while (!serverSocket.isActiveSocketServer()) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it, and stop waiting
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the RPC server to start", e);
            }
        }

        logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT);
    }

}
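ConsumerBean and ServerBean both wait for their Netty thread by sleeping 500 ms between checks. One possible alternative is a latch, which wakes the waiting thread as soon as the socket is ready and gives a bounded wait. The sketch below only illustrates the idea; `StartupSignal`, `markReady`, and `awaitReady` are hypothetical names that do not exist in the middleware.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: signals "the socket is ready" without sleep-polling.
class StartupSignal {
    private final CountDownLatch ready = new CountDownLatch(1);

    // Called by the network thread once the socket is bound or connected.
    void markReady() {
        ready.countDown();
    }

    // Called by the thread that started the network thread.
    // Returns false if the timeout elapses or the wait is interrupted.
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return ready.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class StartupSignalDemo {
    public static void main(String[] args) {
        StartupSignal signal = new StartupSignal();
        // Stand-in for the thread that binds the Netty server socket.
        Thread server = new Thread(signal::markReady);
        server.start();
        boolean started = signal.awaitReady(5, TimeUnit.SECONDS);
        System.out.println("server started: " + started);
    }
}
```

In the real classes, the network thread would call something like `markReady()` at the point where `isActiveSocketServer()` currently becomes true; that wiring is an assumption and is not shown in the listings.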
MyClientHandler.java
package org.itstack.demo.rpc.network.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        // Find the caller that is waiting on this request ID and hand it the response
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
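The handler above completes a pending call by looking it up under its request ID. SyncWrite, SyncWriteFuture, and SyncWriteMap are not listed in this chapter, so the following is only a sketch of the same request-ID correlation idea built on the JDK's CompletableFuture. `PendingCalls` and its methods are hypothetical names, not the middleware's own classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the SyncWriteMap / SyncWriteFuture pair.
class PendingCalls {
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Caller side: register the ID before writing the request to the channel.
    void register(String requestId) {
        pending.put(requestId, new CompletableFuture<>());
    }

    // Handler side (channelRead): hand the result to whoever is waiting.
    // Returns false when nobody is waiting for this ID, e.g. after a timeout.
    boolean complete(String requestId, Object result) {
        CompletableFuture<Object> future = pending.get(requestId);
        return future != null && future.complete(result);
    }

    // Caller side: block until the response arrives or the timeout elapses.
    // Returns null on timeout; the entry is always removed so the map cannot grow forever.
    Object await(String requestId, long timeoutMillis) {
        CompletableFuture<Object> future = pending.get(requestId);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(requestId);
        }
    }
}

class PendingCallsDemo {
    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        calls.register("req-1");
        // Stand-in for the Netty I/O thread delivering a Response.
        new Thread(() -> calls.complete("req-1", "hi itstack rpc")).start();
        System.out.println(calls.await("req-1", 5000));
    }
}
```

Removing the entry in `finally` matters in this sketch: a call that times out would otherwise leave its future in the map indefinitely. A null return cannot be told apart from a null result here, which a fuller version would need to address.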
MyServerHandler.java
package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    private ApplicationContext applicationContext;

    MyServerHandler(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        try {
            Request msg = (Request) obj;
            // Invoke the requested method on the Spring bean via reflection
            Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle());
            Method method = classType.getMethod(msg.getMethodName(), msg.getParamTypes());
            Object objectBean = applicationContext.getBean(msg.getRef());
            Object result = method.invoke(objectBean, msg.getArgs());
            // Send the result back, tagged with the originating request ID
            Response response = new Response();
            response.setRequestId(msg.getRequestId());
            response.setResult(result);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            // No response is written on failure, so the caller waits until its timeout
            e.printStackTrace();
        } finally {
            // Release the inbound message even when the invocation failed
            ReferenceCountUtil.release(obj);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}
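The core of the server handler is the reflective call: resolve the interface, find the method by name and parameter types, then invoke it on the bean named by `ref`. The sketch below isolates that step with a plain map in place of the Spring ApplicationContext. `Greeter`, `GreeterImpl`, and `ReflectiveDispatcher` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service interface and implementation, standing in for HelloService.
interface Greeter {
    String say(String str);
}

class GreeterImpl implements Greeter {
    @Override
    public String say(String str) {
        return str;
    }
}

// Looks up a bean by name and invokes the requested interface method on it.
class ReflectiveDispatcher {
    // A plain map plays the role of the Spring ApplicationContext here.
    private final Map<String, Object> beans = new HashMap<>();

    void register(String ref, Object bean) {
        beans.put(ref, bean);
    }

    Object dispatch(String interfaceName, String ref, String methodName,
                    Class<?>[] paramTypes, Object[] args) {
        try {
            Class<?> type = Class.forName(interfaceName);
            Method method = type.getMethod(methodName, paramTypes);
            return method.invoke(beans.get(ref), args);
        } catch (ReflectiveOperationException e) {
            // Covers unknown classes, unknown methods, and exceptions thrown by the target.
            throw new IllegalStateException("RPC dispatch failed: " + methodName, e);
        }
    }
}

class ReflectiveDispatcherDemo {
    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register("greeter", new GreeterImpl());
        Object result = dispatcher.dispatch(Greeter.class.getName(), "greeter", "say",
                new Class<?>[]{String.class}, new Object[]{"hello world"});
        System.out.println(result);
    }
}
```

Passing the parameter types along with the method name is what lets `getMethod` pick the right overload, which is why the request carries `paramTypes` as well as `args`.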
JDKInvocationHandler.java
package org.itstack.demo.rpc.reflect;


import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class JDKInvocationHandler implements InvocationHandler {

    // Shared by every call made through this proxy, so concurrent calls would overwrite each other's fields
    private Request request;

    public JDKInvocationHandler(Request request) {
        this.request = request;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] paramTypes = method.getParameterTypes();
        // Answer the basic Object methods locally instead of sending them over the network
        if ("toString".equals(methodName) && paramTypes.length == 0) {
            return request.toString();
        } else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
            return request.hashCode();
        } else if ("equals".equals(methodName) && paramTypes.length == 1) {
            return request.equals(args[0]);
        }
        // Fill in the per-call parameters; ref was already set when the proxy was created
        request.setMethodName(methodName);
        request.setParamTypes(paramTypes);
        request.setArgs(args);
        // Write the request and block until the response arrives or the timeout (5000) elapses
        Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
        return response.getResult();

    }

}
JDKProxy.java
package org.itstack.demo.rpc.reflect;


import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.util.ClassLoaderUtils;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class JDKProxy {

    // Creates a JDK dynamic proxy for the interface; every call is routed to JDKInvocationHandler
    @SuppressWarnings("unchecked")
    public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception {
        InvocationHandler handler = new JDKInvocationHandler(request);
        ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader();
        T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler);
        return result;
    }

}
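To see what JDKProxy and JDKInvocationHandler do without any networking, the sketch below builds a JDK dynamic proxy whose handler merely describes the call it intercepted instead of writing it to a Netty channel. `Echo`, `DescribingHandler`, and `ProxyDemo` are hypothetical names for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical interface; any interface can be proxied this way.
interface Echo {
    String say(String str);
}

// Instead of sending the call over the network, describe what would be sent.
class DescribingHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // toString, hashCode and equals also arrive here and must be answered locally.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    return "EchoProxy";
                case "hashCode":
                    return System.identityHashCode(proxy);
                default:
                    return proxy == args[0]; // equals
            }
        }
        return method.getName() + Arrays.toString(args);
    }
}

class ProxyDemo {
    static <T> T proxyFor(Class<T> type, InvocationHandler handler) {
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        Echo echo = proxyFor(Echo.class, new DescribingHandler());
        System.out.println(echo.say("hello world")); // prints "say[hello world]"
    }
}
```

The caller only ever sees the `Echo` interface; the method name and arguments captured in `invoke` are exactly the pieces the middleware copies into its Request before sending.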
RedisRegistryCenter.java
package org.itstack.demo.rpc.registry;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/7
 * Uses redis to simulate an RPC registry
 */
public class RedisRegistryCenter {

    // Non-sharded client connection; a single Jedis instance is not thread-safe,
    // which is acceptable for this demo but not for concurrent use
    private static Jedis jedis;

    // Initialize the redis connection
    public static void init(String host, int port) {
        // Basic pool configuration
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(5);
        config.setTestOnBorrow(false);
        JedisPool jedisPool = new JedisPool(config, host, port);
        jedis = jedisPool.getResource();
    }

    /**
     * Register a provider
     *
     * @param nozzle interface name
     * @param alias  alias
     * @param info   provider info (JSON)
     * @return number of members newly added to the set (0 if already present)
     */
    public static Long registryProvider(String nozzle, String alias, String info) {
        return jedis.sadd(nozzle + "_" + alias, info);
    }

    /**
     * Obtain a provider.
     * Simulates weighting by picking a random member.
     *
     * @param nozzle interface name
     * @param alias  alias
     */
    public static String obtainProvider(String nozzle, String alias) {
        return jedis.srandmember(nozzle + "_" + alias);
    }

    public static Jedis jedis() {
        return jedis;
    }

}
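The registry boils down to two set operations: add the provider's JSON to a set keyed by interface and alias, then pick a random member when a consumer asks. The sketch below mirrors those semantics in memory so the behavior can be tried without a redis server. `InMemoryRegistry` is a hypothetical class, and the JSON strings in the demo are made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory stand-in for RedisRegistryCenter.
class InMemoryRegistry {
    private final Map<String, Set<String>> providers = new ConcurrentHashMap<>();

    private static String key(String nozzle, String alias) {
        return nozzle + "_" + alias; // same key layout as the redis version
    }

    // Like SADD: returns 1 if the entry is new, 0 if it was already registered.
    long registryProvider(String nozzle, String alias, String info) {
        Set<String> set = providers.computeIfAbsent(key(nozzle, alias), k -> ConcurrentHashMap.newKeySet());
        return set.add(info) ? 1L : 0L;
    }

    // Like SRANDMEMBER: returns a uniformly random entry, or null if none exists.
    String obtainProvider(String nozzle, String alias) {
        Set<String> set = providers.get(key(nozzle, alias));
        if (set == null || set.isEmpty()) {
            return null;
        }
        List<String> snapshot = new ArrayList<>(set);
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }
}

class InMemoryRegistryDemo {
    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        String nozzle = "org.itstack.demo.test.service.HelloService";
        // Two sample providers for the same interface and alias.
        registry.registryProvider(nozzle, "itStackRpc", "{\"host\":\"127.0.0.1\",\"port\":22201}");
        registry.registryProvider(nozzle, "itStackRpc", "{\"host\":\"127.0.0.1\",\"port\":22202}");
        // Each lookup may return either provider.
        System.out.println(registry.obtainProvider(nozzle, "itStackRpc"));
    }
}
```

Registering the same entry twice returns 0 the second time, which is how SADD behaves as well: a count of 0 in the provider log means the identical entry was already in redis from an earlier run. Note that a uniformly random pick treats all providers equally, so it only approximates weighting.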
ApiTest.java
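import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ApiTest {

    public static void main(String[] args) {
        // Load the registry, provider, and consumer configurations into one context
        String[] configs = {"itstack-rpc-center.xml", "itstack-rpc-provider.xml", "itstack-rpc-consumer.xml"};
        new ClassPathXmlApplicationContext(configs);
    }

}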
Framework Test Results
2019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml]
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml]
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml]
2019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy
2019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ...
2019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379
2019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ...
2019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201
2019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
Using the Framework
To test the middleware we write two test projects: itstack-demo-rpc-provider and itstack-demo-rpc-consumer.
itstack-demo-rpc-provider: exposes the provider interface
itstack-demo-rpc-provider
├── itstack-demo-rpc-provider-export
│   └── src
│       └── main
│           └── java
│               └── org.itstack.demo.rpc.provider.export
│                   ├── domain
│                   │   └── Hi.java
│                   └── HelloService.java
│
└── itstack-demo-rpc-provider-web
    └── src
        └── main
            ├── java
            │   └── org.itstack.demo.rpc.provider.web
            │       └── HelloServiceImpl.java
            └── resources
                └── spring
                    └── spring-itstack-rpc-provider.xml
HelloService.java
public interface HelloService {

    String hi();

    String say(String str);

    String sayHi(Hi hi);

}
HelloServiceImpl.java
@Service("helloService")
public class HelloServiceImpl implements HelloService {

    @Override
    public String hi() {
        return "hi itstack rpc";
    }

    @Override
    public String say(String str) {
        return str;
    }

    @Override
    public String sayHi(Hi hi) {
        return hi.getUserName() + " say:" + hi.getSayMsg();
    }

}
spring-itstack-rpc-provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">

    <!-- Registry -->
    <rpc:server id="rpcServer" host="127.0.0.1" port="6379"/>

    <rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService"
                  ref="helloService" alias="itstackRpc"/>

</beans>
itstack-demo-rpc-consumer: makes the consumer-side calls
itstack-demo-rpc-consumer
└── src
    ├── main
    │   ├── java
    │   └── resources
    │       └── spring
    │           └── spring-itstack-rpc-consumer.xml
    │
    └── test
        └── java
            └── org.itstack.demo.test
                └── ConsumerTest.java
spring-itstack-rpc-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">

    <!-- Registry -->
    <rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/>

    <rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/>

</beans>
ConsumerTest.java
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-config.xml")
public class ConsumerTest {

    // Injects the RPC proxy that the middleware created for the rpc:consumer element
    @Resource(name = "helloService")
    private HelloService helloService;

    @Test
    public void test() {

        // No-argument call
        String hi = helloService.hi();
        System.out.println("测试结果:" + hi);

        // Call with a String argument
        String say = helloService.say("hello world");
        System.out.println("测试结果:" + say);

        // Call with an object argument
        Hi hiReq = new Hi();
        hiReq.setUserName("付栈");
        hiReq.setSayMsg("付可敌国,栈无不胜");
        String hiRes = helloService.sayHi(hiReq);

        System.out.println("测试结果:" + hiRes);
    }

}
Application Test Results (redis must be running during the test)
After starting ProviderTest, the registration data in Redis is:
redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc
"{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}"
redis 127.0.0.1:6379>
Run the unit test method in ConsumerTest:
log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
测试结果:hi itstack rpc
测试结果:hello world
测试结果:付栈 say:付可敌国,栈无不胜

Process finished with exit code 0