题外话:服务器单机理论最大能连接多少个客户端?
答案是:对于IPV4而言,粗略估计有2^48
个连接。计算机标识一个唯一的socket连接依赖的是唯一四元组-(源ip,源端口,本机ip,本机端口),其中本机的ip和socket初始化的端口是不能变的,因此源ip和源端口都是可变的。那么本机的一个进程,理论上可以连接2^32*2^16=2^48
个连接,32表示IPV4的地址数,2^16
表示本机端口号的数目(65535)。
当然,理论值只是理论值。限制连接数的不只是ip和端口两个因素,还和内核的各种限制也有关系,每一个socket在内核中都是一个文件句柄的存在,并且Linux下单个进程的可用句柄数量为1024个,可以通过修改系统配置来增大这个数目。
多进程模型
原始的IO多路复用就是多进程模型,为每一个socket连接分配一个进程来处理。主进程负责监听accept(),每次有新的连接就开辟一个子进程来处理。由此可见,主进程和子进程承担了不同的角色。
这种朴素的复用方式在并发量小的时候还可以应对,当并发量大了之后,这种方法不足以支撑起高并发的场景。进程的上下文切换不仅包含了虚拟内存、栈、全局变量等用户空间的资源,还包括了内核堆栈、寄存器等内核空间的资源。频繁的切换会极大地浪费资源。
多线程模型
多线程模型相较于多进程而言,节省了进程切换所需要的资源。
如果每来一个连接就创建一个线程,线程运行完后,还得操作系统还得销毁线程,虽说线程切换的上写文开销不大,但是如果频繁创建和销毁线程,系统开销也是不小的。
那么,我们可以使用线程池的方式来避免线程的频繁创建和销毁,所谓的线程池,就是提前创建若干个线程,这样当由新连接建立时,将这个已连接的 Socket 放入到一个队列里,然后线程池里的线程负责从队列中取出已连接 Socket 进程处理。
需要注意的是,保存已连接的socket队列是全局的,每个线程都可以操作,为了避免多线程竞争,线程在操作这个队列前要加锁。
select
select 实现多路复用的方式是,将已连接的 Socket 都放到一个集合文件描述符集(fd_set),然后调用 select 函数将文件描述符集合拷贝(第一次拷贝)到内核里,让内核来检查是否有网络事件产生,内核检查的方式很简单粗暴,就是通过遍历文件(第一次遍历)描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写,接着再把整个文件描述符集合拷贝(第二次拷贝)回用户态里,然后用户态还需要再通过遍历(第二次遍历)的方法找到可读或可写的Socket,然后再对其处理。
通过上面的分析可以看出,对于select而言,需要两次拷贝以及对文件描述符集的遍历。同时,select有最大监听事件数量的限制,最大为1024。
以下为select实现读写分离的服务器代码和客户端代码。
# -*- coding: utf-8 -*-
# @Time : 2021/10/24 20:04
# @Author : Chuqiao Yi
# @File : select_server.py
# @Software: PyCharm
"""
select application on server
"""
import select
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
# server.setblocking(flag=False) # None block
server_addr = ('0.0.0.0', 18888) # ip-port
print('TCP listening on ', server_addr)
server.bind(server_addr)
server.listen()
inputs = [server, ] # ready to read
outputs = [] # ready to write
message_queues = {} # socket: Queue
while inputs:
print('Waiting for next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs, 10)
# select.select(rlist, wlist, xlist[, timeout]),
# select句柄中,rlist,wlist,xlist都需要是可携带对象,其中的每一个可以是整数(文件描述符),
# 或是python的文件对象,或者是socket对象。亦或是自己定义的类,只要有合适的fileno()方法返回一个真正的文件描述符
print('正在监听的socket对象为', len(inputs))
print(readable)
for serv_or_conn in readable:
if serv_or_conn == server:
# 新的连接到来
conn, addr = serv_or_conn.accept()
print('new user ', addr)
inputs.append(conn)
message_queues[conn] = []
else:
# 老用户进行连接
try:
data_bytes = serv_or_conn.recv(2048)
except Exception as ex:
inputs.remove(serv_or_conn)
else:
data_str = str(data_bytes, encoding='utf-8')
message_queues[serv_or_conn].append(data_str)
outputs.append(serv_or_conn)
for conn in writable:
recv_str = message_queues[conn][0]
del message_queues[conn][0]
conn.sendall(bytes(recv_str + ' I have received', encoding='utf-8'))
outputs.remove(conn)
for ec in exceptional:
inputs.remove(ec) # 移除所有的出错连接
# client code
import socket
import random
import time
import threading
class SocketThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
self.socket_name = name
def run(self) -> None:
obj = socket.socket()
obj.connect(('ip address', 18888))
while True:
inp = str(random.randint(0, 10000))
print('thread number ', self.name, inp)
try:
obj.sendall(bytes(inp, encoding='utf-8'))
ret = str(obj.recv(2048), encoding='utf-8')
except:
break
else:
print(ret)
time.sleep(1)
obj.close()
ths=[SocketThread(i) for i in range(5)]
for t in ths:
t.start()
for t in ths:
t.join()