您当前的位置: 首页 >  Python

Python编程:协程coroutine

彭世瑜 发布时间:2018-01-28 11:40:59 ,浏览量:2

关于协程

协程 coroutine 微线程,一种用户态的轻量级线程

好处: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控制流,简化编程模型 高并发+高扩展+低成本,一个cup支持上万的协程都不是问题

缺点: 无法利用多核资源,协程的本质是单线程, 进程阻塞blocking操作如io时会阻塞整个程序

单线程下实现并发效果:遇到io就切换

服务器处理模型: 1.一个进程处理一个请求 2.一个线程处理一个请求 3.主进程处理事件队列的请求

事件驱动模型: 多个事件 -> 消息队列 -> 处理线程

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。 它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。 另外两种常见的编程范式是(单线程)同步以及多线程编程。

异步asynchronous

用户空间application 和 内核空间kernel 进程控制块(Processing Control Block)

参考: http://www.cnblogs.com/alex3714/articles/5876749.html

io操作两个阶段:内核缓冲区数据准备,拷贝到应用程序地址空间

阻塞io blocking 一直等待 非阻塞io nonblocking 准备数据立刻返回 多路复用 select,poll,epoll 异步io 没有被阻塞

twisted框架

手动切换协程
from greenlet import greenlet

def foo1():
    print("foo11")
    gr2.switch()
    print("foo12")
    gr2.switch()

def foo2():
    print("foo21")
    gr1.switch()
    print("foo22")

gr1 = greenlet(foo1)  # 创建协程
gr2 = greenlet(foo2)

print("main")

gr1.switch()  # 切换协程

print("done")
"""
main
foo11
foo21
foo12
foo22
done
"""
自动切换协程
import gevent  # 第三方库

def foo1():
    print("foo11")
    gevent.sleep(2)
    print("foo12")

def foo2():
    print("foo21")
    gevent.sleep(2)
    print("foo22")

def foo3():
    print("foo31")
    gevent.sleep(0)
    print("foo32")

gevent.joinall([
    gevent.spawn(foo1),
    gevent.spawn(foo2),
    gevent.spawn(foo3)
])

"""
foo11
foo21
foo31
foo32
foo12
foo22
"""
gevent 多并发socket
import socket
import gevent
from gevent import monkey

monkey.patch_all()  # 猴子补丁

def server(port):
    s = socket.socket()
    s.bind(("0.0.0.0", port))
    s.listen(500)
    print("服务已启动")
    while True:
        conn, addr = s.accept()
        print(addr)
        gevent.spawn(handle_request, conn)

def handle_request(conn): # 处理请求的协程
    try:
        while True:
            data = conn.recv(1024)
            print(conn, data.decode())
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as e:
        print(e)

    finally:
        print("关闭", conn)
        conn.close()

if __name__ == "__main__":
    server(6969)
select 多并发socket
# fd  文件描述符

import select
import socket
import queue

server = socket.socket()

server.setblocking(False)  # 不阻塞

server.bind(("localhost", 6969))
server.listen(5)
print("服务已开启")

inputs = [server, ]  # server本身也是一个文件描述符
outputs = []

message_dict = {}

while True:
    # 如果没有任何 fd 就绪,就会阻塞在这里
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    print("readable", readable)
    print("writeable", writeable)
    print("exceptional", exceptional)

    for r in readable:
        if r is server:  # server就绪,新连接来了
            conn, addr = r.accept()
            print("已连接", conn)
            conn.setblocking(False)
            inputs.append(conn)  # 为了不阻塞程序,收到连接对象后放入列表,如果接收到信息,fd就绪
            message_dict[conn] = queue.Queue()  # 新建消息队列,不立刻返回,逐个处理

        else:  # 如果不是服务器,就是客户端
            data = r.recv(1024)
            if not data:
                print("连接断开")
            else:
                print("收到数据", data.decode("utf-8"))
                message_dict[r].put(data)  # 放入消息队列
                outputs.append(r)  # 不影响其他客户端连接,后续处理发送信息

    for s in outputs:  # 要返回给客户端的连接表
        data = message_dict[s].get()
        s.send(data)
        outputs.remove(s)  # 处理完就删除

    for e in exceptional:  # 删除异常连接
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del message_dict[e]
selectors 多并发socket
import selectors
import socket

def accept(server, mask):  # 接受连接
    conn, addr = server.accept()
    print("conn:", conn, "addr:", addr, "mask:", mask)
    conn.setblocking(False)
    selector.register(conn, selectors.EVENT_READ, action)

def action(conn, mask):  # 接收数据
    data = conn.recv(1024)
    if data:
        print("conn:", conn, "data:", data.decode("utf-8"))
        conn.send(data)
    else:
        print("断开连接")
        conn.close()
        selector.unregister(conn)

server = socket.socket()
server.setblocking(False)
address = ("localhost", 6969)
server.bind(address)
server.listen(1000)

selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ, accept)  # accept回调函数
print("服务启动")

while True:
    events = selector.select()  # 默认阻塞,有活动则返回活动列表
    for key, mask in events:
        callback = key.data  # accept
        callback(key.fileobj, mask)  # fileobj 文件句柄
异步爬虫(并行)
from urllib import request
from gevent import monkey
import time, gevent

monkey.patch_all()  # 把当前程序所有io操作都做上标记

def get_html(url):
    response = request.urlopen(url)
    html = response.read()
    print("receive:", len(html))

urls = [
    "https://www.python.org/",
    "https://www.yahoo.com/",
    "https://github.com/"
    ]

start_time = time.time()

for url in urls:
    html = get_html(url)

end_time = time.time()

print("串行:", end_time - start_time)

async_start_time = time.time()

gevent.joinall([
    gevent.spawn(get_html, urls[0]),
    gevent.spawn(get_html, urls[1]),
    gevent.spawn(get_html, urls[2])
])

async_end_time = time.time()

print("并行:", async_end_time - async_start_time)

"""
receive: 48893
receive: 511654
receive: 52225
串行: 4.339118003845215
receive: 48893
receive: 502566
receive: 52223
并行: 1.4489901065826416
"""
关注
打赏
1688896170
查看更多评论

彭世瑜

暂无认证

  • 2浏览

    0关注

    2727博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.2431s