站長資訊網
        最全最豐富的資訊網站

        Java 手寫一個RPC框架

        Java 手寫一個RPC框架

        RPC框架稱為遠程調用框架,其實現的核心原理就是消費者端使用動態代理來代理一個接口的方法(基于JDK的動態代理,當然如果使用CGLib可以直接使用無接口類的方法),通過加入網絡傳輸編程,傳輸調用接口方法名稱,方法參數來給提供者獲取,再通過反射,來執行該接口的方法,再將反射執行的結果通過網絡編程傳回消費者端。

        現在我們來依次實現這些概念。這里我們做最簡單的實現,網絡編程使用的是BIO,大家可以使用Reactor模式的Netty來改寫性能更好的方式。而網絡傳輸中使用的序列化和反序列化也是Java自帶的,當然這樣的傳輸字節比較大,可以使用google的protoBuffer或者kryo來處理。這里只為了方便說明原理。

        pom

        <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>      <groupId>com.guanjian</groupId>     <artifactId>rpc-framework</artifactId>     <version>1.0-SNAPSHOT</version>          <build>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <version>3.7.0</version>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                     <encoding>UTF-8</encoding>                 </configuration>             </plugin>         </plugins>     </build> </project>

        首先當然是我們要進行遠程調用的接口以及接口的方法。

        public interface HelloService {     String sayHello(String content);}

        接口實現類

        public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    } }

        消費者端的動態代理,如果你是把提供者和消費者寫在兩個工程中,則提供者端需要上面的接口和實現類,而消費者端只需要上面的接口。

        public class ConsumerProxy {     /**      * 消費者端的動態代理      * @param interfaceClass 代理的接口類      * @param host 遠程主機IP      * @param port 遠程主機端口      * @param <T>      * @return      */     @SuppressWarnings("unchecked")     public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),                 new Class[]{interfaceClass}, (proxy,method,args) -> {                     //創建一個客戶端套接字                     Socket socket = new Socket(host, port);                     try {                         //創建一個對外傳輸的對象流,綁定套接字                         ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());                         try {                             //將動態代理的方法名寫入對外傳輸的對象流中                             output.writeUTF(method.getName());                             //將動態代理的方法的參數寫入對外傳輸的對象流中                             output.writeObject(args);                             //創建一個對內傳輸的對象流,綁定套接字                             //這里是為了獲取提供者端傳回的結果                             ObjectInputStream input = new ObjectInputStream(socket.getInputStream());                             try {                                 //從對內傳輸的對象流中獲取結果                                 Object result = input.readObject();                                 if (result instanceof Throwable) {                                     throw (Throwable) result;                                 }                                 return result;                             } finally {                                 input.close();                             }                         } finally {                             output.close();                         }                     } finally {                         socket.close();                     }                 }         );     } }

        有關JDK動態代理的內容可以參考AOP原理與自實現 ,BIO的部分可以參考傳統IO與NIO比較

        提供者端的網絡傳輸和遠程方式調用服務

        public class ProviderReflect {     private static final ExecutorService executorService = Executors.newCachedThreadPool();      /**      * RPC監聽和遠程方法調用      * @param service RPC遠程方法調用的接口實例      * @param port 監聽的端口      * @throws Exception      */     public static void provider(final Object service,int port) throws Exception {         //創建服務端的套接字,綁定端口port         ServerSocket serverSocket = new ServerSocket(port);         while (true) {             //開始接收客戶端的消息,并以此創建套接字             final Socket socket = serverSocket.accept();             //多線程執行,這里的問題是連接數過大,線程池的線程數會耗盡             executorService.execute(() -> {                 try {                     //創建呢一個對內傳輸的對象流,并綁定套接字                     ObjectInputStream input = new ObjectInputStream(socket.getInputStream());                     try {                         try {                             //從對象流中讀取接口方法的方法名                             String methodName = input.readUTF();                             //從對象流中讀取接口方法的所有參數                             Object[] args = (Object[]) input.readObject();                             Class[] argsTypes = new Class[args.length];                             for (int i = 0;i < args.length;i++) {                                 argsTypes[i] = args[i].getClass();                              }                             //創建一個對外傳輸的對象流,并綁定套接字                             //這里是為了將反射執行結果傳遞回消費者端                             ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());                             try {                                 Class<?>[] interfaces = service.getClass().getInterfaces();                                 Method method = null;                                 for (int i = 0;i < interfaces.length;i++) {                                     method = interfaces[i].getDeclaredMethod(methodName,argsTypes);                                     if (method != null) {                                         break;                                     }                                 }                                 Object result = method.invoke(service, args);                                 //將反射執行結果寫入對外傳輸的對象流中                                 output.writeObject(result);                             } catch (Throwable t) {                                 output.writeObject(t);                             } finally {                                 output.close();                             }                         } catch (Exception e) {                             e.printStackTrace();                         } finally {                             input.close();                         }                     } finally {                         socket.close();                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             });         }     } }

        啟動提供者端的網絡偵聽和遠程調用

        public class RPCProviderMain {     public static void main(String[] args) throws Exception {         HelloService service = new HelloServiceImpl();         ProviderReflect.provider(service,8083);     } }

        啟動消費者的動態代理調用

        public class RPCConsumerMain {     public static void main(String[] args) throws InterruptedException {         HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);         for (int i = 0;i < 1000;i++) {             String hello = service.sayHello("你好_" + i);             System.out.println(hello);             Thread.sleep(1000);         }     } }

        運行結果

        hello,你好_0
        hello,你好_1
        hello,你好_2
        hello,你好_3
        hello,你好_4
        hello,你好_5

        …..

        如果你要擴展成一個Netty+ProtoBuffer的高性能RPC框架可以參考Netty整合Protobuffer 的相關寫法。

        推薦教程:《PHP》

        贊(0)
        分享到: 更多 (0)
        網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
        主站蜘蛛池模板: 精品精品国产国产| 亚洲精品无码成人片在线观看 | 99久久99久久精品国产| 亚洲精品无码日韩国产不卡?V| 亚洲精品私拍国产福利在线| 精品无码无人网站免费视频| 亚欧乱色国产精品免费视频| 国产精品最新国产精品第十页| 欧美国产日韩精品| 91麻豆精品国产| 久久99国产综合精品免费| 亚洲精品456播放| 久热精品人妻视频| 国产在线精品一区二区高清不卡 | 久草热8精品视频在线观看| 97精品国产高清自在线看超| 国产精品欧美亚洲韩国日本不卡| 欧美精品亚洲精品日韩传电影| 亚洲国产午夜中文字幕精品黄网站 | 99热都是精品久久久久久| 国产在线拍揄自揄视精品不卡| 精品久久久久久无码中文字幕一区| 亚洲精品麻豆av| 四虎永久在线精品免费一区二区| 精品国产亚洲一区二区在线观看| 国产精品乱视频| 国产精品99爱免费视频| Aⅴ精品无码无卡在线观看| 四虎成人精品免费影院| 真实国产乱子伦精品免费| 91精品福利在线观看| 国产成人精品久久综合 | 久久夜色精品国产www| 精品亚洲一区二区三区在线播放| 国精品产露脸自拍| 久久精品女人天堂AV麻| 久久人人超碰精品CAOPOREN| 日本加勒比久久精品| 亚洲一日韩欧美中文字幕欧美日韩在线精品一区二 | 欧美精品免费观看二区| 精品国产福利久久久|