poll
select, poll, epoll是IO多路复用当中的重要的三种实现方式,poll和epoll相对于select而言,只能在Linux下使用,但是select是跨平台的。同时poll相对于select而言,没有最大监听数量的限制。但是也是监管一系列的文件描述符,阻塞的去轮询看这些文件描述符是否可读/可写/异常,再去调用io函数读写。
但是select和poll都存在一个很大的“缺点”,当存在大量的连接且其中绝大部分的连接都是活跃的时候,那么poll/select的效率实际上是很低下的,这也是由于二者的轮询机制实现,如果存在大量的活跃连接,轮询的效率会十分的低下,那么此时和传统的多线程/多进程实现模型本质上也没有区别,甚至于效率低于多进程/多线程模型。
和select监听采用fd_set位数组不同,poll监听采用的是pollfd事件结构体数组,也就是先定义一个事件结构体数组,然后在事件结构体数组中,设定好要监听事件的一个文件描述符,及要监听的事件等等信息。也就是每一个监听的事件(文件描述符)都是对应的一个初始化的结构体
// pollfd结构体的原型为:
struct pollfd {
int fd; /* 需要关注/监听的文件描述符 */
short events; /* 注册的事件,由用户提供,例如POLLIN表示监听的是读就绪事件 */
short revents; /* 实际发生的事件,由内核填充,在之后的操作可以直接获取这个文件描述符的就绪状态 */
};
常见的事件注册如下:
使用的最多的就是POLLIN
,POLLOUT
,POLLERR
三种状态,可以类比于select当中传入的readfds
,writefds
,exceptfds
三个监听事件。
poll的函数句柄为int poll(struct pollfd *fds, nfds_t nfds, int timeout);
,其中fds参数为数组的0元素指针,nfds为数组的大小,实际使用的时候可以随便设置(因为poll没有监听大小的限制),timeout为设置超时时间。同时相比于select的三个返回列表(可读,可写,错误),poll返回值为int类型的数值,表示的含义为:
(1)返回值小于0,表示出错
(2)返回值等于0,表示poll函数等待超时
(3)返回值大于0,表示poll由于监听的文件描述符就绪返回,
并且返回结果就是就绪的文件描述符的个数。
poll函数使用前面提到的pollfd结构体中的revents参数,
revents变量在每一次poll函数调用完成后
内核设置会设置revents的值, 这个值其实也就是上面列出来的那些events的宏(例如POLLIN等),
以说明对该描述符发生了什么事件
比如 调用完poll函数后要查看某一个文件描述符是否处于激活状态(比如可读)
是通过调用pollfd参数的revents参数与POLLIN做比较如果相等,
则说明该文件描述符处现在是可读的
使用if语句:if(poll_fd.revents==POLLIN)
同时就绪情况可以细分为读就绪,写就绪,异常就绪,具体而言:
- 读就绪:
- 对于socket而言,如果socket缓冲区的字节数大于等于标记位的时候,此时可以无阻塞的读取文件描述符,且poll返回值大于0
- TCP通信中如果对方关闭连接,那么对这个socket读的时候,会返回0
- 监听的socket有新的连接,也属于读就绪
- socket上有未处理的错误
- 写就绪
- socket缓冲区累计的字节数大于指定的标记就可以无阻塞的写,并且返回值大于0
- socket的写操作被关闭(close或shutdown),会触发SIGPIPE信号,
- socket使用非阻塞connect连接成功或者失败之后
- socket上有未处理的错误
- 异常就绪
- socket上收到额外的带外数据
poll的实现
服务端的实现
import select
import socket
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server.bind(('0.0.0.0', 18888))
server.listen()
poll = select.poll() # 初始化poll
poll.register(server.fileno(), select.POLLIN) # 将serverz注册到poll当中,追踪的是文件描述符
# 关注的事件为是否具有可写事件的发生,采用register的方式修改链表,而不是以select的数组方式进行传递
connections = {}
while True:
for fd, event in poll.poll(3600): # timeout设置为3600s,对注册的时间进行轮询
# 根据event的状态进行读写错误处理分离
if event & select.POLLIN:
# 此时是可读事件
if fd == server.fileno(): # 有新的连接到来
conn, addr = server.accept()
poll.register(conn.fileno(), select.POLLIN)
connections[conn.fileno()] = conn
else:
# 旧的连接具有可读事件
conn = connections[fd] # 通过文件描述符找到对应的socket连接进行数据传输
try:
data = conn.recv(2048) # 获取数据,接收请求
except Exception as ex:
poll.unregister(fd)
conn=connections[fd]
connections.pop(fd)
conn.close()
else:
if data:
poll.register(fd, select.POLLOUT) # 重复注册
# 使用modify修改注册的事件类型
# poll.modify(fd, select.POLLOUT)
elif event & select.POLLOUT: # 具有可写事件
conn=connections[fd]
conn.sendall(bytes('i have received', encoding='utf-8'))
# 变换事件的监听状态,发送完之后变为可读监听
poll.modify(fd, select.POLLIN)
elif event & select.POLLERR or event & select.POLLHUP : # 连接发生错误了
poll.unregister(fd) # 停止监听
conn=connections[fd]
del connections[fd] # 删除记录
conn.close() # 关闭此次连接
客户端基于多线程的实现:
import socket
import random
import time
import threading
class SocketThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
self.socket_name = name
def run(self) -> None:
obj = socket.socket()
obj.connect(('*.*.*.*', 18888))
while True:
inp = str(random.randint(0, 10000))
print('thread number ', self.name, inp)
try:
obj.sendall(bytes(inp, encoding='utf-8'))
ret = str(obj.recv(2048), encoding='utf-8')
except:
break
else:
print(ret)
# time.sleep(0.1)
obj.close()
ths=[SocketThread('{0}'.format(i)) for i in range(50)]
for t in ths:
t.start()
for t in ths:
t.join()