本文共 3350 字,大约阅读时间需要 11 分钟。
I/O多路复用(I/O multiplexing),单个进程,就可以实现并发处理多个网络连接。目前有三种实现方式:select(全平台)、poll(linux)、epoll(linux)。前两者的基本原理是,轮询其监听的所有socket,一旦有数据到达,就将目标soket取出,就通知用户处理。epoll的实现略有不同,它为每个socket绑定一个回调函数,一旦socket有数据到达,就会触发回调函数执行。推荐使用epoll,其效率和并发能力是最好的,
下面我们看一下用select模块实现并发处理tcp连接:
服务端:import socket, selectser = socket.socket()ser.bind(('0.0.0.0',8080))ser.listen(5)# ser.setblocking(False) 设置套接字I/O阻塞和非阻塞,最终效果是一样的,因为整个程序被select阻塞着# 一旦select监听到套接字ser有数据到达,就会取出ser,执行其方法accept(),执行时数据已经到达了,已经是非阻塞的了。inputs = [ser,] # 把服务端套接字放入监听列表print('waiting for connection...')while True: i, o, e = select.select(inputs,[],[]) # select可以监听多种对象([inputs],[outputs],[errors],timeout),对应括号内的参数。 # 轮询inputs内的监听对象,一旦该有数据到达,取出该对象放到i里面,开始下面的处理 for obj in i: if obj == ser: # 服务端socket conn, addr = ser.accept() print('got a connection: ',addr) inputs.append(conn) # 将客户端连接socket加入监听列表 else: # conn try: msg = obj.recv(1024) if not msg: print('客户端正常断开') inputs.remove(obj) continue print(msg.decode('utf-8')) back_msg = input('anser client%s >>'%i.index(obj)) obj.send(back_msg.encode('utf-8')) except Exception: print('客户端异常断开') inputs.remove(obj)
客户端:
import socketcli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)cli.connect_ex(('127.0.0.1',8080))while True: try: msg = input('>> ') if not msg: continue if msg == 'quit':break cli.send(msg.encode('utf-8')) msg = cli.recv(1024) print(msg.decode('utf-8')) except Exception: # 处理服务端异常断开 break
从服务端代码可以看出,如果多个客户端连接,同时发送消息到服务端,那么服务端一次显示一个消息并阻塞等待回复消息,然后for循环显示下一个套接字的消息,再阻塞回复。
这个模块更为高级,可以根据操作系统自动选择模式,比如epoll(优先选择),select,并且提供了统一的操作接口。
服务端:import selectorsimport socketsel_obj = selectors.DefaultSelector() # 创建selectors监听对象def accept(tcpser): # 定义套接字的回调函数 conn, addr = tcpser.accept() print('got a connection from',addr) conn.setblocking(False) sel_obj.register(conn, selectors.EVENT_READ, communicate) # 注册conn到监听对象def communicate(conn): # 定义套接字的回调函数 try: msg = conn.recv(1024) if not msg: print('-- lost connection with client --') sel_obj.unregister(conn) # 连接套接字断开,那么从监听对象中注销 conn.close() # 关闭套接字,释放OS资源 else: print('message: ',msg.decode('utf-8')) back_msg = input('anser: ').strip() conn.send(back_msg.encode('utf-8')) except Exception:# 客户端异常断开 print('-- client error --') sel_obj.unregister(conn) conn.close()tcpser = socket.socket()tcpser.bind(('0.0.0.0',8080))tcpser.listen(5)tcpser.setblocking(False)sel_obj.register(tcpser, selectors.EVENT_READ, accept)# 注册到监听对象:套接字对象和其执行函数绑定,括号内第一个参数是套接字对象,第三个是执行函数,中间固定的。# 相当于上面select模块的栗子,将服务端套接字加入监听列表print('waiting for connection...')while True: events = sel_obj.select() # 不论是select对象,还是epoll对象,调用监听,接口都是obj.select(), # 监听返回值是有数据到达的套接字对象,放在events里 print(events) for key, mask in events: # 请自动忽略mask... # 通过key.data可以取到套接字对象绑定的方法比如tcpser绑定accept, conn绑定communicate(recv+send) # 通过key.fileobj可以取到套接字对象 callback_func = key.data # 拿到回调函数 callback_func(key.fileobj) # 执行函数
客户端同select的栗子。
转载地址:http://lhdib.baihongyu.com/