淘先锋技术网

首页 1 2 3 4 5 6 7

1 BIO和NIO简介

BIO、NIO、AIO简介

1)Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销

2)Java NIO同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理

3)Java AIO(NIO.2)异步非阻塞AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

BIO、NIO、AIO应用场景

  • BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

NIO相关的系统调用

在发展过程中,其实在BIO和NIO之间还有一个非阻塞的IO,一个线程去处理多个socket连接,在用户态循环的查询每个socket的可用状态。

在后来发展出NIO,同步非阻塞。一个线程通过多路复用去管理多个socket连接,多路复用有几种机制:select、poll、epoll。

首先,每个客户端连接在Linux系统下,表现出现都是在内核态下有一个文件描述符fd,文件描述符有一个编号,不同的编号表示不同的连接。

1、select系统调用

select系统调用有一个重要参数,为fd文件描述符集合,即你要监听哪些文件描述符(哪些连接),这个文件描述符集合rset用一个bitmap位图表示,位图大小为1024,即最多只能监听1024个客户端连接。

当发起系统调用时,会将rset拷贝到内核态,然后内核态监听有没有数据可以处理,监听的所有文件描述符都没有数据的话会一直阻塞,直到有数据时,将有数据的fd索引置一,然后返回给用户态

  • Select缺点:
    • 位图大小默认1024,有上限。
    • 每次都需要创建一个文件描述符位图并拷贝到内核态。
    • 系统调用后返回的还是一个位图,用户态需要去遍历才能知道哪些fd有数据。

2、Poll系统调用

Poll工作原理与Select相同,不同的只是将位图数组改成链表,没有了最大连接数1024的限制,依然有fd集合的拷贝和O(n)的遍历过程。

3、Epoll_wait系统调用

为解决fd集合拷贝的问题,epoll采用用户态和内核态共享epoll_fds集合。epoll是内核空间用一个 红黑树维护所有的fd,只把就绪的fd用链表复制到用户空间。当调用epoll_wait系统调用时,内核态会去检查有哪些fd有事件,检查完毕后会将共享的epoll_fds集合重排序,将有事件的fd放在前面,并返回有事件的fd个数。

客户端收到返回的个数,就不需要全部遍历,而是直接处理fd。

epoll的两种触发模式

  • 水平触发(LT):epoll_wait检测到描述符事件时,通知进程,进程可以不立即处理该事件,下一次epoll_wait会再次通知。
  • 边缘触发(ET):通知之后必须处理,下一次epoll_wait将不会再通知。

io多路复用

Java提供的Selector是使用select还是poll还是epoll?

在windows下,找到Selector的默认创建类如下:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }
}


public class WindowsSelectorProvider extends SelectorProviderImpl {
    public WindowsSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

再找到WindowsSelectorImpl,里面出现大量的poll,应该就是采用poll实现。

Linux下为:

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

在Linux中,可能是poll、也可能是epoll。







2 Java的NIO编程

2.1 NIO编程简介

Java NIO三大核心部分

1)Buffer(缓冲区):每个客户端连接都会对应一个Buffer,读写数据通过缓冲区读写。

2)Channel(通道):每个channel用于连接Buffer和Selector,通道可以进行双向读写。

3)Selector(选择器):一个选择器可以对应多个通道,用于监听多个通道的事件。Selector可以监听所有的channel是否有数据需要读取,当某个channel有数据时,就去处理,所有channel都没有数据时,线程可以去执行其他任务。

image-20200717161455112

Buffer介绍

public static void main(String[] args) {
    //创建一个Int型的buffer,大小为5。相当于创建了一个大小为5的int数组
    IntBuffer buffer = IntBuffer.allocate(5);

    //往buffer中添加数据
    for (int i = 0; i < buffer.capacity(); i++) {
        buffer.put(i*2);
    }

    //buffer读写切换,之前为写数据,调用flip后切换为读
    buffer.flip();

    //读取数据
    while (buffer.hasRemaining()){
        System.out.println(buffer.get());
    }
}

Buffer的源码解析:

public abstract class IntBuffer extends Buffer
    implements Comparable<IntBuffer>
{
    final int[] hb;                  // 真正存放数据的数组
    final int offset;
    boolean isReadOnly;
}
public abstract class Buffer {
    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;     //标记位
    private int position = 0;  //当前读写位置
    private int limit;         //缓冲区的读取终点,小于等于最大容量
    private int capacity;      //最大容量
}

Buffer使用最多的是ByteBuffer,因为在网路传输中一般使用字节传输。

Channel介绍

NIO的Channel通道类似于流,但是通道可以同时读写,而流只能读或写。

Channel只是一个接口,里面有各种实现类。

image-20200717164739923

通过FileChannel和ByteBuffer将数据写入文件。

public static void main(String[] args) throws IOException {
    //创建一个文件输出流
    FileOutputStream fileOutputStream = new FileOutputStream("a.txt");
    //通过文件输出流得到一个FileChannel
    FileChannel fileChannel = fileOutputStream.getChannel();

    //创建一个buffer并写入数据
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.put("hello".getBytes());
    buffer.flip();  //反转,让指针指向数组开头

    //将Buffer中数据写入FileChannel中
    fileChannel.write(buffer);
    fileOutputStream.close();
}

fileOutputStream.getChannel()代码:可见内部通过文件描述符,文件路径等创建了一个FileChannelImpl

public FileChannel getChannel() {
    synchronized (this) {
        if (channel == null) {
            channel = FileChannelImpl.open(fd, path, false, true, append, this);
        }
        return channel;
    }
}

MappedByteBuffer:可以让文件直接在内存(堆外内存)中修改,不需要操作系统拷贝一次。

public static void main(String[] args) throws IOException {
    //读取文件,能进行读写
    RandomAccessFile randomAccessFile = new RandomAccessFile("a.txt", "rw");
    FileChannel channel = randomAccessFile.getChannel();

    /**
     * 参数一:使用的模式(读写模式)
     * 参数二:可以直接修改的起始位置
     * 参数三:能修改的大小,最多能修改多少字节
     */
    //获取MappedByteBuffer对象
    MappedByteBuffer mBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    //修改buffer中内容,修改后直接修改了文件内容
    mBuffer.put(0, (byte)'H');
    randomAccessFile.close();
}

Selector

Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。

image-20200717183132441

步骤:

  • 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
  • SocketChannel注册到Selector上,一个Selector可以注册多个SocketChannel
  • 注册后会返回一个SelectionKey,会和该Selector关联(加入到集合中)
  • Selector进行监听select方法,返回有事件发生的通道的个数。
  • 进一步得到各个有事件发生的SelectionKey
  • 通过SelectionKey反向获取SocketChannel,然后获取Channel的事件类型,并处理

Selector通过管理SelectionKey的集合从而去监听各个Channel

public void Server() throws IOException {
    //创建ServerSocketChannel -> ServerSocket
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    //得到一个Selector对象
    Selector selector = Selector.open();
    //绑定一个端口6666
    serverSocketChannel.socket().bind(new InetSocketAddress(6666));
    //设置非阻塞
    serverSocketChannel.configureBlocking(false);

    //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接
    SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println();
    //循环等待客户端连接
    while (true) {
        //等待1秒,如果没有事件发生,就返回
        if (selector.select(1000) == 0) {
            System.out.println("服务器等待了1秒,无连接");
            continue;
        }
        //如果返回的 > 0,表示已经获取到关注的事件
        // 就获取到相关的 selectionKey 集合,反向获取通道
        Set<SelectionKey> selectionKeys = selector.selectedKeys();

        //遍历 Set<SelectionKey>,使用迭代器遍历
        Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
        while (keyIterator.hasNext()) {
            //获取到SelectionKey
            SelectionKey key = keyIterator.next();
            //根据 key 对应的通道发生的事件,做相应的处理
            if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接
                //该客户端生成一个 SocketChannel
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());
                //将SocketChannel设置为非阻塞
                socketChannel.configureBlocking(false);
                //将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个Buffer
                socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
            }
            if (key.isReadable()) {
                //通过key,反向获取到对应的Channel
                SocketChannel channel = (SocketChannel) key.channel();
                //获取到该channel关联的Buffer
                ByteBuffer buffer = (ByteBuffer) key.attachment();
                channel.read(buffer);
                System.out.println("from 客户端:" + new String(buffer.array()));
            }
            //手动从集合中移除当前的 selectionKey,防止重复操作
            keyIterator.remove();
        }

    }
public void client() throws IOException {
    //得到一个网络通道
    SocketChannel socketChannel = SocketChannel.open();
    //设置非阻塞
    socketChannel.configureBlocking(false);
    //提供服务器端的IP和端口
    InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
    //连接服务器
    if (!socketChannel.connect(socketAddress)){ //如果不成功
        while (!socketChannel.finishConnect()){
            System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
        }
    }

    //如果连接成功,就发送数据
    String str = "hello!!";
    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
    //发送数据,实际上就是将buffer数据写入到channel
    socketChannel.write(byteBuffer);

}









参考链接:https://juejin.im/post/5bea1d2e51882523d3163657#heading-12

大部分笔记来自于尚硅谷Netty教学视频