java网络编程模型之BIO、NIO、AIO

阅读: 评论:0

java网络编程模型之BIO、NIO、AIO

java网络编程模型之BIO、NIO、AIO

目录

前言介绍

Java BIO[Blocking I/O] | 同步阻塞I/O模式

Java NIO[New I/O] | 同步非阻塞模式

​编辑

Java AIO[Asynchronous I/O] | 异步非阻塞I/O模型

AIO模型

同步/异步、阻塞/非阻塞

订阅-通知模式

AIO框架简析

基本执行流程

代码实现

BIO模型

基础补充

执行基本流程

代码实现

NIO模型

基础补充

执行基本流程

代码实现



前言介绍

被水印遮住的部分:处理完成

Java BIO[Blocking I/O] | 同步阻塞I/O模式

BIO是最常见的一种网络编程模型,它使用阻塞式I/O操作,即在进行I/O操作时,当前线程会被阻塞,直到数据准备好或者超时。在BIO模型中,通常需要为每个客户端连接创建一个独立的线程来处理I/O操作,因此对于大量并发连接的情况,会导致系统资源消耗较大。

被水印遮住的部分:处理完成

Java NIO[New I/O] | 同步非阻塞模式

  • Java NIO,全程 Non-Block IO ,是Java SE 1.4版以后,针对网络传输效能优化的新功能。是一种非阻塞同步的通信模式。
  • NIO 与原来的 I/O 有同样的作用和目的, 他们之间最重要的区别是数据打包和传输的方式。原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。
  • 面向流的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。
  • 面向块的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O - 缺少一些面向流的 I/O 所具有的优雅性和简单性。
  • 还有与原来的I/O不同的是Java NIO 采用了非阻塞 I/O 操作机制,这是通过 Selector(选择器)实现的。Selector 会不断轮询注册在其上的 Channel(通道),如果某个 Channel 上面发生读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 接收到,从而进行后续的 I/O 操作。这种机制使得一个线程可以同时处理多个 Channel 的 I/O 操作,从而大大提高了系统的并发处理能力。
  • 总之,与传统的阻塞 I/O 不同,Java NIO 采用了基于块的 I/O 操作方式和非阻塞 I/O 操作机制,可以更好地提高系统的处理性能和并发处理能力。

被水印遮住的部分:通知回调

Java AIO[Asynchronous I/O] | 异步非阻塞I/O模型

  • Java AIO,全程 Asynchronous IO,是异步非阻塞的IO。是一种非阻塞异步的通信模式。在NIO的基础上引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。
  • 在AIO模型中,应用程序提交I/O操作后,可以继续执行其他任务。当I/O操作完成后,会通过回调函数的方式通知应用程序,从而避免了I/O操作期间线程的阻塞。也就是说,AIO的异步操作机制可以使得应用程序不必等待I/O操作的完成,而是可以在I/O操作完成之后再进行相关处理。
  • AIO的异步操作机制是通过操作系统提供的异步I/O接口来实现的。在Java中,AIO提供了一个异步通道(AsynchronousChannel),通过该通道可以进行异步I/O操作。在进行异步I/O操作时,需要使用异步回调的方式进行处理,即当I/O操作完成后,系统会自动调用相关的回调函数来通知应用程序。

AIO模型

同步/异步、阻塞/非阻塞

 我们先来了解下什么是同步/异步,以及什么是阻塞/非阻塞。在IO操作中,IO分两阶段(一旦拿到数据后就变成了数据操作,不再是IO):

  1. 数据准备阶段
  2. 内核空间复制数据到用户进程缓冲区(用户空间)阶段 在操作系统中,程序运行的空间分为内核空间和用户空间。 应用程序都是运行在用户空间的,所以它们能操作的数据也都在用户空间。
  • 同步和异步IO的概念:同步是用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行 异步是用户线程发起I/O请求后仍需要继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
  • 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要彻底完成后才能返回用户空间 非阻塞是指I/O操作被调用后立即返回一个状态值,无需等I/O操作彻底完成。

  一般来讲: 阻塞IO模型、非阻塞IO模型、IO复用模型(select/poll/epoll)、信号驱动IO模型都属于同步IO,因为阶段2是阻塞的(尽管时间很短)。同步IO和异步IO的区别就在于第二个步骤是否阻塞: 如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。

订阅-通知模式

AIO模型采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,再主动通知应用程序,触发相应的函数。流程图如下

AIO框架简析

下面的图片展示了AIO模型中各个类与接口之间的关系

基本执行流程

首先来介绍一下aio模型的基于客户端的基本执行流程

  1. 通过AsynchronousChannelGroup线程组创建一个AsynchronousServerSocketChannel对象,用于监听和接受客户端的连接请求。
  2. 调用AsynchronousServerSocketChannel的bind()方法,将其绑定到指定的IP地址和端口号。
  3. 调用AsynchronousServerSocketChannel的accept()方法,开始监听客户端的连接请求。
  4. 一旦监听到客户端的连接请求,在accept()方法中传入AioServerChannelInitializer实例,将服务端的通道初始化。
  5. 在通道初始化时会创建一个AioServerHandler实例作为参数传入,AioServerHandler继承了ChannelAdapter(通用消息处理器),在ChannelAdapter类中定义了三个抽象类,分别是channelActive(通道激活前)、channelInactive(通道断开前)、channelRead(读取消息抽象类前),在AioServerHandler中对上述方法进行重写,在这些方法可以增加相应的业务逻辑。
  6. ChannelAdapter中的completed方法会被触发,该方法是连接建立后进行异步读取数据的回调函数,在该回调函数中首先会读取数据进行判断是否关闭通道(在关闭通道之前会调用channelInactive),然后利用缓冲流进行数据读取(channelRead),然后调用异步通道对象的read方法注册下一次的异步读取事件,进行循环读取。
  7. 由于该模型的通信方式是全双工通信模式,因此客户端和服务端可以同时互相发送消息,服务端要进行消息发送时会通过ChannelHandler中的writeAndFlush方法进行消息推送。

代码实现

客户端

package com.kjz.NettyDemo.Aio.client;import java.nio.channels.AsynchronousSocketChannel; // 异步Socket通道
import java.InetSocketAddress; // InetSocketAddress类用于封装IP地址和端口号
import java.nio.ByteBuffer; // ByteBuffer类用于读写操作
import java.nio.charset.Charset; // Charset类用于指定字符编码方式
import urrent.Future; // Future接口用于获取异步操作的结果
public class AioClient {public static void main(String[] args) throws Exception {// 创建异步Socket通道AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();// 建立连接并返回Future对象Future<Void> future = t(new InetSocketAddress("192.168.1.116", 7397));// 打印启动提示信息System.out.println("kjz-demo-netty client start done.");// 等待连接完成();// 通过通道进行数据读取操作//参数一:一个容量为1024的字节缓冲区对象,用于存储服务器发送的数据。//参数二:是一个附件对象,可以在异步操作完成后传递给回调函数。//参数三:回调函数,用于在异步读取完成后处理读取的结果(消息处理器&#ad(ByteBuffer.allocate(1024),  null,new AioClientHandler(socketChannel, Charset.forName("GBK")));// 睡眠100秒,保持客户端运行//通过调用 Thread.sleep 方法可以暂停当前线程的执行,确保客户端的事件循环(Event Loop)持续执行,// 从而保持与服务器的通信。Thread.sleep(100000);}}

客户端消息处理器 

package com.kjz.NettyDemo.Aio.client;import com.kjz.NettyDemo.Aio.ChannelAdapter;
import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
//客户端消息处理器
public class AioClientHandler extends ChannelAdapter {public AioClientHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}//channelActive方法在AIO客户端与服务器建立连接后被调用,输出连接信息。@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程服务端:" +//ctx通道处理器,从通道处理器中获取当前连接的异步通道对象ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功} catch (IOException e) {e.printStackTrace();}}//断开连接@Overridepublic void channelInactive(ChannelHandler ctx) {}/*channelRead方法在AIO客户端接收到服务器发送的消息时被调用,首先输出接收到的消息,然后通过ctx.writeAndFlush方法向服务器发送响应消息,告知服务器已经成功处理该消息。全双共通信方式*/@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "rn");ctx.writeAndFlush("客户端信息处理Success!rn");}}

服务端

package com.kjz.NettyDemo.Aio.server;import java.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import urrent.CountDownLatch;
import urrent.Executors;
//服务端
public class AioServer extends  Thread{//异步服务端socket通道对象private AsynchronousServerSocketChannel serverSocketChannel;@Overridepublic void run() {try {//使用AsynchronousChannelGroup是为了管理异步通道资源,它可以将多个异步通道共享同一线程池。// 在这里,使用固定大小的线程池来处理异步请求,避免了每次请求都创建新线程的开销。serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.wCachedThreadPool(),10));//绑定端口serverSocketChannel.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty server start done.");//通过CountDownLatch等待客户端连接CountDownLatch latch = new CountDownLatch(1);//监听客户端连接serverSocketChannel.accept(this, new AioServerChannelInitializer());latch.await();} catch (Exception e) {e.printStackTrace();}}public AsynchronousServerSocketChannel serverSocketChannel() {return serverSocketChannel;}public static void main(String[] args) {new AioServer().start();}
}

服务端初始化通道

package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelInitializer;import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import urrent.TimeUnit;
//服务端初始化通道
public class AioServerChannelInitializer extends ChannelInitializer {@Overrideprotected void initChannel(AsynchronousSocketChannel channel) throws Exception {ad(ByteBuffer.allocate(1024), 10, TimeUnit.SECONDS,null, new AioServerHandler(channel, Charset.forName("GBK")));}}

服务端消息处理器

package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelAdapter;
import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Date;public class AioServerHandler extends ChannelAdapter {public AioServerHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程客户端:" +ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功ctx.writeAndFlush("服务端连接建立成功" + " " + new Date() + " " +ctx.channel().getRemoteAddress() + "rn");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelInactive(ChannelHandler ctx) {}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "rn");ctx.writeAndFlush("服务端信息处理Success!rn");}}

通用消息处理器

package com.kjz.NettyDemo.Aio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import urrent.TimeUnit;
//通用消息处理器
public abstract class ChannelAdapter implements CompletionHandler<Integer, Object> {//异步socket通道private AsynchronousSocketChannel channel;//字符集private Charset charset;//构造方法public ChannelAdapter(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;//通道打开时if (channel.isOpen()) {channelActive(new ChannelHandler(channel, charset));}}//异步读取到数据后的回调函数,该方法根据异步读取结果判断是否需要关闭通道,// 然后将缓冲区中的字节数组转换为字符串,并通过channelRead方法处理读取到的消息。@Overridepublic void completed(Integer result, Object attachment) {try {//创建一个大小为1024的ByteBuffer对象,用于存储读取到的数据final ByteBuffer buffer = ByteBuffer.allocate(1024);//定义一个超时时间,单位为秒,默认为1小时。final long timeout = 60 * 60L;//调用ad方法进行异步读取//buffer参数是要读取的缓冲区//timeout是读取超时时间//TimeUnit.SECONDS表示超时时间的单位//attachment;附件//new CompletionHandler<Integer, Object>()返回一个新的CompletionHandler对象用于处理读取结果。ad(buffer, timeout, TimeUnit.SECONDS, null,//匿名类new CompletionHandler<Integer, Object>() {@Overridepublic void completed(Integer result, Object attachment) {//判断是否需要关闭通道if (result == -1) {try {//首先判断result是否等于-1,如果是则表示通道已关闭,// 需要执行关闭通道的逻辑。在关闭通道之前,会调用channelInactive方法表示通道已断开,// 并关闭通道。channelInactive(new ChannelHandler(channel, charset));channel.close();} catch (IOException e) {e.printStackTrace();}return;}//使用buffer.flip()反转缓冲区,将读取到的数据准备为读取状态。buffer.flip();//创建一个新的ChannelHandler对象,并通过channelRead方法处理读取到的消息,// 同时传入通道和字符集。channelRead(new ChannelHandler(channel, charset), charset.decode(buffer));//调用buffer.clear()清空缓冲区,以便重新写入数据。buffer.clear();//注册下一次的异步读取事件,使用之前定义的超时时间和时间单位,null表示附件,// this表示当前的CompletionHandler对象。循环读取ad(buffer, timeout, TimeUnit.SECONDS, null, this);}//异步读取出现异常时的回调函数,将异常信息输出到控制台。@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {StackTrace();}//通道激活public abstract void channelActive(ChannelHandler ctx);//通道断开public abstract void channelInactive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Aio;import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
//通道处理器
public class ChannelHandler {private AsynchronousSocketChannel channel;private Charset charset;public ChannelHandler(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//用于将数据写入异步Socket通道中,并发送到远程节点public void writeAndFlush(Object msg) {//方法接收一个Object类型的参数msg,首先将其转换为字节数组byte[] bytes = String().getBytes(charset);//根据该字节数组的长度创建一个新的ByteBuffer对象writeBuffer。ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//字节数组写入缓冲区中writeBuffer.put(bytes);//将缓冲区准备为写入状态writeBuffer.flip();channel.write(writeBuffer);}public AsynchronousSocketChannel channel() {return channel;}public void setChannel(AsynchronousSocketChannel channel) {this.channel = channel;}
}

初始化处理通道的回调方法

package com.kjz.NettyDemo.Aio;import com.kjz.NettyDemo.Aio.server.AioServer;import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//初始化处理通道的回调方法
public abstract class ChannelInitializer implementsCompletionHandler<AsynchronousSocketChannel, AioServer> {@Overridepublic void completed(AsynchronousSocketChannel channel, AioServer attachment) {try {//调用initChannel(channel)方法对通道进行初始化操作initChannel(channel);} catch (Exception e) {e.printStackTrace();} finally {//都会执行// 再此接收客户端连接,保持持续监听attachment.serverSocketChannel().accept(attachment, this);        }}@Overridepublic void failed(Throwable exc, AioServer attachment) {StackTrace();}//初始化通道方法,具体的初始化逻辑由子类实现,因此该方法被声明为抽象方法。protected abstract void initChannel(AsynchronousSocketChannel channel) throws Exception;}

BIO模型

基础补充

BIO模型是基于Socket实现的,Socket对象是实现网络通信的基础类之一,用于在客户端和服务器之间建立可靠的双向通信连接。下面首先先介绍一下该对象

  1. Socket和ServerSocket:

         Socket类用于客户端,可以与服务端建立连接,发送请求和接收响应。

         ServerSocket类用于服务器端,监听指定的端口,接受客户端的连接请求,并创建相应的Socket对象进行通信。

  2. 构造方法:

          Socket类的常用构造方法有以下几种:

             Socket(String host, int port):根据主机名和端口号创建Socket对象。
             Socket(InetAddress address, int port):根据IP地址和端口号创建Socket对象。
             Socket(String host, int port, InetAddress localAddr, int localPort):根据主机名、           端口号、本地IP地址和本地端口号创建Socket对象。
          ServerSocket类的常用构造方法有以下几种:

             ServerSocket(int port):创建一个绑定到指定端口号的ServerSocket对象。
             ServerSocket(int port, int backlog):创建一个绑定到指定端口号,并指定连接请           求队列长度的ServerSocket对象。
             ServerSocket(int port, int backlog, InetAddress bindAddr):创建一个绑定到指               定端口号和本地IP地址,并指定连接请求队列长度的ServerSocket对象。

  3. 常用方法:

            Socket类的常用方法:

                getInputStream():获取与Socket关联的输入流,用于接收服务器发送的数据。
                getOutputStream():获取与Socket关联的输出流,用于向服务器发送数据。
                 isConnected():判断Socket是否连接到远程主机。
                 isClosed():判断Socket是否已关闭。
                 close():关闭Socket连接。
             ServerSocket类的常用方法:

                accept():监听并接受客户端的连接请求,并返回一个新的Socket对象供通信                使用。
                 isBound():判断ServerSocket是否已绑定到指定的端口。
                 isClosed():判断ServerSocket是否已关闭。
                 close():关闭ServerSocket。
    Java中的Socket对象是网络通信的基础类,用于建立客户端与服务器之间的连接并进行数据交换。它提供了丰富的方法和功能,方便开发者在网络编程中进行数据传输和操作。

执行基本流程

从服务端来分析:

  1. 创建ServerSocket对象,绑定监听端口。
  2. 进入循环等待客户端连接请求。在循环中调用accept方法接受客户端的连接请求,一旦有客户端连接成功,就创建一个新的Socket对象来处理与该客户端的通信。BIO模型的特点是采用阻塞式I/O,即当服务器执行accept方法时,如果没有客户端连接到来,服务器会一直阻塞在这一步,直到有新的连接请求到达才会继续执行。
  3. 创建BioServerHandler线程处理客户端的请求和数据传输。每个客户端连接都会占用一个独立的线程来处理通信。
  4. 服务器持续监听客户端的连接请求。

代码实现

客户端

package com.kjz.NettyDemo.Bio.client;import java.io.IOException;
import java.Socket;
import java.nio.charset.Charset;public class BioClient {public static void main(String[] args) {try {Socket socket = new Socket("192.168.1.116", 7397);System.out.println("kjz-demo-netty client start done.");BioClientHandler bioClientHandler = new BioClientHandler(socket,Charset.forName("utf-8"));bioClientHandler.start();} catch (IOException e) {e.printStackTrace();}}
}

客户端消息处理器

package com.kjz.NettyDemo.Bio.client;import com.kjz.NettyDemo.Bio.ChannelAdapter;
import com.kjz.NettyDemo.Bio.ChannelHandler;import java.Socket;
import java.nio.charset.Charset;
SimpleDateFormat;
import java.util.Date;
//客户端消息处理器
public class BioClientHandler extends ChannelAdapter {public BioClientHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接报告LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioClient to msg for you rn");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!rn");}
}

服务端

package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.server.BioServerHandler;import java.io.IOException;
import java.InetSocketAddress;
import java.ServerSocket;
import java.Socket;
import java.nio.charset.Charset;public class BioServer extends Thread {private ServerSocket serverSocket = null;public static void main(String[] args) {BioServer bioServer = new BioServer();bioServer.start();}@Overridepublic void run() {try {serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty bio server start done. ");//进入循环等待客户端连接请求:while (true) {//在循环中调用accept方法接受客户端的连接请求,一旦有客户端连接成功,// 就创建一个新的Socket对象来处理与该客户端的通信。Socket socket = serverSocket.accept();//创建BioServerHandler线程处理客户端的请求和数据传输BioServerHandler handler = newBioServerHandler(socket, Charset.forName("GBK"));handler.start();}} catch (IOException e) {e.printStackTrace();}}
}

服务端消息处理器

package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.ChannelAdapter;
import com.kjz.NettyDemo.Bio.ChannelHandler;import java.Socket;
import java.nio.charset.Charset;
SimpleDateFormat;
import java.util.Date;
//服务端消息处理器
public class BioServerHandler extends ChannelAdapter {public BioServerHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioServer to msg for you rn");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!rn");}
}

通用消息处理器

package com.kjz.NettyDemo.Bio;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.Socket;
import java.nio.charset.Charset;
//通用消息处理器
public abstract class ChannelAdapter extends Thread {private Socket socket;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;while (!socket.isConnected()) {break;}channelHandler = new ChannelHandler(this.socket, charset);channelActive(channelHandler);}@Overridepublic void run() {try {BufferedReader input = new BufferedReader(new InputStreamReader(InputStream(), charset));String str = null;while ((str = adLine()) != null) {channelRead(channelHandler, str);}} catch (IOException e) {e.printStackTrace();}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Bio;import java.io.IOException;
import java.io.OutputStream;
import java.Socket;
import java.nio.charset.Charset;
//通道处理器
public  class ChannelHandler {private Socket socket;private Charset charset;public ChannelHandler(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;}//消息写入public void writeAndFlush(Object msg) {OutputStream out = null;try {out = OutputStream();out.write((String()).getBytes(charset));out.flush();} catch (IOException e) {e.printStackTrace();}}public Socket socket() {return socket;}
}

NIO模型

基础补充

Selector:

       Selector是Java NIO中的一个重要组件,用于实现非阻塞IO操作。它可以通过一个线程同时监听多个Channel的IO事件,从而实现高效的IO多路复用。


       Selector的主要作用是管理多个Channel,并监听这些Channel上的IO事件。它可以通过调用select()方法阻塞等待就绪的IO事件,然后返回就绪的IO事件的数量。通过selectedKeys()方法可以获取到已经就绪的IO事件的集合,然后可以遍历这个集合进行相应的处理。

       Selector的常用方法包括:
              open():创建一个新的Selector对象。


              close():关闭Selector对象。


              select():阻塞等待就绪的IO事件,返回就绪的IO事件的数量。


              select(long timeout):阻塞等待就绪的IO事件,最多等待timeout毫秒,返回就绪的                IO事件的数量。


              selectNow():非阻塞立即返回就绪的IO事件的数量。


              wakeup():唤醒阻塞在select()方法上的线程。


              keys():返回当前注册在Selector上的所有Channel的SelectionKey。
             

              selectedKeys():返回已经就绪的IO事件的集合。


              在使用Selector时,需要将Channel注册到Selector上,并指定感兴趣的IO事件,如读、写、连接、接收等事件。通过SelectionKey可以获取到注册的Channel以及感兴趣的IO事件。

执行基本流程

从服务端的角度进行分析:

  1. 启动服务器时,创建一个Selector实例,并打开一个ServerSocketChannel通道,并对ServerSocketChannel进行响应配置。

  2. 使用ServerSocketChannel绑定指定的端口号,并设置最大连接数(socketChannel.socket().bind(new InetSocketAddress(port), 1024))。

  3. ServerSocketChannel注册到Selector上,设置关注的事件为SelectionKey.OP_ACCEPT,表示对客户端连接事件感兴趣()。

  4. 创建一个NioServerHandler实例,并将Selector和字符集传递给它。

  5. 开始进入事件循环(while(true)),不断轮询Selector上发生的事件。

  6. 调用Selectorselect()方法,阻塞等待事件发生。

  7. 当某个事件发生时,select()方法返回,并返回一组发生事件的SelectionKey集合。

  8. 遍历处理每个SelectionKey,判断其对应的事件类型。

  9. 如果是OP_ACCEPT事件,表示有客户端连接请求,调用NioServerHandlerchannelActive()方法处理连接请求。

  10. 如果是OP_READ事件,表示有数据可读,调用NioServerHandlerchannelRead()方法处理读取的数据。

  11. channelActive()channelRead()方法中,可以通过ChannelHandlerwriteAndFlush()方法将要发送的数据写入通道。

  12. 继续循环执行步骤6-12,处理下一个事件。

代码实现

客户端

package com.kjz.NettyDemo.Nio.client;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class NioClient {public static void main(String[] args) throws IOException {//创建Selector对象Selector selector = Selector.open();//创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();//配置为非阻塞模式figureBlocking(false);//尝试建立连接boolean isConnect = t(new InetSocketAddress("192.168.1.116", 7397));if (isConnect) {//如果连接成功,则表示该SocketChannel已经可以进行读操作(OP_READ),// 因此将其注册到Selector上,等待IO事件的发生。ister(selector, SelectionKey.OP_READ);} else {//如果连接失败,则需要等待连接建立完成(OP_CONNECT)的IO事件。// 同样将该SocketChannel注册到Selector上,等待IO事件的发生。ister(selector, SelectionKey.OP_CONNECT);}System.out.println("kjz-demo-netty nio client start done.");//创建客户端消息处理器对象,,并传入Selector和字符集参数,然后启动该对象的线程。new NioClientHandler(selector, Charset.forName("GBK")).start();}}

客户端消息处理器

package com.kjz.NettyDemo.Nio.client;import com.kjz.NettyDemo.Nio.ChannelAdapter;
import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
SimpleDateFormat;
import java.util.Date;
//客户端消息处理器
public class NioClientHandler extends ChannelAdapter {public NioClientHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());//向服务端响应连接成功的信息ctx.writeAndFlush("hi! My name is KJZ NioClient to msg for you rn");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!rn");}}

服务端

package com.kjz.NettyDemo.Nio.server;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.Charset;public class NioServer {private Selector selector;private ServerSocketChannel socketChannel;public static void main(String[] args) throws IOException {new NioServer().bind(7397);}public void bind(int port) {try {selector = Selector.open();socketChannel = ServerSocketChannel.open();figureBlocking(false);socketChannel.socket().bind(new InetSocketAddress(port), 1024);ister(selector, SelectionKey.OP_ACCEPT);System.out.println("itstack-demo-netty nio server start done. ");new NioServerHandler(selector, Charset.forName("GBK")).start();} catch (IOException e) {e.printStackTrace();}}}

服务端消息处理器

package com.kjz.NettyDemo.Nio.server;import com.kjz.NettyDemo.Nio.ChannelAdapter;
import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
SimpleDateFormat;
import java.util.Date;
//服务端消息处理器
public class NioServerHandler extends ChannelAdapter {public NioServerHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ NioServer to msg for you rn");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!rn");}}

通用消息处理器

package com.kjz.NettyDemo.Nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;//通用消息处理类
public abstract class ChannelAdapter extends Thread {private Selector selector;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Selector selector, Charset charset) {this.selector = selector;this.charset = charset;}//线程执行逻辑@Overridepublic void run() {while (true) {try {//阻塞等待就绪的IO事件/*** 常见的等待就绪的IO事件包括:* 可读事件(OP_READ):表示SocketChannel中有数据可读取。* 可写事件(OP_WRITE):表示SocketChannel可以写入数据。* 连接建立完成事件(OP_CONNECT):表示SocketChannel的连接已经建立完成。* 新的客户端连接事件(OP_ACCEPT):表示ServerSocketChannel有新的客户端连接请求。*/selector.select(1000);//遍历等待的IO事件Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();handleInput(key);}} catch (Exception ignore) {}}}//处理IO事件private void handleInput(SelectionKey key) throws IOException {if (!key.isValid()) return;// 获取IO事件类型Class<?> superclass = key.channel().getClass().getSuperclass();//根据不同的IO类型进行处理//客户端SocketChannelif (superclass == SocketChannel.class){SocketChannel socketChannel = (SocketChannel) key.channel();if (key.isConnectable()) {//判断是否连接成功if (socketChannel.finishConnect()) {channelHandler = newChannelHandler(socketChannel, charset);channelActive(channelHandler);//事件注册ister(selector,SelectionKey.OP_READ);} else {it(1);}}}// 服务端ServerSocketChannelif (superclass == ServerSocketChannel.class){if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();//通过accept方法获取SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();//配置为非阻塞模式figureBlocking(false);//注册读事件(OP_READ&#ister(selector, SelectionKey.OP_READ);//创建ChannelHandler对象,并调用channelActive方法进行连接通知。channelHandler = new ChannelHandler(socketChannel, charset);channelActive(channelHandler);}}//如果是读事件(isReadableif (key.isReadable()) {//从key中获取SocketChannel对象SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);//从SocketChannel中读取数据到ByteBuffer中int readBytes = ad(readBuffer);if (readBytes > 0) {//调用flip()方法将Buffer从写模式切换为读模式readBuffer.flip();//根据剩余可读数据的长度,创建一个字节数组。byte[] bytes = new aining()];//将缓冲区中的数据读取到字节数组中(bytes);channelRead(channelHandler, new String(bytes, charset));} else if (readBytes < 0) {key.cancel();socketChannel.close();}}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
//通道处理器
public class ChannelHandler {private SocketChannel channel;private Charset charset;public ChannelHandler(SocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//将数据写入通道public void writeAndFlush(Object msg) {try {byte[] bytes = String().getBytes(charset);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);} catch (IOException e) {e.printStackTrace();}}public SocketChannel channel() {return channel;}}

最后都看到这里了,麻烦给个点赞收藏加关注吧

你们的点赞收藏加关注是我持续更新的最大动力~

本文发布于:2024-02-02 20:12:42,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170687596146168.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:网络编程   模型   java   AIO   NIO
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23