写点什么

网络编程三 - 原生 JDK 的 BIO 以及应用

  • 2021 年 11 月 12 日
  • 本文字数:7365 字

    阅读完需:约 24 分钟

/**


*@author Darkking



    *类说明:Bio 通信的服务端


    */


    public class Server {


    public static void main(String[] args) throws IOException {


    /服务器必备/


    ServerSocket serverSocket = new ServerSocket();


    /绑定监听端口/


    serverSocket.bind(new InetSocketAddress(10001));


    System.out.println("Server start.......");


    while(true){


    //每监听到一个客户端请求创建一个线程


    new Thread(new ServerTask(serverSocket.accept())).start();


    }


    }


    private static class ServerTask implements Runnable{


    private Socket socket = null;


    public ServerTask(Socket socket) {


    this.socket = socket;


    }


    @Override


    public void run() {


    /拿和客户端通讯的输入输出流/


    try(ObjectInputStream inputStream


    = new ObjectInputStream(socket.getInputStream());


    ObjectOutputStream outputStream


    = new ObjectOutputStream(socket.getOutputStream())){


    /服务器的输入/


    String userName = inputStream.readUTF();


    System.out.println("Accept clinet message:"+userName);


    outputStream.writeUTF("Hello,"+userName);


    outputStream.flush();


    }catch (Exception e){


    e.printStackTrace();


    }


    finally {


    try {


    socket.close();


    } catch (IOException e) {


    e.printStackTrace();


    }


    }


    }


    }


    import java.io.IOException;


    import java.io.ObjectInputStream;


    import java.io.ObjectOutputStream;


    import java.net.InetSocketAddress;


    import java.net.Socket;


    /**


    *@author Darkking



      *类说明:Bio 通信的服务端


      */


      public class Client {


      public static void main(String[] args) throws IOException {


      //客户端启动必备


      Socket socket = null;


      //实例化与服务端通信的输入输出流


      ObjectOutputStream output = null;


      ObjectInputStream input = null;


      //服务器的通信地址


      InetSocketAddress addr


      = new InetSocketAddress("127.0.0.1",10001);


      try{


      socket = new Socket();


      /连接服务器/


      socket.connect(addr);


      output = new ObjectOutputStream(socket.getOutputStream());


      input = new ObjectInputStream(socket.getInputStream());


      /向服务器输出请求/


      output.writeUTF("Darkking");


      output.flush();


      //接收服务器的输出


      System.out.println(input.readUTF());


      }finally{


      if (socket!=null) socket.close();


      if (output!=null) output.close();


      if (input!=null) input.close();


      }


      }


      }


      该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系,Java 中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就**死****--**


      为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞 I/O),通常被称为“伪异步 I/O 模型“。


      我们知道,如果使用 CachedThreadPool 线程池(如果不太了解线程池,可以查看之前的并发专题线程池的使用),其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的客户端:线程数模型,而使用 FixedThreadPool 我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了 N:M 的伪异步 I/O 模型。


      但是,正因为限制了线程数量,如果发生读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。


      服务端改进代码如下:


      mport java.io.IOException;


      import java.io.ObjectInputStream;


      import java.io.ObjectOutputStream;


      import java.net.InetSocketAddress;


      import java.net.ServerSocket;


      import java.net.Socket;


      import java.util.concurrent.ExecutorService;


      import java.util.concurrent.Executors;


      /**


      *@author Darkking



        *类说明:Bio 通信的服务端


        */


        public class ServerPool {


        //创建指定线程数的线程池,线程数量为 cpu 数


        private static ExecutorService executorService


        = Executors.newFixedThreadPool(


        Runtime.getRuntime().availableProcessors());


        public static void main(String[] args) throws IOException {


        //服务端启动必备


        ServerSocket serverSocket = new ServerSocket();


        //表示服务端在哪个端口上监听


        serverSocket.bind(new InetSocketAddress(10001));


        System.out.println("Start Server ....");


        try{


        while(true){


        executorService.execute(new ServerTask(serverSocket.accept()));


        }


        }finally {


        serverSocket.close();


        }


        }


        //每个和客户端的通信都会打包成一个任务,交个一个线程来执行


        private static class ServerTask implements Runnable{


        private Socket socket = null;


        public ServerTask(Socket socket){


        this.socket = socket;


        }


        @Override


        public void run() {


        //实例化与客户端通信的输入输出流


        try(ObjectInputStream inputStream =


        new ObjectInputStream(socket.getInputStream());


        ObjectOutputStream outputStream =


        new ObjectOutputStream(socket.getOutputStream())){


        //接收客户端的输出,也就是服务器的输入


        String userName = inputStream.readUTF();


        System.out.println("Accept client message:"+userName);


        //服务器的输出,也就是客户端的输入


        outputStream.writeUTF("Hello,"+userName);


        outputStream.flush();


        }catch(Exception e){


        e.printStackTrace();


        }finally {


        try {


        socket.close();


        } catch (IOException e) {


        e.printStackTrace();


        }


        }


        }


        }


        }


        二、BIO 应用-RPC 框架



        为什么要有 RPC?

        我们最开始开发的时候,一个应用一台机器,将所有功能都写在一起,比如说比较常见的电商场景。



        随着我们业务的发展,我们需要提示性能了,我们会怎么做?将不同的业务功能放到线程里来实现异步和提升性能。


        但是业务越来越复杂,业务量越来越大,单个应用或者一台机器的资源是肯定背负不起的,这个时候,我们会怎么做?将核心业务抽取出来,作为独立的服务,放到其他服务器上或者形成集群。这个时候就会请出 RPC,系统变为分布式的架构。


        为什么说千万级流量分布式、微服务架构必备的 RPC 框架?和 LocalCall 的代码进行比较,因为引入 rpc 框架对我们现有的代码影响最小,同时又可以帮我们实现架构上的扩展。现在的开源 rpc 框架,有什么?dubbo,grpc 等等


        当服务越来越多,各种 rpc 之间的调用会越来越复杂,这个时候我们会引入中间件,比如说 MQ、缓存,同时架构上整体往微服务去迁移,引入了各种比如容器技术 docker,DevOps 等等。最终会变为如图所示来应付千万级流量,但是不管怎样,rpc 总是会占有一席之地。


        什么是 RPC?

        RPC(Remote Procedure Call ——远程过程调用),它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络的技术。



        一次完整的 RPC 同步调用流程:


        1)服务消费方(client)以本地调用方式调用客户端存根;


        2)什么叫客户端存根?就是远程方法在本地的模拟对象,一样的也有方法名,也有方法参数,client stub 接收到调用后负责将方法名、方法的参数等包装,并将包装后的信息通过网络发送到服务端;


        3)服务端收到消息后,交给代理存根在服务器的部分后进行解码为实际的方法名和参数


        4) server stub 根据解码结果调用服务器上本地的实际服务;


        5)本地服务执行并将结果返回给 server stub;


        6)server stub 将返回结果打包成消息并发送至消费方;


        7)client stub 接收到消息,并进行解码;


        8)服务消费方得到最终结果。


        RPC 框架的目标就是要中间步骤都封装起来,让我们进行远程方法调用的时候感觉到就像在本地调用一样。

        RPC 和 HTTP

        rpc 字面意思就是远程过程调用,只是对不同应用间相互调用的一种描述,一种思想。具体怎么调用?实现方式可以是最直接的 tcp 通信,也可以是 http 方式,在很多的消息中间件的技术书籍里,甚至还有使用消息中间件来实现 RPC 调用的,我们知道的 dubbo 是基于 tcp 通信的,gRPC 是 Google 公布的开源软件,基于最新的 HTTP2.0 协议,底层使用到了 Netty 框架的支持。所以总结来说,rpc 和 http 是完全两个不同层级的东西,他们之间并没有什么可比性。

        实现 RPC 框架

        实现 RPC 框架需要解决的那些问题呢?


        代理问题


        代理本质上是要解决什么问题?要解决的是被调用的服务本质上是远程的服务,但是调用者不知道也不关心,调用者只要结果,具体的事情由代理的那个对象来负责这件事。既然是远程代理,当然是要用代理模式了。


        代理(Proxy)是一种设计模式,即通过代理对象访问目标对象.这样做的好处是:可以在目标对象实现的基础上,增强额外的功能操作,即扩展目标对象的功能。那我们这里额外的功能操作是干什么,通过网络访问远程服务。


        jdk 的代理有两种实现方式:静态代理和动态代理。


        序列化问题


        序列化问题在计算机里具体是什么呢?我们的方法调用,有方法名,方法参数,这些可能是字符串,可能是我们自己定义的 java 的类,但是在网络上传输或者保存在硬盘的时候,网络或者硬盘并不认得什么字符串或者 javabean,它只认得二进制的 01 串,怎么办?要进行序列化,网络传输后要进行实际调用,就要把二进制的 01 串变回我们实际的 java 的类,这个叫反序列化。java 里已经为我们提供了相关的机制 Serializable。


        通信问题


        我们在用序列化把东西变成了可以在网络上传输的二进制的 01 串,但具体如何通过网络传输?那就要使用 JDK 为我们提供的 BIO 或者其他 IO。


        登记的服务实例化


        登记的服务有可能在我们的系统中就是一个名字,那他怎么变成实际执行的对象实例,当然是使用 JAVA 的反射机制。


        反射机制是什么?


        反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用对象的方法的功能称为 java 语言的反射机制。


        反射机制主要提供了以下功能:


        ?在运行时判断任意一个对象所属的类;


        ?在运行时构造任意一个类的对象;


        ?在运行时判断任意一个类所具有的成员变量和方法;


        ?在运行时调用任意一个对象的方法;


        ?生成动态代理。

        手写 RPC 框架

        了解原理之后接下来我们就手写一个简单地 RPC 框架,包括服务注册发现(服务治理)。服务端提供发送短信接口服务,注册到注册中心,供客户端调用


        注:RPC 只是指远程过程调用,能满足远程服务调用的都可以说是支持 RPC。服务治理是一种实现模式。是为了对远程服务更好地进行管理而产生的。


        1、服务注册中心类


        /**


        *@author Darkking


        • 类说明:注册中心注册服务的实体类


        */


        public class RegisterServiceVo implements Serializable {


        private final String host;//服务提供者的 ip 地址


        private final int port;//服务提供者的端口


        public RegisterServiceVo(String host,int port) {


        this.host = host;


        this.port = port;


        }


        public String getHost() {


        return host;


        }


        public int getPort() {


        return port;


        }


        }


        /**


        *@author Darkking


        • 类说明:服务注册中心,服务提供者在启动时需要在注册中心登记自己的信息


        */


        public class RegisterCenter {


        //key 表示服务名,value 代表服务提供者地址的集合


        private static final Map<String,Set<RegisterServiceVo>> serviceHolder


        = new HashMap<>();


        //注册服务的端口号


        private int port;


        public RegisterCenter(int port) {


        this.port = port;


        }


        //服务注册,考虑到可能有多个提供者同时注册,进行加锁


        private static synchronized void registerSerive(String serviceName,


        String host,int port){


        //获得当前服务的已有地址集合


        Set<RegisterServiceVo> serviceVoSet = serviceHolder.get(serviceName);


        if(serviceVoSet==null){


        //已有地址集合为空,新增集合


        serviceVoSet = new HashSet<>();


        serviceHolder.put(serviceName,serviceVoSet);


        }


        //将新的服务提供者加入集合


        serviceVoSet.add(new RegisterServiceVo(host,port));


        System.out.println("服务已注册["+serviceName+"]," +


        "地址["+host+"],端口["+port+"]");


        }


        //取出服务提供者


        private static Set<RegisterServiceVo> getService(String serviceName){


        return serviceHolder.get(serviceName);


        }


        //处理服务请求的任务


        private static class ServerTask implements Runnable{


        private Socket client = null;


        public ServerTask(Socket client){


        this.client = client;


        }


        public void run() {


        try(ObjectInputStream inputStream =


        new ObjectInputStream(client.getInputStream());


        ObjectOutputStream outputStream =


        new ObjectOutputStream(client.getOutputStream())){


        //检查当前请求是注册服务还是获得服务


        boolean isGetService = inputStream.readBoolean();


        /获得服务提供者/


        if(isGetService){


        String serviceName = inputStream.readUTF();


        //取出服务提供者集合


        Set<RegisterServiceVo> result = getService(serviceName);


        //返回给客户端


        outputStream.writeObject(result);


        outputStream.flush();


        System.out.println("将已注册的服务["+serviceName+"提供给客户端");


        }


        /注册服务/


        else{


        //取得新服务提供方的 ip 和端口


        String serviceName = inputStream.readUTF();


        String host = inputStream.readUTF();


        int port = inputStream.readInt();


        //在注册中心保存


        registerSerive(serviceName,host,port);


        outputStream.writeBoolean(true);


        outputStream.flush();


        }


        }catch(Exception e){


        e.printStackTrace();


        }finally {


        try {


        client.close();


        } catch (IOException e) {


        e.printStackTrace();


        }


        }


        }


        }


        //启动注册服务


        public v


        【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
        浏览器打开:qq.cn.hn/FTf 免费领取
        复制代码


        oid startService() throws IOException {


        ServerSocket serverSocket = new ServerSocket();


        serverSocket.bind(new InetSocketAddress(port));


        System.out.println("RegisterCenter server on:"+port+":运行");


        try{


        while(true){


        new Thread(new ServerTask(serverSocket.accept())).start();


        }


        }finally {


        serverSocket.close();


        }


        }


        public static void main(String[] args) {


        new Thread(new Runnable() {


        public void run() {


        try{


        RegisterCenter serviceServer = new RegisterCenter(9999);


        serviceServer.startService();


        }catch(IOException e){


        e.printStackTrace();


        }


        }


        }).start();


        }


        }


        2、服务端


        对象实体类


        /**


        *@author Darkking


        • 类说明:注册中心注册服务的实体类


        */


        public class RegisterServiceVo implements Serializable {


        private final String host;//服务提供者的 ip 地址


        private final int port;//服务提供者的端口


        public RegisterServiceVo(String host, int port) {


        this.host = host;


        this.port = port;


        }


        public String getHost() {


        return host;


        }


        public int getPort() {


        return port;


        }


        }


        /**


        *@author Darkking



          *类说明:用户的实体类,已实现序列化


          */


          public class UserInfo implements Serializable {


          private final String name;


          private final String phone;


          public UserInfo(String name, String phone) {


          this.name = name;


          this.phone = phone;


          }


          public String getName() {


          return name;


          }


          public String getPhone() {


          return phone;


          }


          }


          服务提供类


          /**


          *@author Darkking



            *类说明:短信息发送接口


            */


            public interface SendSms {


            boolean sendMail(UserInfo user);


            }


            /**


            *@author Darkking



              *类说明:短信息发送服务的实现


              */


              public class SendSmsImpl implements SendSms {


              @Override


              public boolean sendMail(UserInfo user) {


              try {


              Thread.sleep(50);


              } catch (InterruptedException e) {


              e.printStackTrace();


              }


              System.out.println("已发送短信息给:"+user.getName()+"到【"+user.getPhone()+"】");


              return true;


              }


              }


              服务注册类


              **


              *@author Darkking



                *类说明:rpc 框架的服务端部分


                */


                public class RpcServerFrameReg {


                private static ExecutorService executorService


                = Executors.newFixedThreadPool(


                Runtime.getRuntime().availableProcessors());


                //服务在本地的注册中心,主要是接口名和实现类的对照


                private static final Map<String,Class> serviceHolder


                = new HashMap<>();


                //服务的端口号


                private int port;


                public RpcServerFrameReg(int port) {


                this.port = port;


                }


                //服务注册


                public void registerSerive(Class<?> serviceInterface,Class impl) throws IOException {


                Socket socket = null;


                ObjectOutputStream output = null;


                ObjectInputStream input = null;


                /向注册中心注册服务/


                try{


                socket = new Socket();


                socket.connect(new InetSocketAddress("127.0.0.1",9999));


                output = new ObjectOutputStream(socket.getOutputStream());


                output.writeBoolean(false);


                output.writeUTF(serviceInterface.getName());


                output.writeUTF("127.0.0.1");


                output.writeInt(port);


                output.flush();


                input = new ObjectInputStream(socket.getInputStream());


                if(input.readBoolean()){


                serviceHolder.put(serviceInterface.getName(),impl);


                System.out.println(serviceInterface.getName()+"服务注册成功");


                }else{


                System.out.println(serviceInterface.getName()+"服务注册失败");


                };


                }finally {


                if (socket!=null) socket.close();


                if (output!=null) output.close();


                if (input!=null) input.close();


                }


                }


                //处理服务请求任务


                private static class ServerTask implements Runnable{


                private Socket client = null;


                public ServerTask(Socket client){


                this.client = client;


                }


                public void run() {


                try(ObjectInputStream inputStream =


                new ObjectInputStream(client.getInputStream());


                ObjectOutputStream outputStream =


                new ObjectOutputStream(client.getOutputStream())){


                //方法所在类名接口名


                String serviceName = inputStream.readUTF();


                //方法的名字


                String methodName = inputStream.readUTF();


                //方法的入参类型


                Class<?>[] parmTypes = (Class<?>[]) inputStream.readObject();


                //方法入参的值


                Object[] args = (Object[]) inputStream.readObject();


                Class serviceClass = serviceHolder.get(serviceName);


                if (serviceClass == null){


                throw new ClassNotFoundException(serviceName+" Not Found");


                }


                Method method = serviceClass.getMethod(methodName,parmTypes);


                Object result = method.invoke(serviceClass.newInstance(),args);


                outputStream.writeObject(result);


                outputStream.flush();


                }catch(Exception e){


                e.printStackTrace();


                }finally {


                try {


                client.close();

                评论

                发布
                暂无评论
                网络编程三-原生JDK的BIO以及应用