1. 现实场景
我们试想一下这样的现实场景:
一个餐厅同时有100位客人到店,当然到店后第一件要做的事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。
那么最笨(但是最简单)的方法是(方法A),无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后是第二位客人。。。。然后是第三位客人。很明显,只有脑袋被门夹过的老板,才会这样设置服务流程。因为随后的80位客人,再等待超时后就会离店(还会给差评)。
于是还有一种办法(方法B),老板马上新雇佣99名服务员,同时印制99本新的菜单。每一名服务员手持一本菜单负责一位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100名)。这样每一位客人享受的就是VIP服务咯,当然客人不会走,但是人力成本可是一个大头哦(亏死你)。
另外一种办法(方法C),就是改进点菜的方式,当客人到店后,自己申请一本菜单。想好自己要点的才后,就呼叫服务员。服务员站在自己身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后,都要交给后堂厨师。服务员可以记录号多份菜单后,同时交给厨师就行了。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP服务并且要进行一定的等待,但是这些都是可接受的;对于服务员来说,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。
如果您是老板,您会采用哪种方式呢?
到店情况: 并发量。到店情况不理想时,一个服务员一本菜单,当然是足够了。所以不同的老板在不同的场合下,将会灵活选择服务员和菜单的配置。
菜单: 文件状态描述符。操作系统对于一个进程能够同时持有的文件状态描述符的个数是有限制的,在linux系统中$ulimit -n查看这个限制值,当然也是可以(并且应该)进行内核参数调整的。
服务员: 操作系统内核用于IO操作的线程(内核线程)
厨师: 应用程序线程(当然厨房就是应用程序进程咯)
餐单传递方式: 包括了阻塞式和非阻塞式两种。
方法B: 使用线程进行处理的 阻塞式/非阻塞式 同步IO
2. 典型的多路复用IO实现
目前流程的多路复用IO实现主要包括四种: select
、poll
、epoll
、kqueue
。下表是他们的一些重要特性的比较:
IO模型 相对性能 关键思路 操作系统 JAVA支持情况 支持,Reactor模式(反应器设计模式)。Linux操作系统的 kernels 2.4内核版本之前,默认使用select;而目前windows下对同步IO的支持,都是select模型
Linux下的JAVA NIO框架,Linux kernels 2.6内核版本之前使用poll进行支持。也是使用的Reactor模式
Linux kernels 2.6内核版本及以后使用epoll进行支持;Linux kernels 2.6内核版本之前使用poll进行支持;另外一定注意,由于Linux下没有Windows下的IOCP技术提供真正的 异步IO 支持,所以Linux下使用epoll模拟异步IO
多路复用IO技术最适用的是“高并发”场景,所谓高并发是指1毫秒内至少同时有上千个连接请求准备好。其他情况下多路复用IO技术发挥不出来它的优势。另一方面,使用JAVA NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。
select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)
2.1 select
复制 int select ( int n , fd_set * readfds , fd_set * writefds , fd_set * exceptfds , struct timeval * timeout) ;
select 函数监视的文件描述符分3类,分别是writefds
、readfds
、和exceptfds
。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制 ,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
2.2 poll
复制 int poll (struct pollfd * fds , unsigned int nfds , int timeout) ;
不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。
复制 struct pollfd {
int fd ; /* file descriptor */
short events ; /* requested events to watch */
short revents ; /* returned events witnessed */
};
pollfd结构包含了要监视的event和发生的event ,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket 。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
2.3 epoll
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
2.3.1 epoll操作过程
epoll操作过程需要三个接口,分别如下:
复制 int epoll_create( int size) ; //创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl( int epfd , int op , int fd , struct epoll_event * event) ;
int epoll_wait( int epfd , struct epoll_event * events , int maxevents , int timeout) ;
int epoll_create(int size); 创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。 当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 函数是对指定描述符fd执行op操作。
epfd:是epoll_create()的返回值。
op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
复制 struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将 EPOLL设为边缘触发( Edge Triggered) 模式,这是相对于水平触发( Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); 等待epfd上的io事件,最多返回maxevents个事件。 参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
2.3.2 工作模式
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
假如有这样一个例子:
我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
LT模式: 如果是LT模式,那么在第5步调用epoll_wait(2)之后,仍然能受到通知。
ET模式: 如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。
当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后, 读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:
复制 while( rs ){
buflen = recv ( activeevents[i].data.fd, buf, sizeof ( buf ) , 0 );
if(buflen < 0 ){
// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
// 在这里就当作是该次事件已处理处.
if( errno == EAGAIN ){
break ;
}
else{
return ;
}
}
else if( buflen == 0 ){
// 这里表示对端的socket已正常关闭.
}
if( buflen == sizeof ( buf ) {
rs = 1 ; // 需要再次读取
}
else{
rs = 0 ;
}
}
Linux中的EAGAIN含义:
Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。 从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。
例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。 又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。
2.3.3 代码演示
下面是一段不完整的代码且格式不对,意在表述上面的过程,去掉了一些模板代码。
复制 #define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
listenfd = socket_bind (IPADDRESS , PORT);
struct epoll_event events[EPOLLEVENTS];
//创建一个描述符
epollfd = epoll_create (FDSIZE);
//添加监听描述符事件
add_event (epollfd , listenfd , EPOLLIN);
//循环等待
for ( ; ; ){
//该函数返回已经准备好的描述符事件数目
ret = epoll_wait(epollfd , events , EPOLLEVENTS , - 1 ) ;
//处理接收到的连接
handle_events(epollfd , events , ret , listenfd , buf) ;
}
//事件处理函数
static void handle_events ( int epollfd , struct epoll_event * events , int num , int listenfd , char * buf)
{
int i;
int fd;
//进行遍历;这里只要遍历已经准备好的io事件。num并不是当初epoll_create时的FDSIZE。
for (i = 0 ;i < num;i ++ )
{
fd = events[i] . data . fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) && (events[i] . events & EPOLLIN))
handle_accpet(epollfd , listenfd) ;
else if (events[i] . events & EPOLLIN)
do_read(epollfd , fd , buf) ;
else if (events[i] . events & EPOLLOUT)
do_write(epollfd , fd , buf) ;
}
}
//添加事件
static void add_event ( int epollfd , int fd , int state){
struct epoll_event ev;
ev . events = state;
ev . data . fd = fd;
epoll_ctl(epollfd , EPOLL_CTL_ADD , fd , & ev) ;
}
//处理接收到的连接
static void handle_accpet ( int epollfd , int listenfd){
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd , ( struct sockaddr * ) & cliaddr , & cliaddrlen) ;
if (clifd == - 1 )
perror( "accpet error:" ) ;
else {
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port); //添加一个客户描述符和事件
add_event(epollfd , clifd , EPOLLIN) ;
}
}
//读处理
static void do_read ( int epollfd , int fd , char * buf){
int nread;
nread = read(fd , buf , MAXSIZE) ;
if (nread == - 1 ) {
perror( "read error:" ) ;
close(fd) ; //记住close fd
delete_event(epollfd , fd , EPOLLIN) ; //删除监听
}
else if (nread == 0 ) {
fprintf(stderr , "client close.\n" ) ;
close(fd) ; //记住close fd
delete_event(epollfd , fd , EPOLLIN) ; //删除监听
}
else {
printf( "read message is : %s " , buf) ;
//修改描述符对应的事件,由读改为写
modify_event(epollfd , fd , EPOLLOUT) ;
}
}
//写处理
static void do_write ( int epollfd , int fd , char * buf) {
int nwrite;
nwrite = write(fd , buf , strlen(buf)) ;
if (nwrite == - 1 ){
perror( "write error:" ) ;
close(fd) ; //记住close fd
delete_event(epollfd , fd , EPOLLOUT) ; //删除监听
} else {
modify_event(epollfd , fd , EPOLLIN) ;
}
memset(buf , 0 , MAXSIZE) ;
}
//删除事件
static void delete_event ( int epollfd , int fd , int state) {
struct epoll_event ev;
ev . events = state;
ev . data . fd = fd;
epoll_ctl(epollfd , EPOLL_CTL_DEL , fd , & ev) ;
}
//修改事件
static void modify_event ( int epollfd , int fd , int state){
struct epoll_event ev;
ev . events = state;
ev . data . fd = fd;
epoll_ctl(epollfd , EPOLL_CTL_MOD , fd , & ev) ;
}
//注:另外一端我就省了
2.3.4 epoll总结
在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。 (此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)
epoll的优点主要是一下几个方面:
监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。
IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。
如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。
3. Reactor模型和Proactor模型
4. JAVA对多路复用IO的支持
4.1 重要概念: Channel
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。一个通道会有一个专属的文件状态描述符。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据。
JDK API中的Channel的描述是:
A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.
JAVA NIO 框架中,自有的Channel通道包括:
所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。如上图所示
ServerSocketChannel: 应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。
ScoketChannel: TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP: 端口 到 服务器IP: 端口的通信连接。
DatagramChannel: UDP 数据报文的监听通道。
4.2 重要概念: Buffer
数据缓存区: 在JAVA NIO 框架中,为了保证每个通道的数据读写速度JAVA NIO 框架为每一种需要支持数据读写的通道集成了Buffer的支持。
这句话怎么理解呢? 例如ServerSocketChannel通道它只支持对OP_ACCEPT事件的监听,所以它是不能直接进行网络数据内容的读写的。所以ServerSocketChannel是没有集成Buffer的。
Buffer有两种工作模式: 写模式和读模式。在读模式下,应用程序只能从Buffer中读取数据,不能进行写操作。但是在写模式下,应用程序是可以进行读操作的,这就表示可能会出现脏读的情况。所以一旦您决定要从Buffer中读取数据,一定要将Buffer的状态改为读模式。
如下图:
position: 缓存区目前这在操作的数据块位置
limit: 缓存区最大可以进行操作的位置。缓存区的读写状态正式由这个属性控制的。
capacity: 缓存区的最大容量。这个容量是在缓存区创建时进行指定的。由于高并发时通道数量往往会很庞大,所以每一个缓存区的容量最好不要过大。
在下文JAVA NIO框架的代码实例中,我们将进行Buffer缓存区操作的演示。
4.3 重要概念: Selector
Selector的英文含义是“选择器”,不过根据我们详细介绍的Selector的岗位职责,您可以把它称之为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。
事件订阅和Channel管理
应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。以下代码来自WindowsSelectorImpl实现类中,对已经注册的Channel的管理容器:
复制 // Initial capacity of the poll array
private final int INIT_CAP = 8 ;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private final static int MAX_SELECTABLE_FDS = 1024 ;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl [] channelArray = new SelectionKeyImpl [INIT_CAP];
轮询代理
应用层不再通过阻塞模式或者非阻塞模式直接询问操作系统“事件有没有发生”,而是由Selector代其询问。
实现不同操作系统的支持
之前已经提到过,多路复用IO技术 是需要操作系统进行支持的,其特点就是操作系统可以同时扫描同一个端口上不同网络连接的事件。所以作为上层的JVM,必须要为 不同操作系统的多路复用IO实现 编写不同的代码。同样我使用的测试环境是Windows,它对应的实现类是sun.nio.ch.WindowsSelectorImpl:
4.4 JAVA NIO 框架简要设计分析
通过上文的描述,我们知道了多路复用IO技术是操作系统的内核实现。在不同的操作系统,甚至同一系列操作系统的版本中所实现的多路复用IO技术都是不一样的。那么作为跨平台的JAVA JVM来说如何适应多种多样的多路复用IO技术实现呢? 面向对象的威力就显现出来了: 无论使用哪种实现方式,他们都会有“选择器”、“通道”、“缓存”这几个操作要素,那么可以为不同的多路复用IO技术创建一个统一的抽象组,并且为不同的操作系统进行具体的实现。JAVA NIO中对各种多路复用IO的支持,主要的基础是java.nio.channels.spi.SelectorProvider抽象类,其中的几个主要抽象方法包括:
public abstract DatagramChannel openDatagramChannel()
: 创建和这个操作系统匹配的UDP 通道实现。
public abstract AbstractSelector openSelector()
: 创建和这个操作系统匹配的NIO选择器,就像上文所述,不同的操作系统,不同的版本所默认支持的NIO模型是不一样的。
public abstract ServerSocketChannel openServerSocketChannel()
: 创建和这个NIO模型匹配的服务器端通道。
public abstract SocketChannel openSocketChannel()
: 创建和这个NIO模型匹配的TCP Socket套接字通道(用来反映客户端的TCP连接)
由于JAVA NIO框架的整个设计是很大的,所以我们只能还原一部分我们关心的问题。这里我们以JAVA NIO框架中对于不同多路复用IO技术的选择器 进行实例化创建的方式作为例子,以点窥豹观全局:
很明显,不同的SelectorProvider实现对应了不同的 选择器。由具体的SelectorProvider实现进行创建。另外说明一下,实际上netty底层也是通过这个设计获得具体使用的NIO模型,我们后文讲解Netty时,会讲到这个问题。以下代码是Netty 4.0中NioServerSocketChannel进行实例化时的核心代码片段:
复制 private static ServerSocketChannel newSocket( SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider . openServerSocketChannel ();
} catch ( IOException e) {
throw new ChannelException(
"Failed to open a server socket." , e) ;
}
}
4.5 JAVA实例
下面,我们使用JAVA NIO框架,实现一个支持多路复用IO的服务器端(实际上客户端是否使用多路复用IO技术,对整个系统架构的性能提升相关性不大):
复制 package testNSocket ;
import java . net . InetSocketAddress ;
import java . net . ServerSocket ;
import java . net . URLDecoder ;
import java . net . URLEncoder ;
import java . nio . ByteBuffer ;
import java . nio . channels . SelectableChannel ;
import java . nio . channels . SelectionKey ;
import java . nio . channels . Selector ;
import java . nio . channels . ServerSocketChannel ;
import java . nio . channels . SocketChannel ;
import java . util . Iterator ;
import org . apache . commons . logging . Log ;
import org . apache . commons . logging . LogFactory ;
import org . apache . log4j . BasicConfigurator ;
public class SocketServer1 {
static {
BasicConfigurator . configure ();
}
/**
* 日志
*/
private static final Log LOGGER = LogFactory . getLog ( SocketServer1 . class );
public static void main ( String [] args) throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel . open ();
serverChannel . configureBlocking ( false );
ServerSocket serverSocket = serverChannel . socket ();
serverSocket . setReuseAddress ( true );
serverSocket . bind ( new InetSocketAddress( 83 ) );
Selector selector = Selector . open ();
//注意、服务器通道只能注册SelectionKey.OP_ACCEPT事件
serverChannel . register (selector , SelectionKey . OP_ACCEPT );
try {
while ( true ) {
//如果条件成立,说明本次询问selector,并没有获取到任何准备好的、感兴趣的事件
//java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
if ( selector . select ( 100 ) == 0 ) {
//================================================
// 这里视业务情况,可以做一些然并卵的事情
//================================================
continue ;
}
//这里就是本次询问操作系统,所获取到的“所关心的事件”的事件类型(每一个通道都是独立的)
Iterator < SelectionKey > selecionKeys = selector . selectedKeys () . iterator ();
while ( selecionKeys . hasNext ()) {
SelectionKey readyKey = selecionKeys . next ();
//这个已经处理的readyKey一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0时,这个readyKey又会被处理一次
selecionKeys . remove ();
SelectableChannel selectableChannel = readyKey . channel ();
if ( readyKey . isValid () && readyKey . isAcceptable ()) {
SocketServer1 . LOGGER . info ( "======channel通道已经准备好=======" );
/*
* 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
* 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
* 否则无法监听到这个socket channel到达的数据
* */
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
SocketChannel socketChannel = serverSocketChannel . accept ();
registerSocketChannel(socketChannel , selector) ;
} else if ( readyKey . isValid () && readyKey . isConnectable ()) {
SocketServer1 . LOGGER . info ( "======socket channel 建立连接=======" );
} else if ( readyKey . isValid () && readyKey . isReadable ()) {
SocketServer1 . LOGGER . info ( "======socket channel 数据准备完成,可以去读==读取=======" );
readSocketChannel(readyKey) ;
}
}
}
} catch ( Exception e) {
SocketServer1 . LOGGER . error ( e . getMessage () , e);
} finally {
serverSocket . close ();
}
}
/**
* 在server socket channel接收到/准备好 一个新的 TCP连接后。
* 就会向程序返回一个新的socketChannel。<br>
* 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
* 所以程序还没法通过selector通知这个socket channel的事件。
* 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
* socket channel感兴趣的事件
* @param socketChannel 新的socket channel
* @param selector selector“选择器/代理器”
* @throws Exception
*/
private static void registerSocketChannel ( SocketChannel socketChannel , Selector selector) throws Exception {
socketChannel . configureBlocking ( false );
//socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
socketChannel . register (selector , SelectionKey . OP_READ , ByteBuffer . allocate ( 2048 ));
}
/**
* 这个方法用于读取从客户端传来的信息。
* 并且观察从客户端过来的socket channel在经过多次传输后,是否完成传输。
* 如果传输完成,则返回一个true的标记。
* @param socketChannel
* @throws Exception
*/
private static void readSocketChannel ( SelectionKey readyKey) throws Exception {
SocketChannel clientSocketChannel = (SocketChannel) readyKey . channel ();
//获取客户端使用的端口
InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel . getRemoteAddress ();
Integer resoucePort = sourceSocketAddress . getPort ();
//拿到这个socket channel使用的缓存区,准备读取数据
//在后文,将详细讲解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
ByteBuffer contextBytes = (ByteBuffer) readyKey . attachment ();
//将通道的数据写入到缓存区,注意是写入到缓存区。
//由于之前设置了ByteBuffer的大小为2048 byte,所以可以存在写入不完的情况
//没关系,我们后面来调整代码。这里我们暂时理解为一次接受可以完成
int realLen = - 1 ;
try {
realLen = clientSocketChannel . read (contextBytes);
} catch ( Exception e) {
//这里抛出了异常,一般就是客户端因为某种原因终止了。所以关闭channel就行了
SocketServer1 . LOGGER . error ( e . getMessage ());
clientSocketChannel . close ();
return ;
}
//如果缓存区中没有任何数据(但实际上这个不太可能,否则就不会触发OP_READ事件了)
if (realLen == - 1 ) {
SocketServer1 . LOGGER . warn ( "====缓存区没有数据? ====" );
return ;
}
//将缓存区从写状态切换为读状态(实际上这个方法是读写模式互切换)。
//这是java nio框架中的这个socket channel的写请求将全部等待。
contextBytes . flip ();
//注意中文乱码的问题,我个人喜好是使用URLDecoder/URLEncoder,进行解编码。
//当然java nio框架本身也提供编解码方式,看个人咯
byte [] messageBytes = contextBytes . array ();
String messageEncode = new String(messageBytes , "UTF-8" ) ;
String message = URLDecoder . decode (messageEncode , "UTF-8" );
//如果收到了“over”关键字,才会清空buffer,并回发数据;
//否则不清空缓存,还要还原buffer的“写状态”
if ( message . indexOf ( "over" ) != - 1 ) {
//清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
contextBytes . clear ();
SocketServer1 . LOGGER . info ( "端口:" + resoucePort + "客户端发来的信息======message : " + message);
//======================================================
// 当然接受完成后,可以在这里正式处理业务了
//======================================================
//回发数据,并关闭channel
ByteBuffer sendBuffer = ByteBuffer . wrap ( URLEncoder . encode ( "回发处理结果" , "UTF-8" ) . getBytes ());
clientSocketChannel . write (sendBuffer);
clientSocketChannel . close ();
} else {
SocketServer1 . LOGGER . info ( "端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + message);
//这是,limit和capacity的值一致,position的位置是realLen的位置
contextBytes . position (realLen);
contextBytes . limit ( contextBytes . capacity ());
}
}
}
代码中的注释是比较清楚的,但是还是要对几个关键点进行一下讲解:
serverChannel.register(Selector sel, int ops, Object att)
: 实际上register(Selector sel, int ops, Object att)
方法是ServerSocketChannel
类的父类AbstractSelectableChannel
提供的一个方法,表示只要继承了AbstractSelectableChannel
类的子类都可以注册到选择器中。通过观察整个AbstractSelectableChannel
继承关系,下图中的这些类可以被注册到选择器中:
SelectionKey.OP_ACCEPT
: 不同的Channel对象可以注册的“我关心的事件”是不一样的。例如ServerSocketChannel除了能够被允许关注OP_ACCEPT事件外,不允许再关心其他事件了(否则运行时会抛出异常)。以下梳理了常使用的AbstractSelectableChannel子类可以注册的事件列表:
通道类 通道作用 可关注的事件 SelectionKey.OP_READ、SelectionKey.OP_WRITE
SelectionKey.OP_READ、SelectionKey.OP_WRITE、SelectionKey.OP_CONNECT
实际上通过每一个AbstractSelectableChannel子类所实现的public final int validOps()方法,就可以查看这个通道“可以关心的IO事件”。
selector.selectedKeys().iterator()
: 当选择器Selector收到操作系统的IO操作事件后,它的selectedKeys将在下一次轮询操作中,收到这些事件的关键描述字(不同的channel,就算关键字一样,也会存储成两个对象)。但是每一个“事件关键字”被处理后都必须移除,否则下一次轮询时,这个事件会被重复处理。
Returns this selector’s selected-key set. Keys may be removed from, but not directly added to, the selected-key set. Any attempt to add an object to the key set will cause an UnsupportedOperationException to be thrown. The selected-key set is not thread-safe.
4.6 JAVA实例改进
上面的代码中,我们为了讲解selector的使用,在缓存使用上就进行了简化。实际的应用中,为了节约内存资源,我们一般不会为一个通道分配那么多的缓存空间。下面的代码我们主要对其中的缓存操作进行了优化:
复制 package testNSocket ;
import java . net . InetSocketAddress ;
import java . net . ServerSocket ;
import java . net . URLDecoder ;
import java . net . URLEncoder ;
import java . nio . ByteBuffer ;
import java . nio . channels . SelectableChannel ;
import java . nio . channels . SelectionKey ;
import java . nio . channels . Selector ;
import java . nio . channels . ServerSocketChannel ;
import java . nio . channels . SocketChannel ;
import java . util . Iterator ;
import java . util . concurrent . ConcurrentHashMap ;
import java . util . concurrent . ConcurrentMap ;
import org . apache . commons . logging . Log ;
import org . apache . commons . logging . LogFactory ;
import org . apache . log4j . BasicConfigurator ;
public class SocketServer2 {
static {
BasicConfigurator . configure ();
}
/**
* 日志
*/
private static final Log LOGGER = LogFactory . getLog ( SocketServer2 . class );
/**
* 改进的java nio server的代码中,由于buffer的大小设置的比较小。
* 我们不再把一个client通过socket channel多次传给服务器的信息保存在beff中了(因为根本存不下)<br>
* 我们使用socketchanel的hashcode作为key(当然您也可以自己确定一个id),信息的stringbuffer作为value,存储到服务器端的一个内存区域MESSAGEHASHCONTEXT。
*
* 如果您不清楚ConcurrentHashMap的作用和工作原理,请自行百度/Google
*/
private static final ConcurrentMap<Integer, StringBuffer> MESSAGEHASHCONTEXT = new ConcurrentHashMap<Integer , StringBuffer>();
public static void main ( String [] args) throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel . open ();
serverChannel . configureBlocking ( false );
ServerSocket serverSocket = serverChannel . socket ();
serverSocket . setReuseAddress ( true );
serverSocket . bind ( new InetSocketAddress( 83 ) );
Selector selector = Selector . open ();
//注意、服务器通道只能注册SelectionKey.OP_ACCEPT事件
serverChannel . register (selector , SelectionKey . OP_ACCEPT );
try {
while ( true ) {
//如果条件成立,说明本次询问selector,并没有获取到任何准备好的、感兴趣的事件
//java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
if ( selector . select ( 100 ) == 0 ) {
//================================================
// 这里视业务情况,可以做一些然并卵的事情
//================================================
continue ;
}
//这里就是本次询问操作系统,所获取到的“所关心的事件”的事件类型(每一个通道都是独立的)
Iterator < SelectionKey > selecionKeys = selector . selectedKeys () . iterator ();
while ( selecionKeys . hasNext ()) {
SelectionKey readyKey = selecionKeys . next ();
//这个已经处理的readyKey一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0时,这个readyKey又会被处理一次
selecionKeys . remove ();
SelectableChannel selectableChannel = readyKey . channel ();
if ( readyKey . isValid () && readyKey . isAcceptable ()) {
SocketServer2 . LOGGER . info ( "======channel通道已经准备好=======" );
/*
* 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
* 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
* 否则无法监听到这个socket channel到达的数据
* */
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
SocketChannel socketChannel = serverSocketChannel . accept ();
registerSocketChannel(socketChannel , selector) ;
} else if ( readyKey . isValid () && readyKey . isConnectable ()) {
SocketServer2 . LOGGER . info ( "======socket channel 建立连接=======" );
} else if ( readyKey . isValid () && readyKey . isReadable ()) {
SocketServer2 . LOGGER . info ( "======socket channel 数据准备完成,可以去读==读取=======" );
readSocketChannel(readyKey) ;
}
}
}
} catch ( Exception e) {
SocketServer2 . LOGGER . error ( e . getMessage () , e);
} finally {
serverSocket . close ();
}
}
/**
* 在server socket channel接收到/准备好 一个新的 TCP连接后。
* 就会向程序返回一个新的socketChannel。<br>
* 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
* 所以程序还没法通过selector通知这个socket channel的事件。
* 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
* socket channel感兴趣的事件
* @param socketChannel 新的socket channel
* @param selector selector“选择器/代理器”
* @throws Exception
*/
private static void registerSocketChannel ( SocketChannel socketChannel , Selector selector) throws Exception {
socketChannel . configureBlocking ( false );
//socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
//最后一个参数视为 为这个socketchanne分配的缓存区
socketChannel . register (selector , SelectionKey . OP_READ , ByteBuffer . allocate ( 50 ));
}
/**
* 这个方法用于读取从客户端传来的信息。
* 并且观察从客户端过来的socket channel在经过多次传输后,是否完成传输。
* 如果传输完成,则返回一个true的标记。
* @param socketChannel
* @throws Exception
*/
private static void readSocketChannel ( SelectionKey readyKey) throws Exception {
SocketChannel clientSocketChannel = (SocketChannel) readyKey . channel ();
//获取客户端使用的端口
InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel . getRemoteAddress ();
Integer resoucePort = sourceSocketAddress . getPort ();
//拿到这个socket channel使用的缓存区,准备读取数据
//在后文,将详细讲解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
ByteBuffer contextBytes = (ByteBuffer) readyKey . attachment ();
//将通道的数据写入到缓存区,注意是写入到缓存区。
//这次,为了演示buff的使用方式,我们故意缩小了buff的容量大小到50byte,
//以便演示channel对buff的多次读写操作
int realLen = 0 ;
StringBuffer message = new StringBuffer() ;
//这句话的意思是,将目前通道中的数据写入到缓存区
//最大可写入的数据量就是buff的容量
while ((realLen = clientSocketChannel . read (contextBytes)) != 0 ) {
//一定要把buffer切换成“读”模式,否则由于limit = capacity
//在read没有写满的情况下,就会导致多读
contextBytes . flip ();
int position = contextBytes . position ();
int capacity = contextBytes . capacity ();
byte [] messageBytes = new byte [capacity];
contextBytes . get (messageBytes , position , realLen);
//这种方式也是可以读取数据的,而且不用关心position的位置。
//因为是目前contextBytes所有的数据全部转出为一个byte数组。
//使用这种方式时,一定要自己控制好读取的最终位置(realLen很重要)
//byte[] messageBytes = contextBytes.array();
//注意中文乱码的问题,我个人喜好是使用URLDecoder/URLEncoder,进行解编码。
//当然java nio框架本身也提供编解码方式,看个人咯
String messageEncode = new String(messageBytes , 0 , realLen , "UTF-8" ) ;
message . append (messageEncode);
//再切换成“写”模式,直接情况缓存的方式,最快捷
contextBytes . clear ();
}
//如果发现本次接收的信息中有over关键字,说明信息接收完了
if ( URLDecoder . decode ( message . toString () , "UTF-8" ) . indexOf ( "over" ) != - 1 ) {
//则从messageHashContext中,取出之前已经收到的信息,组合成完整的信息
Integer channelUUID = clientSocketChannel . hashCode ();
SocketServer2 . LOGGER . info ( "端口:" + resoucePort + "客户端发来的信息======message : " + message);
StringBuffer completeMessage;
//清空MESSAGEHASHCONTEXT中的历史记录
StringBuffer historyMessage = MESSAGEHASHCONTEXT . remove (channelUUID);
if (historyMessage == null ) {
completeMessage = message;
} else {
completeMessage = historyMessage . append (message);
}
SocketServer2.LOGGER.info("端口:" + resoucePort + "客户端发来的完整信息======completeMessage : " + URLDecoder.decode(completeMessage.toString(), "UTF-8"));
//======================================================
// 当然接受完成后,可以在这里正式处理业务了
//======================================================
//回发数据,并关闭channel
ByteBuffer sendBuffer = ByteBuffer . wrap ( URLEncoder . encode ( "回发处理结果" , "UTF-8" ) . getBytes ());
clientSocketChannel . write (sendBuffer);
clientSocketChannel . close ();
} else {
//如果没有发现有“over”关键字,说明还没有接受完,则将本次接受到的信息存入messageHashContext
SocketServer2.LOGGER.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + URLDecoder.decode(message.toString(), "UTF-8"));
//每一个channel对象都是独立的,所以可以使用对象的hash值,作为唯一标示
Integer channelUUID = clientSocketChannel . hashCode ();
//然后获取这个channel下以前已经达到的message信息
StringBuffer historyMessage = MESSAGEHASHCONTEXT . get (channelUUID);
if (historyMessage == null ) {
historyMessage = new StringBuffer() ;
MESSAGEHASHCONTEXT . put (channelUUID , historyMessage . append (message));
}
}
}
}
以上代码应该没有过多需要讲解的了。当然,您还是可以加入线程池技术,进行具体的业务处理。注意,一定是线程池,因为这样可以保证线程规模的可控性。
5. 多路复用IO的优缺点
不用再使用多线程来进行IO处理了(包括操作系统内核IO管理模块和应用程序进程而言)。当然实际业务的处理中,应用程序进程还是可以引入线程池技术的
同一个端口可以处理多种协议,例如,使用ServerSocketChannel测测的服务器端口监听,既可以处理TCP协议又可以处理UDP协议。
操作系统级别的优化: 多路复用IO技术可以是操作系统级别在一个端口上能够同时接受多个客户端的IO事件。同时具有之前我们讲到的阻塞式同步IO和非阻塞式同步IO的所有特点。Selector的一部分作用更相当于“轮询代理器”。
都是同步IO: 目前我们介绍的 阻塞式IO、非阻塞式IO甚至包括多路复用IO,这些都是基于操作系统级别对“同步IO”的实现。我们一直在说“同步IO”,一直都没有详细说,什么叫做“同步IO”。实际上一句话就可以说清楚: 只有上层(包括上层的某种代理机制)系统询问我是否有某个事件发生了,否则我不会主动告诉上层系统事件发生了:
6. 参考文章
文章主要来源于: 银文杰,笔名“说好不能打脸”,博客地址 。他的书《高性能服务系统构建与实战》。
https://blog.csdn.net/yinwenjie/article/details/48522403