Technology, November 18, 2022

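Before writing any code, we need to think through a few questions.

  1. What exactly should our framework do?

    We want to implement an RPC protocol for remote invocation.

  2. What should the end result look like?

    We should be able to call a remote service the same way we call a local one.

  3. How do we achieve that?

    As covered in the previous chapters, we use dynamic proxies. The client generates a proxy class for the interface, and the proxy's invoke method packs the method name, parameters, and related information into a request and sends it to the server. The server runs a listener that keeps waiting for these messages and, on receipt, uses reflection to call the implementation class of the requested interface.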
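The proxy-to-reflection flow described above can be sketched in a single process. This is a minimal sketch, not the framework's code: `GreetingService`, `GreetingServiceImpl`, `SketchRequest`, and `ProxySketch` are hypothetical names, and the network hop is replaced by a direct call to `dispatch`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface and implementation, used only for illustration.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}

// Simplified stand-in for the request object that travels to the server.
class SketchRequest {
    final String implClassName;
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] params;

    SketchRequest(String implClassName, String methodName, Class<?>[] paramTypes, Object[] params) {
        this.implClassName = implClassName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.params = params;
    }
}

class ProxySketch {
    // "Server side": find the implementation class by name and invoke the method reflectively.
    static Object dispatch(SketchRequest request) throws Exception {
        Class<?> implClass = Class.forName(request.implClassName);
        Method method = implClass.getMethod(request.methodName, request.paramTypes);
        return method.invoke(implClass.getDeclaredConstructor().newInstance(), request.params);
    }

    // "Client side": every call on the proxy is packed into a request.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, String implClassName) {
        InvocationHandler handler = (proxy, method, args) -> {
            SketchRequest request = new SketchRequest(
                    implClassName, method.getName(), method.getParameterTypes(), args);
            return dispatch(request); // a real client would send the request over the network here
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service =
                createProxy(GreetingService.class, GreetingServiceImpl.class.getName());
        System.out.println(service.greet("rpc")); // prints "hello, rpc"
    }
}
```

The caller only ever sees the `GreetingService` interface; swapping the direct `dispatch` call for a network send is what turns this into a remote call.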

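First we need the underlying communication layer, a server and a client. There are several ways to implement it:

  1. A Socket-based client and server (synchronous and blocking, not recommended). Treat this as a programming exercise; it is not integrated with the rest of the system and is for practice only.

    The Socket-based server.

    Start a blocking socket server and add a thread pool to get pseudo-asynchronous handling.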

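    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SocketServer {
        private static final SocketServer INSTANCE = new SocketServer();

        private SocketServer() {}

        public static SocketServer getInstance() {
            return INSTANCE;
        }

        // Cached thread pool: no core threads, and the maximum pool size is Integer.MAX_VALUE.
        // Handing each connection to a pool thread gives pseudo-asynchronous handling.
        ExecutorService executorService = Executors.newCachedThreadPool();

        /**
         * Publish the service using the BIO model.
         * @param port port to listen on
         */
        public void publiser(int port) {
            try (ServerSocket serverSocket = new ServerSocket(port)) {
                while (true) {
                    Socket socket = serverSocket.accept(); // block until a request arrives
                    executorService.execute(new SocketHandler(socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }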

    The corresponding handler uses reflection to call the requested service and writes the result back over the socket.

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.Socket;

    public class SocketHandler implements Runnable {
        private final Socket socket;

        public SocketHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // Listing the socket as a resource closes it even if a stream constructor fails
            try (Socket s = socket;
                 ObjectInputStream inputStream = new ObjectInputStream(s.getInputStream());
                 ObjectOutputStream outputStream = new ObjectOutputStream(s.getOutputStream())) {
                Object o = inputStream.readObject(); // readObject performs Java deserialization
                System.out.println(o);
                Object result = invoke((RpcRequest) o);
                // Write the result back
                outputStream.writeObject(result);
                outputStream.flush();
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        // Look up the method on the implementation class by name and parameter types, then invoke it
        private Object invoke(RpcRequest invocation) {
            try {
                Class<?> impClass = Class.forName(invocation.getImpl());
                Method method = impClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
                // Return Object instead of casting to String so that non-String results also work
                return method.invoke(impClass.getDeclaredConstructor().newInstance(), invocation.getParams());
            } catch (ReflectiveOperationException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    Now for the client, which assembles the parameters and sends them to the socket server.

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;

    public class SocketClient {
        private static final SocketClient INSTANCE = new SocketClient();

        private SocketClient() {}

        public static SocketClient getInstance() {
            return INSTANCE;
        }

        private Socket newSocket(String host, Integer port) {
            System.out.println("Creating a new socket connection");
            try {
                return new Socket(host, port);
            } catch (IOException e) {
                System.out.println("Failed to establish the connection");
                e.printStackTrace();
            }
            return null;
        }

        public Object sendRequest(String host, Integer port, RpcRequest rpcRequest) {
            Socket socket = newSocket(host, port);
            if (socket == null) {
                return null; // the connection failed and has already been logged
            }
            // try-with-resources closes the streams and the socket, so no explicit close() calls are needed
            try (Socket s = socket;
                 ObjectOutputStream outputStream = new ObjectOutputStream(s.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(s.getInputStream())) {
                outputStream.writeObject(rpcRequest);
                outputStream.flush();
                return inputStream.readObject();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    }

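    The code above should make the flow clear. It is a conversation between a client and a server: the client sends the target method and its arguments to the server, the server completes the call via reflection, and the result is returned to the client.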
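To try this flow end to end without starting two programs, the round trip can be run inside one process. The sketch below is an illustration under assumptions: `LoopbackSketch` and its `handle` method are hypothetical, the request is a plain `String` rather than `RpcRequest`, and the server answers exactly one request on a free local port.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

class LoopbackSketch {
    // Hypothetical "service"; a real server would dispatch via reflection here.
    static String handle(String request) {
        return "echo:" + request;
    }

    // Server side: accept one connection, read one request, write one response.
    static void serveOnce(ServerSocket serverSocket) {
        try (Socket socket = serverSocket.accept()) {
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            String request = (String) in.readObject();
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(handle(request));
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Client side: send the request, then block until the response arrives.
    static Object call(String host, int port, String request)
            throws IOException, ClassNotFoundException {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(request);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            return in.readObject();
        }
    }

    // Start the one-shot server on a free port, call it, and return the response.
    static Object roundTrip(String request) {
        try (ServerSocket serverSocket = new ServerSocket(0)) { // port 0 picks any free port
            Thread server = new Thread(() -> serveOnce(serverSocket));
            server.start();
            Object result = call("127.0.0.1", serverSocket.getLocalPort(), request);
            server.join();
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping")); // prints "echo:ping"
    }
}
```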

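    Now let's get started for real.

  2. An HTTP client and a Tomcat-based server.

    The Tomcat-based server is a singleton with a single start method that launches the service. Incoming requests are handled by DispatcherServlet.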

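    import org.apache.catalina.Context;
    import org.apache.catalina.Engine;
    import org.apache.catalina.Host;
    import org.apache.catalina.LifecycleException;
    import org.apache.catalina.Server;
    import org.apache.catalina.Service;
    import org.apache.catalina.connector.Connector;
    import org.apache.catalina.core.StandardContext;
    import org.apache.catalina.core.StandardEngine;
    import org.apache.catalina.core.StandardHost;
    import org.apache.catalina.startup.Tomcat;

    public class HttpServer {
        private static final HttpServer INSTANCE = new HttpServer();

        private HttpServer() {}

        public static HttpServer getInstance() {
            return INSTANCE;
        }

        /**
         * Start an embedded Tomcat servlet container.
         * @param hostname host name to serve
         * @param port port to listen on
         */
        public void start(String hostname, Integer port) {
            Tomcat tomcat = new Tomcat();
            Server server = tomcat.getServer();
            Service service = server.findService("Tomcat");

            Connector connector = new Connector();
            connector.setPort(port);

            Engine engine = new StandardEngine();
            engine.setDefaultHost(hostname);

            Host host = new StandardHost();
            host.setName(hostname);

            String contextPath = "";
            Context context = new StandardContext();
            context.setPath(contextPath);
            context.addLifecycleListener(new Tomcat.FixContextListener()); // lifecycle listener

            // In the flattened listing this call had been swallowed by the comment above
            host.addChild(context);
            engine.addChild(host);

            service.setContainer(engine);
            service.addConnector(connector);

            tomcat.addServlet(contextPath, "dispatcher", new DispatcherServlet());
            context.addServletMappingDecoded("/*", "dispatcher");

            try {
                tomcat.start();
                tomcat.getServer().await();
            } catch (LifecycleException e) {
                e.printStackTrace();
            }
        }
    }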

    Next is the request dispatcher, DispatcherServlet, which hands each request to HttpServletHandler.

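    import java.io.IOException;
    import javax.servlet.ServletException; // jakarta.servlet on Tomcat 10 and later
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    /**
     * Tomcat is a servlet container, so we write a servlet.
     */
    public class DispatcherServlet extends HttpServlet {
        @Override
        protected void service(HttpServletRequest req, HttpServletResponse resp)
                throws ServletException, IOException {
            new HttpServletHandler().handler(req, resp);
        }
    }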

    HttpServletHandler simply parses the request, makes the call via reflection, and returns the result.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import java.lang.reflect.Method;
    import javax.servlet.http.HttpServletRequest; // jakarta.servlet on Tomcat 10 and later
    import javax.servlet.http.HttpServletResponse;
    import org.apache.commons.io.IOUtils; // assumed to be Apache Commons IO

    public class HttpServletHandler {

        public void handler(HttpServletRequest req, HttpServletResponse resp) {
            try (ObjectInputStream ois = new ObjectInputStream(req.getInputStream());
                 OutputStream outputStream = resp.getOutputStream()) {
                RpcRequest invocation = (RpcRequest) ois.readObject();

                // Find the implementation class of the interface (to be looked up from the registry)
                Class<?> impClass = Class.forName(invocation.getImpl());
                Method method = impClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
                Object result = method.invoke(impClass.getDeclaredConstructor().newInstance(), invocation.getParams());

                RpcResponse rpcResponse = new RpcResponse();
                rpcResponse.setResponseId(invocation.getRequestId());
                rpcResponse.setData(result);
                IOUtils.write(toByteArray(rpcResponse), outputStream);
            } catch (IOException | ReflectiveOperationException e) {
                e.printStackTrace();
            }
        }

        // Serialize an object to bytes with Java serialization; returns null on failure
        public byte[] toByteArray(Object obj) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            return null;
        }
    }

    Finally, the client. It sends the data with a POST request and then parses the result returned by the server.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import org.apache.commons.io.IOUtils; // assumed to be Apache Commons IO

    public class HttpClient {
        private static final HttpClient INSTANCE = new HttpClient();

        private HttpClient() {}

        public static HttpClient getInstance() {
            return INSTANCE;
        }

        public Object post(String hostname, Integer port, RpcRequest invocation) {
            try {
                URL url = new URL("http", hostname, port, "/");
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setRequestMethod("POST");
                httpURLConnection.setDoOutput(true);

                try (ObjectOutputStream oos = new ObjectOutputStream(httpURLConnection.getOutputStream())) {
                    oos.writeObject(invocation);
                    oos.flush();
                }

                try (InputStream inputStream = httpURLConnection.getInputStream()) {
                    RpcResponse rpcResponse = (RpcResponse) toObject(IOUtils.toByteArray(inputStream));
                    // toObject returns null when deserialization fails
                    return rpcResponse == null ? null : rpcResponse.getData();
                }
            } catch (IOException e) { // also covers MalformedURLException
                e.printStackTrace();
            }
            return null;
        }

        // Deserialize bytes back into an object; returns null on failure
        public Object toObject(byte[] bytes) {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            } catch (IOException | ClassNotFoundException ex) {
                ex.printStackTrace();
            }
            return null;
        }
    }

  3. A Netty-based client and server.

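    The Netty-based server. The encoder and decoder it uses are our own implementations; for now you can use the part I commented out, and swap in the custom handlers once we get to writing the codec.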

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class NettyServer {
        private static final NettyServer INSTANCE = new NettyServer();
        private static final Executor executor = Executors.newCachedThreadPool();
        private final static int MESSAGE_LENGTH = 4;

        private NettyServer() {}

        public static NettyServer getInstance() {
            return INSTANCE;
        }

        private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());

        // Run a task on the business thread pool, off the I/O threads
        public static void submit(Runnable t) {
            executor.execute(t);
        }

        public void start(String host, Integer port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                final ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel arg0) throws Exception {
                                ChannelPipeline pipeline = arg0.pipeline();
                                // Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet
                                // decoder that ObjectDecoder extends; ObjectDecoder initializes its parent with
                                // super(maxObjectSize, 0, 4, 0, 4).
                                // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH));
                                // Use LengthFieldPrepender to prepend the length header that ObjectDecoder expects
                                // pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH));
                                // pipeline.addLast(new ObjectEncoder());
                                // For concurrent performance use the weakCachingConcurrentResolver caching strategy;
                                // cacheDisabled is enough in ordinary cases
                                // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                // Register the decoder NettyDecoderHandler
                                pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType));
                                // Register the encoder NettyEncoderHandler
                                pipeline.addLast(new NettyEncoderHandler(serializeType));
                                pipeline.addLast("handler", new NettyServerHandler());
                            }
                        });
                bootstrap.bind(host, port).sync();
                System.out.println("Server start listen at " + port);
            } catch (Exception e) {
                e.printStackTrace(); // report the failure instead of swallowing it
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
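The commented-out pipeline above relies on a 4-byte length prefix: the sender prepends the payload length, and the receiver waits until a whole frame has arrived before decoding it. A minimal sketch of that framing with plain `java.nio`, independent of Netty, might look like this. `FramingSketch` is a hypothetical name, and the code illustrates the idea rather than Netty's actual decoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FramingSketch {
    static final int LENGTH_FIELD_BYTES = 4;

    // Sender: prepend a 4-byte big-endian length to the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(LENGTH_FIELD_BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Receiver: extract every complete frame and leave a trailing partial frame unread.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_BYTES) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] frame = new byte[length];
            in.get(frame); // the length header itself is stripped
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.allocate(64);
        wire.put(encode("hi".getBytes(StandardCharsets.UTF_8)));
        wire.put(encode("rpc".getBytes(StandardCharsets.UTF_8)));
        wire.flip();
        for (byte[] frame : decode(wire)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

Reading the length first and rewinding on a short read is what lets the receiver cope with TCP delivering a message in pieces or several messages at once.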

    The server-side handler. Netty is built around this handler pattern; here too the handler hands each received request to the thread pool for processing.

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;

    public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
        private ChannelHandlerContext context;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
            System.out.println("server channelRead...");
            System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString());
            // Run the invocation on the business thread pool so the I/O thread is not blocked
            InvokeTask it = new InvokeTask(rpcRequest, ctx);
            NettyServer.submit(it);
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.context = ctx;
        }
    }

    Here is the corresponding InvokeTask implementation.

    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import java.lang.reflect.Method;

    public class InvokeTask implements Runnable {
        private final RpcRequest invocation;
        private final ChannelHandlerContext ctx;

        public InvokeTask(RpcRequest invocation, ChannelHandlerContext ctx) {
            this.invocation = invocation;
            this.ctx = ctx;
        }

        @Override
        public void run() {
            Object result = null;
            try {
                // Find the implementation class of the interface (to be looked up from the registry)
                Class<?> impClass = Class.forName(invocation.getImpl());
                Method method = impClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
                // TODO: consider obtaining the implementation instance from the Spring container instead
                result = method.invoke(impClass.getDeclaredConstructor().newInstance(), invocation.getParams());
            } catch (Exception e) {
                // On any failure the response carries null data
                e.printStackTrace();
            }
            RpcResponse rpcResponse = new RpcResponse();
            rpcResponse.setResponseId(invocation.getRequestId());
            rpcResponse.setData(result);
            ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    System.out.println("RPC Server Send message-id response:" + invocation.getRequestId());
                }
            });
        }
    }
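The comments in the task above mention looking up the implementation from a registry and possibly taking the instance from Spring rather than creating a new one per call. One way to picture that is a small in-memory registry that maps an interface name to a single shared instance. This is only a sketch under assumptions: `LocalServiceRegistry` is a hypothetical class, not part of the framework, and it says nothing about how Spring would be wired in.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

class LocalServiceRegistry {
    // Pairs the published interface with the instance that serves it.
    private static final class Entry {
        final Class<?> iface;
        final Object instance;

        Entry(Class<?> iface, Object instance) {
            this.iface = iface;
            this.instance = instance;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    // Register one shared instance per interface.
    <T> void register(Class<T> iface, T implementation) {
        entries.put(iface.getName(), new Entry(iface, implementation));
    }

    // Resolve the method on the interface and invoke it on the registered instance.
    Object invoke(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] params) {
        Entry entry = entries.get(interfaceName);
        if (entry == null) {
            throw new IllegalArgumentException("no provider registered for " + interfaceName);
        }
        try {
            Method method = entry.iface.getMethod(methodName, paramTypes);
            return method.invoke(entry.instance, params);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        LocalServiceRegistry registry = new LocalServiceRegistry();
        Callable<String> pong = () -> "pong";
        registry.register(Callable.class, pong);
        Object result = registry.invoke(Callable.class.getName(), "call", new Class<?>[0], new Object[0]);
        System.out.println(result); // prints "pong"
    }
}
```

Compared with `Class.forName(...).newInstance()` on every request, the instance is created once and the client no longer needs to know the implementation class name.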

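    Now the client. There are two implementations. One cannot reuse handlers (you can think of a handler as a connection) and does not cope well with high concurrency; the other is a handlerPool mode that can reuse handlers.

    The non-reusable mode.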

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public class NettyClient {
        private static final NettyClient INSTANCE = new NettyClient();
        private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;

        private NettyClient() {}

        public static NettyClient getInstance() {
            return INSTANCE;
        }

        private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());

        public void start(String host, Integer port) {
            Bootstrap bootstrap = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup(parallel);
            try {
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel arg0) throws Exception {
                                ChannelPipeline pipeline = arg0.pipeline();
                                // Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet
                                // decoder that ObjectDecoder extends; ObjectDecoder initializes its parent with
                                // super(maxObjectSize, 0, 4, 0, 4).
                                // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                // Use LengthFieldPrepender to prepend the length header that ObjectDecoder expects
                                // pipeline.addLast(new LengthFieldPrepender(4));
                                // pipeline.addLast(new ObjectEncoder());
                                // For concurrent performance use the weakCachingConcurrentResolver caching strategy;
                                // cacheDisabled is enough in ordinary cases
                                // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                // Register the Netty encoder
                                pipeline.addLast(new NettyEncoderHandler(serializeType));
                                // Register the Netty decoder
                                pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                                pipeline.addLast("handler", new NettyClientHandler());
                            }
                        });
                bootstrap.connect(host, port).sync();
            } catch (Exception e) {
                e.printStackTrace(); // report the failure instead of swallowing it
                group.shutdownGracefully();
            }
        }
    }

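    Now the reusable mode, with a fixed number of handlers. The framework currently uses the reusable mode. The non-reusable version above is not used and is kept only to aid understanding.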

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;

    // Note: URL here is the framework's own (host, port) class, not java.net.URL.
    public class NettyChannelPoolFactory {
        // Capacity of each Netty Channel blocking queue; this value should be configurable
        private static final int channelConnectSize = 10;

        // Key: service provider address; value: blocking queue of Netty Channels
        private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>();

        // One event loop group shared by all pooled channels. The original listing created a new
        // group for every channel, which multiplies threads without benefit.
        private static final EventLoopGroup GROUP = new NioEventLoopGroup(10);

        private static final NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory();

        private NettyChannelPoolFactory() {}

        public static NettyChannelPoolFactory getInstance() {
            return INSTANCE;
        }

        private final List<ServiceProvider> serviceMetaDataList = new ArrayList<>();

        // Initialize the channels from the interfaces listed in the configuration file
        public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap) {
            // Collect the service provider entries into serviceMetaDataList
            for (List<ServiceProvider> serviceMetaDataModels : providerMap.values()) {
                if (serviceMetaDataModels == null || serviceMetaDataModels.isEmpty()) {
                    continue;
                }
                serviceMetaDataList.addAll(serviceMetaDataModels);
            }

            // Build the set of distinct provider addresses
            Set<URL> set = new HashSet<>();
            for (ServiceProvider serviceMetaData : serviceMetaDataList) {
                set.add(new URL(serviceMetaData.getIp(), serviceMetaData.getPort()));
            }

            for (URL url : set) {
                // Open several channels per ip:port and put them into the blocking queue
                ArrayBlockingQueue<Channel> queue =
                        channelPoolMap.computeIfAbsent(url, k -> new ArrayBlockingQueue<>(channelConnectSize));
                for (int channelSize = 0; channelSize < channelConnectSize; channelSize++) {
                    Channel channel = null;
                    while (channel == null) { // keeps retrying until the provider accepts the connection
                        channel = registerChannel(url);
                    }
                    queue.offer(channel);
                }
            }
        }

        // Open one channel to the provider; returns null if the connection fails
        public Channel registerChannel(URL url) {
            final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(GROUP)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel arg0) throws Exception {
                                ChannelPipeline pipeline = arg0.pipeline();
                                // Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet
                                // decoder that ObjectDecoder extends; ObjectDecoder initializes its parent with
                                // super(maxObjectSize, 0, 4, 0, 4).
                                // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                // Use LengthFieldPrepender to prepend the length header that ObjectDecoder expects
                                // pipeline.addLast(new LengthFieldPrepender(4));
                                // pipeline.addLast(new ObjectEncoder());
                                // For concurrent performance use the weakCachingConcurrentResolver caching strategy;
                                // cacheDisabled is enough in ordinary cases
                                // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                // Register the Netty encoder
                                pipeline.addLast(new NettyEncoderHandler(serializeType));
                                // Register the Netty decoder
                                pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                                pipeline.addLast("handler", new NettyClientHandler());
                            }
                        });
                // sync() returns only after the connect attempt has finished and rethrows on failure,
                // so the extra CountDownLatch from the original listing is not needed
                ChannelFuture future = bootstrap.connect(url.getHost(), url.getPort()).sync();
                if (future.isSuccess()) {
                    return future.channel();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }

        // Get the blocking queue for a url
        public ArrayBlockingQueue<Channel> acqiure(URL url) {
            System.out.println(channelPoolMap.toString());
            return channelPoolMap.get(url);
        }

        // Return a channel to the pool after use
        public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url) {
            if (queue == null) {
                return;
            }
            // Check whether the channel is still usable; if not, register a new one and queue that instead
            if (channel == null || !channel.isActive() || !channel.isOpen() || !channel.isWritable()) {
                if (channel != null) {
                    // Actively close the broken channel; waiting on closeFuture() alone could block forever
                    channel.close().syncUninterruptibly();
                }
                Channel c = null;
                while (c == null) {
                    c = registerChannel(url);
                }
                queue.offer(c);
                return;
            }
            queue.offer(channel);
        }
    }
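Stripped of the Netty details, the pool above is a bounded blocking queue with a health check on return. The following sketch shows that idea for an arbitrary resource type. It is an illustration under assumptions: `SimplePool` and its method names are hypothetical, and the resource factory and health predicate are supplied by the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

class SimplePool<T> {
    private final ArrayBlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final Predicate<T> healthy;

    // Pre-fill the pool with a fixed number of resources.
    SimplePool(int size, Supplier<T> factory, Predicate<T> healthy) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        this.healthy = healthy;
        for (int i = 0; i < size; i++) {
            idle.offer(factory.get());
        }
    }

    // Wait up to timeoutMillis for an idle resource; null means the wait timed out.
    T acquire(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Return a resource; an unusable one is replaced so the pool keeps its size.
    void release(T resource) {
        boolean usable = resource != null && healthy.test(resource);
        idle.offer(usable ? resource : factory.get());
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a connection; "healthy" means still empty.
        SimplePool<StringBuilder> pool =
                new SimplePool<StringBuilder>(2, StringBuilder::new, sb -> sb.length() == 0);
        StringBuilder first = pool.acquire(10);
        first.append("used"); // make it unhealthy
        pool.release(first);  // replaced by a fresh instance
        System.out.println(pool.idleCount()); // prints 2
    }
}
```

Callers block in `acquire` when every resource is busy, which is how a fixed pool size caps the number of concurrent in-flight calls per provider.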

    Here is the corresponding handler. channelRead0 reads the response returned by the server. Because Netty is asynchronous, we need MessageCallBack to turn this into a synchronous call.

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import java.util.Date;

    public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
        private ChannelHandlerContext context;

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("Channel became inactive at: " + new Date());
            System.out.println("NettyClientHandler channelInactive");
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.context = ctx;
            System.out.println("Channel active, id: " + ctx.channel().id());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
            String responseId = rpcResponse.getResponseId();
            // Remove and fetch the callback in one step, then wake up the waiting caller
            MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.remove(responseId);
            if (callBack != null) {
                callBack.over(rpcResponse);
            }
        }
    }

    The MessageCallBack implementation.

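    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class MessageCallBack {
        private RpcRequest rpcRequest;
        private RpcResponse rpcResponse;
        private final Lock lock = new ReentrantLock();
        private final Condition finish = lock.newCondition();

        public MessageCallBack(RpcRequest request) {
            this.rpcRequest = request;
        }

        // Block the caller until the response arrives or the timeout expires
        public Object start() throws InterruptedException {
            lock.lock(); // acquire outside try so unlock() only runs if the lock is held
            try {
                // Wait up to 10 seconds; if the RPC server takes too long to respond, return null.
                // Skip the wait when the response has already arrived, and loop to guard
                // against spurious wakeups.
                long nanos = TimeUnit.SECONDS.toNanos(10);
                while (this.rpcResponse == null && nanos > 0) {
                    nanos = finish.awaitNanos(nanos);
                }
                return this.rpcResponse != null ? this.rpcResponse.getData() : null;
            } finally {
                lock.unlock();
            }
        }

        // Called from the I/O thread when the response arrives
        public void over(RpcResponse reponse) {
            lock.lock();
            try {
                this.rpcResponse = reponse;
                finish.signal();
            } finally {
                lock.unlock();
            }
        }
    }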
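The callback above pairs a lock with a condition to make an asynchronous response look synchronous. The same request-ID correlation can also be sketched with `CompletableFuture` from the JDK. This is an alternative illustration, not the framework's code: `PendingCalls` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingCalls {
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Register before sending, so that a fast response cannot arrive unobserved.
    CompletableFuture<Object> register(String requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called from the I/O thread when a response arrives; false if nobody is waiting.
    boolean complete(String requestId, Object data) {
        CompletableFuture<Object> future = pending.remove(requestId);
        return future != null && future.complete(data);
    }

    // Block the caller until the response arrives; null on timeout, like the callback above.
    Object await(String requestId, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId); // never leave a stale entry behind
        }
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<Object> future = calls.register("req-1");
        // Simulate the I/O thread delivering the response a little later.
        new Thread(() -> calls.complete("req-1", "result")).start();
        System.out.println(calls.await("req-1", future, 1000)); // prints "result"
    }
}
```

A future remembers its result, so a response that arrives before the caller starts waiting is not lost.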

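    Since this is a pluggable framework, the underlying protocol must be selectable, so we define a top-level interface that lets us choose the protocol.

    The start method starts the server; the send method is used by the client to send data.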

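    public interface Procotol {
        // Start the server side at the given address
        void start(URL url);

        // Send a request from the client side and return the result
        Object send(URL url, RpcRequest invocation);
    }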

    The three protocol implementations of this interface.

    The Netty implementation

    import io.netty.channel.Channel;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class DubboProcotol implements Procotol {
        @Override
        public void start(URL url) {
            NettyServer nettyServer = NettyServer.getInstance();
            nettyServer.start(url.getHost(), url.getPort());
        }

        @Override
        public Object send(URL url, RpcRequest invocation) {
            ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url);
            if (queue == null) {
                return null; // no channel pool was initialized for this provider
            }
            Channel channel = null;
            try {
                channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                if (channel == null || !channel.isActive() || !channel.isOpen() || !channel.isWritable()) {
                    channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                    if (channel == null) {
                        channel = NettyChannelPoolFactory.getInstance().registerChannel(url);
                    }
                }
                if (channel == null) {
                    return null; // could not obtain a usable channel
                }
                // Register the callback before writing, so a fast response cannot arrive first and be dropped
                MessageCallBack callback = new MessageCallBack(invocation);
                ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback);
                // Write the invocation into the Netty channel to start the asynchronous call
                channel.writeAndFlush(invocation).syncUninterruptibly();
                // Block until the response arrives or the callback times out
                return callback.start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Drop the callback if it is still registered, for example after a timeout
                ResponseHolder.getInstance().mapCallBack.remove(invocation.getRequestId());
                if (channel != null) {
                    System.out.println("release:" + channel.id());
                }
                NettyChannelPoolFactory.getInstance().release(queue, channel, url);
            }
            return null;
        }
    }

    The HTTP implementation

    public class HttpProcotol implements Procotol {
        @Override
        public void start(URL url) {
            HttpServer httpServer = HttpServer.getInstance();
            httpServer.start(url.getHost(), url.getPort());
        }

        @Override
        public Object send(URL url, RpcRequest invocation) {
            HttpClient httpClient = HttpClient.getInstance();
            return httpClient.post(url.getHost(), url.getPort(), invocation);
        }
    }

    The Socket implementation

    public class SocketProcotol implements Procotol {
        @Override
        public void start(URL url) {
            SocketServer socketServer = SocketServer.getInstance();
            socketServer.publiser(url.getPort());
        }

        @Override
        public Object send(URL url, RpcRequest invocation) {
            SocketClient socketClient = SocketClient.getInstance();
            return socketClient.sendRequest(url.getHost(), url.getPort(), invocation);
        }
    }

    With that, we have a model with a selectable protocol: through this module we can choose a protocol and communicate with the server.
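How the protocol gets chosen at runtime is not shown in this section. One plausible approach is a small factory keyed by a configured name. The sketch below assumes exactly that: `Transport` is a simplified stand-in for the Procotol interface, `TransportFactory` and the names "netty", "http", and "socket" are hypothetical, and the lambdas only tag the payload instead of doing real I/O.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Simplified stand-in for the protocol interface, reduced to one method.
interface Transport {
    String send(String payload);
}

class TransportFactory {
    private final Map<String, Supplier<Transport>> suppliers = new ConcurrentHashMap<>();

    // Make a transport available under a case-insensitive name.
    void register(String name, Supplier<Transport> supplier) {
        suppliers.put(name.toLowerCase(Locale.ROOT), supplier);
    }

    // Create the transport configured under the given name.
    Transport create(String name) {
        Supplier<Transport> supplier =
                name == null ? null : suppliers.get(name.toLowerCase(Locale.ROOT));
        if (supplier == null) {
            throw new IllegalArgumentException("unknown protocol: " + name);
        }
        return supplier.get();
    }

    // Placeholder transports; real ones would delegate to the Netty, HTTP, or Socket client.
    static TransportFactory withDefaults() {
        TransportFactory factory = new TransportFactory();
        factory.register("netty", () -> payload -> "netty:" + payload);
        factory.register("http", () -> payload -> "http:" + payload);
        factory.register("socket", () -> payload -> "socket:" + payload);
        return factory;
    }

    public static void main(String[] args) {
        Transport transport = withDefaults().create("http");
        System.out.println(transport.send("ping")); // prints "http:ping"
    }
}
```

Because callers depend only on the interface, switching protocols becomes a configuration change rather than a code change.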
