关于协程
协程 coroutine 微线程,一种用户态的轻量级线程
好处: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控制流,简化编程模型 高并发+高扩展+低成本,一个cup支持上万的协程都不是问题
缺点: 无法利用多核资源,协程的本质是单线程, 进程阻塞blocking操作如io时会阻塞整个程序
单线程下实现并发效果:遇到io就切换
服务器处理模型: 1.一个进程处理一个请求 2.一个线程处理一个请求 3.主进程处理事件队列的请求
事件驱动模型: 多个事件 -> 消息队列 -> 处理线程
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。 它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。 另外两种常见的编程范式是(单线程)同步以及多线程编程。
异步asynchronous
用户空间application 和 内核空间kernel 进程控制块(Processing Control Block)
参考: http://www.cnblogs.com/alex3714/articles/5876749.html
io操作两个阶段:内核缓冲区数据准备,拷贝到应用程序地址空间
阻塞io blocking 一直等待 非阻塞io nonblocking 准备数据立刻返回 多路复用 select,poll,epoll 异步io 没有被阻塞
twisted框架
手动切换协程from greenlet import greenlet
def foo1():
print("foo11")
gr2.switch()
print("foo12")
gr2.switch()
def foo2():
print("foo21")
gr1.switch()
print("foo22")
gr1 = greenlet(foo1) # 创建协程
gr2 = greenlet(foo2)
print("main")
gr1.switch() # 切换协程
print("done")
"""
main
foo11
foo21
foo12
foo22
done
"""
自动切换协程
import gevent # 第三方库
def foo1():
print("foo11")
gevent.sleep(2)
print("foo12")
def foo2():
print("foo21")
gevent.sleep(2)
print("foo22")
def foo3():
print("foo31")
gevent.sleep(0)
print("foo32")
gevent.joinall([
gevent.spawn(foo1),
gevent.spawn(foo2),
gevent.spawn(foo3)
])
"""
foo11
foo21
foo31
foo32
foo12
foo22
"""
gevent 多并发socket
import socket
import gevent
from gevent import monkey
monkey.patch_all() # 猴子补丁
def server(port):
s = socket.socket()
s.bind(("0.0.0.0", port))
s.listen(500)
print("服务已启动")
while True:
conn, addr = s.accept()
print(addr)
gevent.spawn(handle_request, conn)
def handle_request(conn): # 处理请求的协程
try:
while True:
data = conn.recv(1024)
print(conn, data.decode())
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as e:
print(e)
finally:
print("关闭", conn)
conn.close()
if __name__ == "__main__":
server(6969)
select 多并发socket
# fd 文件描述符
import select
import socket
import queue
server = socket.socket()
server.setblocking(False) # 不阻塞
server.bind(("localhost", 6969))
server.listen(5)
print("服务已开启")
inputs = [server, ] # server本身也是一个文件描述符
outputs = []
message_dict = {}
while True:
# 如果没有任何 fd 就绪,就会阻塞在这里
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
print("readable", readable)
print("writeable", writeable)
print("exceptional", exceptional)
for r in readable:
if r is server: # server就绪,新连接来了
conn, addr = r.accept()
print("已连接", conn)
conn.setblocking(False)
inputs.append(conn) # 为了不阻塞程序,收到连接对象后放入列表,如果接收到信息,fd就绪
message_dict[conn] = queue.Queue() # 新建消息队列,不立刻返回,逐个处理
else: # 如果不是服务器,就是客户端
data = r.recv(1024)
if not data:
print("连接断开")
else:
print("收到数据", data.decode("utf-8"))
message_dict[r].put(data) # 放入消息队列
outputs.append(r) # 不影响其他客户端连接,后续处理发送信息
for s in outputs: # 要返回给客户端的连接表
data = message_dict[s].get()
s.send(data)
outputs.remove(s) # 处理完就删除
for e in exceptional: # 删除异常连接
if e in outputs:
outputs.remove(e)
inputs.remove(e)
del message_dict[e]
selectors 多并发socket
import selectors
import socket
def accept(server, mask): # 接受连接
conn, addr = server.accept()
print("conn:", conn, "addr:", addr, "mask:", mask)
conn.setblocking(False)
selector.register(conn, selectors.EVENT_READ, action)
def action(conn, mask): # 接收数据
data = conn.recv(1024)
if data:
print("conn:", conn, "data:", data.decode("utf-8"))
conn.send(data)
else:
print("断开连接")
conn.close()
selector.unregister(conn)
server = socket.socket()
server.setblocking(False)
address = ("localhost", 6969)
server.bind(address)
server.listen(1000)
selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ, accept) # accept回调函数
print("服务启动")
while True:
events = selector.select() # 默认阻塞,有活动则返回活动列表
for key, mask in events:
callback = key.data # accept
callback(key.fileobj, mask) # fileobj 文件句柄
异步爬虫(并行)
from urllib import request
from gevent import monkey
import time, gevent
monkey.patch_all() # 把当前程序所有io操作都做上标记
def get_html(url):
response = request.urlopen(url)
html = response.read()
print("receive:", len(html))
urls = [
"https://www.python.org/",
"https://www.yahoo.com/",
"https://github.com/"
]
start_time = time.time()
for url in urls:
html = get_html(url)
end_time = time.time()
print("串行:", end_time - start_time)
async_start_time = time.time()
gevent.joinall([
gevent.spawn(get_html, urls[0]),
gevent.spawn(get_html, urls[1]),
gevent.spawn(get_html, urls[2])
])
async_end_time = time.time()
print("并行:", async_end_time - async_start_time)
"""
receive: 48893
receive: 511654
receive: 52225
串行: 4.339118003845215
receive: 48893
receive: 502566
receive: 52223
并行: 1.4489901065826416
"""