java 从零开始手写 RPC (02)

说明

上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。

至于 netty 的优点可以参考:

代码实现

maven 引入

<dependency>

&lt;groupId&gt;io.netty&lt;/groupId&gt;<br/>
&lt;artifactId&gt;netty-all&lt;/artifactId&gt;<br/>
&lt;version&gt;${netty.version}&lt;/version&gt;<br/>

&lt;/dependency&gt;

引入 netty 对应的 maven 包,此处为 4.1.17.Final。

服务端代码实现

netty 的服务端启动代码是比较固定的。

package com.github.houbb.rpc.server.core;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.server.constant.RpcServerConst;
import com.github.houbb.rpc.server.handler.RpcServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**

  • rpc 服务端
  • @author binbin.hou
  • @since 0.0.1
    */
    public class RpcServer extends Thread { private static final Log log = LogFactory.getLog(RpcServer.class); /**
    • 端口号
      */
      private final int port; public RpcServer() {
      this.port = RpcServerConst.DEFAULT_PORT;
      } public RpcServer(int port) {
      this.port = port;
      } @Override
      public void run() {
      // 启动服务端
      log.info(“RPC 服务开始启动服务端”); EventLoopGroup bossGroup = new NioEventLoopGroup();
      EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.group(workerGroup, bossGroup)
      .channel(NioServerSocketChannel.class)
      .childHandler(new ChannelInitializer&lt;Channel&gt;() {
      @Override
      protected void initChannel(Channel ch) throws Exception {
      ch.pipeline().addLast(new RpcServerHandler());
      }
      })
      // 这个参数影响的是还没有被accept 取出的连接
      .option(ChannelOption.SO_BACKLOG, 128)
      // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
      .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,开始接收进来的链接
      ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
      log.info(“RPC 服务端启动完成,监听【” + port + “】端口”); channelFuture.channel().closeFuture().syncUninterruptibly();
      log.info(“RPC 服务端关闭完成”);
      } catch (Exception e) {
      log.error(“RPC 服务异常”, e);
      } finally {
      workerGroup.shutdownGracefully();
      bossGroup.shutdownGracefully();
      }
      } }

为了简单,服务端启动端口号固定,RpcServerConst 常量类内容如下:

public final class RpcServerConst {
private RpcServerConst(){}
/**
  • 默认端口
  • @since 0.0.1
    */
    public static final int DEFAULT_PORT = 9627; }
  • RpcServerHandler

    当然,还有一个比较核心的类就是 RpcServerHandler

    public class RpcServerHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    // do nothing now
    }
    }

    目前是空实现,后续可以添加对应的日志输出及逻辑处理。

    测试

    启动测试的代码非常简单:

    /**
  • 服务启动代码测试
  • @param args 参数
    /
    public static void main(String[] args) {
    new RpcServer().start();
    }
  • 说明

    上面我们实现了服务端的实现,这一节来一起看一下 client 客户端代码实现。

    代码实现

    RpcClient

    /
  • Copyright © 2019. houbinbin Inc.
  • rpc All rights reserved.
    */ package com.github.houbb.rpc.client.core; import com.github.houbb.log.integration.core.Log;
    import com.github.houbb.log.integration.core.LogFactory;
    import com.github.houbb.rpc.client.handler.RpcClientHandler; import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler; /**
  • &lt;p&gt; rpc 客户端 &lt;/p&gt;
    *
  • &lt;pre&gt; Created: 2019/10/16 11:21 下午 &lt;/pre&gt;
  • &lt;pre&gt; Project: rpc &lt;/pre&gt;
    *
  • @author houbinbin
  • @since 0.0.2
    */
    public class RpcClient extends Thread { private static final Log log = LogFactory.getLog(RpcClient.class); /**
    • 监听端口号
      /
      private final int port; public RpcClient(int port) {
      this.port = port;
      } public RpcClient() {
      this(9527);
      } @Override
      public void run() {
      // 启动服务端
      log.info(“RPC 服务开始启动客户端”); EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
      Bootstrap bootstrap = new Bootstrap();
      ChannelFuture channelFuture = bootstrap.group(workerGroup)
      .channel(NioSocketChannel.class)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .handler(new ChannelInitializer&lt;Channel&gt;(){
      @Override
      protected void initChannel(Channel ch) throws Exception {
      ch.pipeline()
      .addLast(new LoggingHandler(LogLevel.INFO))
      .addLast(new RpcClientHandler());
      }
      })
      .connect(“localhost”, port)
      .syncUninterruptibly(); log.info(“RPC 服务启动客户端完成,监听端口:” + port);
      channelFuture.channel().closeFuture().syncUninterruptibly();
      log.info(“RPC 服务开始客户端已关闭”);
      } catch (Exception e) {
      log.error(“RPC 客户端遇到异常”, e);
      } finally {
      workerGroup.shutdownGracefully();
      }
      } }
  • .connect(“localhost”, port)

    RpcClientHandler

    客户端处理类也比较简单,暂时留空。

    /
  • Copyright © 2019. houbinbin Inc.
  • rpc All rights reserved.
    */ package com.github.houbb.rpc.client.handler; import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler; /**
  • &lt;p&gt; 客户端处理类 &lt;/p&gt;
    *
  • &lt;pre&gt; Created: 2019/10/16 11:30 下午 &lt;/pre&gt;
  • &lt;pre&gt; Project: rpc &lt;/pre&gt;
    *
  • @author houbinbin
  • @since 0.0.2
    */
    public class RpcClientHandler extends SimpleChannelInboundHandler { @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    // do nothing.
    } }
  • 启动测试

    服务端

    首先启动服务端。

    客户端

    然后启动客户端连接服务端,实现如下:

    /**
  • 服务启动代码测试
  • @param args 参数
    */
    public static void main(String[] args) {
    new RpcClient().start();
    }
  • 小结

    为了便于大家学习,以上源码已经开源:

    我是老马,期待与你的下次重逢。