Python协程单线程异步IO实现并发

编程 · 2023-08-17 · 189 人浏览

Python中单线程的异步编程模型称为协程。一般情况下,当程序处于IO操作的时候,线程都会处于阻塞状态。线程是CPU控制的,而协程是程序自身控制的,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级。

协程:当程序遇见了IO操作的时候,可以选择性的切换到其他任务上。在单线程条件下,微观上协程是一个任务一个任务的进行切换,宏观上看是多个任务一起在执行(多任务异步操作)。

要用在异步IO编程中依赖的库必须支持异步IO特性。

asyncio

是Python 3.4之后引入的标准库,内置对异步IO的支持。其编程模型是一个消息循环,从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

import asyncio  # 导入异步编程库
import time

async def func1():
    print("李小龙")
    await asyncio.sleep(5)
    print("李小龙")

async def func2():
    print("李大龙")
    await asyncio.sleep(3)
    print("李大龙")

async def main():
    tasks = [asyncio.create_task(func1()), asyncio.create_task(func2())]
    await asyncio.wait(tasks)

if __name__ == "__main__":
    t1 = time.time()
    asyncio.run(main())
    t2 = time.time()
    print(t2 - t1)

异步爬虫

爬虫中requests库不支持异步,需要用aiohttp。

import asyncio
import aiohttp
import time

async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start)

Python 3.7以上版本的写法:

import asyncio
import aiohttp
import time

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")

async def main():
    tasks = [asyncio.create_task(async_craw(url)) for url in urls]
    asyncio.wait(tasks)

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print("use time seconds:", end - start)

信号量Semaphore

信号量又称为旗语,是一个计数器,用来设置并发度。

import asyncio
import aiohttp
import time

semaphore = asyncio.Semaphore(5)  # 信号量 5

async def async_craw(url):
    async with semaphore:
        print("craw url:", url)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                print(f"craw url: {url}, {len(result)}")

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start)
Python
Theme Jasmine by Kent Liao