freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

简单实现mini版RPC框架
2021-12-10 17:11:58

RPC(Remote Procedure Call)就是一句话:在客户端直接调用服务端方法,就像调用本地方法一样!虽然基于REST的远程调用框架也能实现,但是他们和HTTP协议强绑定在一起,并不算是严格的RPC框架。

大致调用链如下图:

上面有一个HelloServiceStub, stub(桩),他是客户端实现HelloService服务的代理类。生成这个stub桩,可能是编译期,也可能是运行时动态生成。

Client:

上面构造好了服务的桩之后,调用HelloServiceStub.hello("world")时,这里会加点料->构造一个请求

请求包括:1.请求的服务名,这里是HelloService#hello(String)

2.请求服务的参数,这里是world

Server:

而RPC的服务端收到请求后,根据服务名找到真正的实现类,也就是HelloServiceImpl并调用hello方法,结果会组装成一个响应,返回给客户端。

但是有一个问题:客户端是如何找到服务端地址的呢?

这就引出了一个概念:NamingService,负责保存集群内所有节点的路由信息来帮助访问集群的客户端寻找集群中的节点。在RPC中,我更喜欢注册中心这个概念。

服务端的业务代码在向 RPC 框架中注册服务之后,RPC 框架就会把这个服务的名称和地址发布到注册中心上。客户端的桩在调用服务端之前,会向注册中心请求服务端的地址,请求的参数就是服务名称,也就是我们上面例子中的方法签名 HelloService#hello,注册中心会返回提供这个服务的地址,然后客户端再去请求服务端。

而请求和响应都是RPC提供的,那么他就要实现序列化和网络传输,而如果请求和响应都使用相同的序列化协议那么就实现了跨语言通信了!

所以一个的RPC框架是由:序列化+网络传输+注册中心+桩 组成的。如下图:

在开始之前引入一个SPI的概念,就是通过在META-INF/services 上写一个以接口为名,内容是接口实现类的配置文件,在运行的时候使用ServiceLoader.load来动态加载,这样调用方和实现方都是基于接口编程,解耦--调用方根据不同的配置来选择不同的实现。

序列化篇

序列化的产品有很多,gRPC的Protobuf,Dubbo的hession2,kryo等等。这里为了简单,采用自己实现的序列化。生产中还是尽量用已经成熟的产品。

/**
对外提供序列化的方式
*/
public class SerializeSupport {
  //不同的序列化类型,不同序列化实现 这是用于序列化 
  //value需要用SPI注入进去
  private static Map<Class<?>/* 序列化对象类型 */, Serializer<?>/* 序列化实现 */> serializerMap = new HashMap<>();
  //不同的序列化类型 不同序列化对象类型 这是用于反序列化
  private static Map<Byte/* 序列化实现类型 */, Class<?>/* 序列化对象类型 */> typeMap = new HashMap<>();
  //真正序列化的接口类
  Serializer serializer
   //反序列化
  public static  <E> E parse(byte [] buffer) {}
 	//序列化
  public static <E> byte [] serialize(E  entry) {}
}

//所有的序列化都实现这个接口
public interface Serializer<T> {
    /**
     * 计算对象序列化后的长度,主要用于申请存放序列化数据的字节数组
     * @param entry 待序列化的对象
     * @return 对象序列化后的长度
     */
    int size(T entry);
 
    /**
     * 序列化对象。将给定的对象序列化成字节数组
     * @param entry 待序列化的对象
     * @param bytes 存放序列化数据的字节数组
     * @param offset 数组的偏移量,从这个位置开始写入序列化数据
     * @param length 对象序列化后的长度,也就是{@link Serializer#size(java.lang.Object)}方法的返回值。
     */
    void serialize(T entry, byte[] bytes, int offset, int length);
 
    /**
     * 反序列化对象
     * @param bytes 存放序列化数据的字节数组
     * @param offset 数组的偏移量,从这个位置开始写入序列化数据
     * @param length 对象序列化后的长度
     * @return 反序列化之后生成的对象
     */
    T parse(byte[] bytes, int offset, int length);
 
    /**
     * 用一个字节标识对象类型,每种类型的数据应该具有不同的类型值
     */
    byte type();
 
    /**
     * 返回序列化对象类型的 Class 对象。
     */
    Class<T> getSerializeClass();
}

这里hello的参数是一个String类型,所以要实现一个StringSerializer,后面会用到注册中心元数据的序列化MetadataSerializer,和stub发出的请求序列化RpcRequestSerializer。

网络通信篇

高效的网络通信必然少不了Netty这种NIO异步网络通信的组件。

我们知道可以把系统大致分成IO密集型系统和计算密集型系统(也叫CPU密集型),而大部分业务都是IO密集型的,而IO密集型主要是网络IO和磁盘IO,现在SSD硬盘速度越来越快,所以网络IO才是提升速度的关键所在。

Netty简单实用demo:

// 创建一组线性 来执行收发数据的业务逻辑。
EventLoopGroup group = new NioEventLoopGroup();
 
try{
    // 初始化 Server
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group);
    serverBootstrap.channel(NioServerSocketChannel.class);
	//创建一个socket server 绑定到9999端口上
    serverBootstrap.localAddress(new InetSocketAddress("localhost", 9999));
 
    // 设置收到数据后的处理的 Handler 预先来定义回调方法
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new MyHandler());
        }
    });
    // 绑定端口,开始提供服务
    ChannelFuture channelFuture = serverBootstrap.bind().sync();
    channelFuture.channel().closeFuture().sync();
} catch(Exception e){
    e.printStackTrace();
} finally {
    group.shutdownGracefully().sync();
}

而在这里,我们需求的是,使用Netty 发送stub 序列化好的请求给服务端,并且可以异步得到服务端返回的响应就可以。那我们定义一个接口,返回值是一个CompletableFuture java8提供的异步组件(可以参考我之前写的博客《java并发实战》),而请求的数据和响应的数据都抽象在Command这里,注意 不管是请求还是响应 都是序列化的后的byte[]!

public interface Transport {
    /**
     * 发送请求命令
     */
    CompletableFuture<Command> send (Command request);
}

public class Command {
  	//请求头
    protected Header header;
  	//请求负载
    private byte [] payload;
    //...
}
 
public class Header {
  	//请求id 
    private int requestId;
  	//版本 这里是为了向下兼容 版本一直才能提供RPC服务
    private int version;
  	//识别是什么命令
    private int type;
    // ...
}
//返回的header 加上状态码和报错信息
public class ResponseHeader extends Header {
    private int code;
    private String error;
    // ...
}

因为我们使用的是TCP传输 协议,他是一种双工收发协议,什么意思呢?就是发送的数据和接受的数据不用对齐一来一回的回答,只要保证回答就行,顺序不用管。就像下图,张大爷问的和李大爷回答的可能牛头不对马嘴,但是回答了,我们只要在发送的时候给一个requestId,返回的时候带回这个id 就能把问和答对起来。

只要实现Transport就可以发送请求出去,这里使用NettyTransport来实现send。这个 send 方法的实现,本质上就是一个异步方法,在把请求数据发出去之后就返回了,并不会阻塞当前这个线程去等待响应返回来。为了防止发出的请求没有正常的返回 或者有报错,这里做了三个处理:

●异常处理 catch住异常 并结束任务

●失败处理 手动关闭任务 completableFuture.completeExceptionally(channelFuture.cause());

●超时处理 防止迟迟不回应 在InFlightRequests跑一个定时任务每十秒扫描一次map,看是否执行时间超过十秒, 超过就手动移除任务 释放锁。

package com.cumstom.rpc.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

import java.util.concurrent.CompletableFuture;

public class NettyTransport implements Transport {
  	//netty channel
    private final Channel channel;
  	//存储发出的请求还未回应的 map<requestId,ResponseFuture>
    private final InFlightRequests inFlightRequests;

    NettyTransport(Channel channel, InFlightRequests inFlightRequests) {
        this.channel = channel;
        this.inFlightRequests = inFlightRequests;
    }




    @Override
    public  CompletableFuture<Command> send(Command request) {
        // 构建返回值
        CompletableFuture<Command> completableFuture = new CompletableFuture<>();
        try {
            // 将在途请求放到inFlightRequests中
            inFlightRequests.put(new ResponseFuture(request.getHeader().getRequestId(), completableFuture));
            // 发送命令
            channel.writeAndFlush(request).addListener((ChannelFutureListener) channelFuture -> {
                // 处理发送失败的情况
                if (!channelFuture.isSuccess()) {
                    completableFuture.completeExceptionally(channelFuture.cause());
                    channel.close();
                }
            });
        } catch (Throwable t) {
            // 处理发送异常
            inFlightRequests.remove(request.getHeader().getRequestId());
            completableFuture.completeExceptionally(t);
        }
        return completableFuture;
    }


}

package com.cumstom.rpc.transport;


import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

public class InFlightRequests implements Closeable {
    private final static long TIMEOUT_SEC = 10L;
    //限制客户端请求 弥补异步请求的背压(Back pressure)机制 只能最多处理10个请求
    private final static Semaphore semaphore = new Semaphore(10);
    private final static Map<Integer, ResponseFuture> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture scheduledFuture;

    public InFlightRequests() {
        //超时容错
        this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::removeTimeoutFutures, TIMEOUT_SEC, TIMEOUT_SEC, TimeUnit.SECONDS);
    }

    public void put(ResponseFuture responseFuture) throws InterruptedException, TimeoutException {
        if (semaphore.tryAcquire(TIMEOUT_SEC, TimeUnit.SECONDS)) {
            futureMap.put(responseFuture.getRequestId(), responseFuture);
        } else {
            throw new TimeoutException();
        }
    }

    private void removeTimeoutFutures() {
        futureMap.entrySet().removeIf(entry -> {
            if (System.nanoTime() - entry.getValue().getTimestamp() > TIMEOUT_SEC * 1000000000L) {
                semaphore.release();
                return true;
            } else {
                return false;
            }
        });
    }

    public ResponseFuture remove(int requestId) {
        ResponseFuture future = futureMap.remove(requestId);
        if (null != future) {
            semaphore.release();
        }
        return future;
    }

    @Override
    public void close() {
        scheduledFuture.cancel(true);
        scheduledExecutorService.shutdown();
    }
}

InFlightRequests使用了10个信号量Semaphore 来限制请求弥补异步请求的背压(Back pressure)机制。解释一下这句话是什么意思:

如果是同步请求,客户端发送请求,必须等服务端响应完成才能做其他事情,这就是背压机制,服务端处理速度会天然地限制客户端请求的速度。

如果是异步请求,因为没有这个背压机制,导致客户端不会考虑服务端的处理速度,不断的请求堆积直到服务端内存占满,再请求就失败。为了解决这种问题,加一个锁/信号量,只有拿到了锁才能发出请求。

客户端篇

客户端最重要的就是实现Stub桩。桩是 RPC 框架在客户端的服务代理,它和远程服务具有相同的方法签名(因为服务端就是以<name,class>存的),或者说是实现了相同的接口,客户端在调用 RPC 框架提供的服务时,实际调用的就是“桩”提供的方法,在桩的实现方法中,它会发请求到服务端获取调用结果并返回给调用方。

那么Stub的功能很明确,两点:

1.把方法名和参数封装成请求,发送给服务端

2.把服务端返回的调用结果返回给调用方

代理对象就是HelloServiceStub 在客户端,委托对象就是HelloServiceImpl 在服务端。

我们需要一个接口来创建Stub

public interface StubFactory {
  //transport 是需要我们组装好请求之后 用transport发送请求
    <T> T createStub(Transport transport, Class<T> serviceClass);
}

但是如何创建Stub桩呢?

在gRPC 它是在编译 IDL 的时候就把桩生成好了,直接生成java源代码,再经过编译期编译就可以直接使用。

在Dubbo 它是在运行时动态生成,生成的是一些 class 文件,JVM 在运行的时候,读取这些 Class 文件来创建对应类的实例,因为class文件符合jvm规范,不需要经过源代码、编译这些过程,直接动态来创建一个桩。

这里使用比较简单容易理解的方式:用字符串拼写出一个源代码->编译->加载在JVM,其实都是动态代理的设计思想。

//定义一个抽象类 所有的代理类都根据抽象类来创建
public abstract class AbstractStub implements ServiceStub {
    protected Transport transport;

    protected byte [] invokeRemote(RpcRequest request) {
        Header header = new Header(0, 1, RequestIdSupport.next());
        byte [] payload = SerializeSupport.serialize(request);
        Command requestCommand = new Command(header, payload);
        try {
           	//根据不同的transport 发送消息
            Command responseCommand = transport.send(requestCommand).get();
			//收到响应
            ResponseHeader responseHeader = (ResponseHeader) responseCommand.getHeader();
            if(responseHeader.getCode() == 0) {
                return responseCommand.getPayload();
            } else {
                throw new Exception(responseHeader.getError());
            }

        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void setTransport(Transport transport) {
        this.transport = transport;
    }
}
//实现桩的工厂类 来创建桩
public class DynamicStubFactory implements StubFactory{
    private final static String STUB_SOURCE_TEMP =
            "package com.cumstom.rpc.client;\n" +
                    "import com.cumstom.rpc.serialize.SerializeSupport;\n" +
                    "\n" +
                    "public class %s extends AbstractStub implements %s {\n" +
                    "    @Override\n" +
                    "    public String %s(String arg) {\n" +
                    "        return SerializeSupport.parse(\n" +
                    "                invokeRemote(\n" +
                    "                        new RpcRequest(\n" +
                    "                                \"%s\",\n" +
                    "                                \"%s\",\n" +
                    "                                SerializeSupport.serialize(arg)\n" +
                    "                        )\n" +
                    "                )\n" +
                    "        );\n" +
                    "    }\n" +
                    "}";

    @Override
    @SuppressWarnings("unchecked")
    public <T> T createStub(Transport transport, Class<T> serviceClass) {
        try {
            // 填充模板
            String stubSimpleName = serviceClass.getSimpleName() + "Stub";
            String classFullName = serviceClass.getName();
            String stubFullName = "com.cumstom.rpc.client." + stubSimpleName;
            String methodName = serviceClass.getMethods()[0].getName();
			//拼接成一个源代码
            String source = String.format(STUB_SOURCE_TEMP, stubSimpleName, classFullName, methodName, classFullName, methodName);
            // 编译源代码
            JavaStringCompiler compiler = new JavaStringCompiler();
            Map<String, byte[]> results = compiler.compile(stubSimpleName + ".java", source);
            // 加载编译好的类
            Class<?> clazz = compiler.loadClass(stubFullName, results);

            // 把Transport赋值给桩
            ServiceStub stubInstance = (ServiceStub) clazz.newInstance();
            stubInstance.setTransport(transport);
            // 返回这个桩
            return (T) stubInstance;
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }
}

服务端篇

服务端要做的就两件事情:

1.把服务注册到注册中心(NamingService

2.提供服务(实现类)

一般来说注册中心会有客户端和服务端,客户端提供注册的api和提供调用方与服务端通信,服务端记录每个 RPC 服务发来的注册信息,并保存到它的元数据中。可以用redis,zookeeper甚至数据库mysql存储元数据,为了简单这里放在一个文件里面,服务端和客户端都读取这个文件来获得元数据。

多注册中心,根据注册中心的地址得到注册中心的实例。这里也是使用SPI思想,通过配置把实现类都加载进去。我们这里是读取文件的注册中心,读取文件一定要用文件系统提供的锁,而不是语言提供的锁

public interface RpcAccessPoint extends Closeable{
    /**
     * 客户端获取远程服务的引用
     */
    <T> T getRemoteService(URI URI,Class<T> serviceClass);

    /**
     * 服务端注册服务的实现实例
     */
    <T> URI addServiceProvider (T service ,Class<T> serviceClass);

    /**
     * 服务端启动RPC框架,监听接口,开始提供远程服务
     * @return 服务实例,用于程序停止的时候安全关闭服务。
     */
    Closeable startServer() throws Exception;

    //系统可以根据 URI 中的协议,动态地来选择不同的注册中心实现。增加一种注册中心的实现,也不需要修改任何代码
    default NameService getNameService(URI nameServiceUri){
        Collection<NameService> nameServices = ServiceSupport.loadAll(NameService.class);
        for (NameService nameService : nameServices) {
            if (nameService.supportedSchemes().contains(nameServiceUri.getScheme())) {
                nameService.connect(nameServiceUri);
                return nameService;
            }
        }
        return null;
    }
}
public interface NameService {
 
    void registerService(String serviceName, URI uri) throws IOException;

    /**
     * 查询服务地址
     */
    URI lookupService(String serviceName) throws IOException;

    /**
     * 所有支持的协议
     */
    Collection<String> supportedSchemes();

    /**
     * 连接注册中心
     * @param nameServiceUri 注册中心地址
     */
    void connect(URI nameServiceUri);
}

有了注册中心之后,请求过来怎么处理呢?

使用RequestHandler处理客户端请求,需要做的事情是

1.反序列化

2.根据服务名来找注册中心的实现类

3.利用java反射,使用方法invoke,拿到结果

4.封装成reponse返回

注意这个类需要单例,保证拿到的都是同一个。

private Map<String/*service name*/, Object/*service provider*/>
				serviceProviders = new HashMap<>();
@Override
public Command handle(Command requestCommand) {
    Header header = requestCommand.getHeader();
    // 从 payload 中反序列化 RpcRequest
    RpcRequest rpcRequest = SerializeSupport.parse(requestCommand.getPayload());
    // 查找所有已注册的服务提供方,寻找 rpcRequest 中需要的服务
    Object serviceProvider = serviceProviders.get(rpcRequest.getInterfaceName());
    // 找到服务提供者,利用 Java 反射机制调用服务的对应方法
    String arg = SerializeSupport.parse(rpcRequest.getSerializedArguments());
    Method method = serviceProvider.getClass().getMethod(rpcRequest.getMethodName(), String.class);
    String result = (String ) method.invoke(serviceProvider, arg);
    // 把结果封装成响应命令并返回
    return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId()), SerializeSupport.serialize(result));
    // ...
}

总结

至此RPC的核心的点已经差不多讲完了。那么开始运行吧

Server 和Client 都有一个main方法,先启动server把服务注册到注册中心再启动client。

以上只是简单的RPC实现,而成熟的RPC,比如dubbo或者gRPC都会兼容和扩展很多东西,并且非常灵活,所以源码看起来会比较吃力,但是核心逻辑大概就是如此 。有兴趣的小伙伴可以去读读他们的源码~

本文作者:魄罗@涂鸦智能安全实验室

本文作者:, 转载请注明来自FreeBuf.COM

# RPC
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
评论 按热度排序

登录/注册后在FreeBuf发布内容哦

相关推荐
\
  • 0 文章数
  • 0 评论数
  • 0 关注者
文章目录
登录 / 注册后在FreeBuf发布内容哦
收入专辑