<返回更多

Python语法-多进程、多线程、协程(异步IO)

2021-11-30    Python林路
加入收藏

相关概念

Python语法-多进程、多线程、协程(异步IO)

 

并发和并行

因为

计算机CPU(CPU核心)在同一时刻只能运行一个程序。

同步和异步

阻塞和非阻塞

CPU密集型和I/O密集型

CPU密集型(CPU-bound):

CPU密集型又叫做计算密集型,指I/O在很短时间就能完成,CPU需要大量的计算和处理,特点是CPU占用高。

例如:压缩解压缩、加密解密、正则表达式搜索。

IO密集型(I/O-bound):

IO密集型是指系统运行时大部分时间时CPU在等待IO操作(硬盘/内存)的读写操作,特点是CPU占用较低。

例如:文件读写、网络爬虫、数据库读写。

多进程、多线程、多协程的对比

类型

优点

缺点

适用

多进程
Process(multiprocessing)

可以利用CPU多核并行运算

占用资源最多
可启动数目比线程少

CPU密集型计算

多线程
Thread(threading)

相比进程更轻量占用资源少

相比进程,多线程只能并发执行,不能利用多CPU(GIL)
相比协程启动数目有限制,占用内存资源有线程切换开销

IO密集型计算、同时运行的任务要求不多

多协程
Coroutine(asyncio)

内存开销最少,启动协程数量最多

支持库的限制
代码实现复杂

IO密集型计算、同时运行的较多任务

GIL全称Global Interpreter Lock

下图为GIL的运行

Python语法-多进程、多线程、协程(异步IO)

 

Python/ target=_blank class=infotextkey>Python的多线程是伪多线程,同时只能有一个线程运行。

一个进程能够启动N个线程,数量受系统限制。

一个线程能够启动N个协程,数量不受限制。

怎么选择

对于其他语言来说,多线程是能同时利用多CPU(核)的,所以是适用CPU密集型计算的,但是Python由于GIL的限制,只能使用IO密集型计算。

所以对于Python来说:

对于IO密集型来说能用多协程就用多协程,没有库支持才用多线程。

对于CPU密集型就只能用多进程了。

Python语法-多进程、多线程、协程(异步IO)

 

协程(异步IO)

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio


async def test():
    await asyncio.sleep(3)
    return "123"


async def main():
    result = await test()
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

 

单次请求查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index


def getfuture(future):
    print(f"结果为:{future.result()}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    future.add_done_callback(getfuture)
    loop.run_until_complete(future)
    loop.close()

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    loop.run_until_complete(future)
    print(f"结果为:{future.result()}")
    loop.close()

 

多次请求查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import asyncio


async def myfun(index):
    print(f'线程({threading.currentThread().name}) 传入参数({index})')
    await asyncio.sleep(1)
    return index

loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
    future = asyncio.ensure_future(myfun(item))
    future_list.Append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
    print(f"结果为:{future.result()}")
loop.close()

 

asyncio.wait和asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
tasks = [myfun(1), myfun(2)]
loop.run_until_complete(asyncio.wait(tasks))
#loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio.gather 和asyncio.wait区别:

在内部wait()使用一个set保存它创建的Task实例。因为set是无序的所以这也就是我们的任务不是顺序执行的原因。wait的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务。wait第二个参数为一个超时值
达到这个超时时间后,未完成的任务状态变为pending,当程序退出时还有任务没有完成此时就会看到如下的错误提示。

gather的使用
gather的作用和wait类似不同的是。

  1. gather任务无法取消。
  2. 返回值是一个结果列表
  3. 可以按照传入参数的 顺序,顺序输出。
Python语法-多进程、多线程、协程(异步IO)

 

协程和多线程结合

同时多个请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    r = requests.get(url)
    print(r.text)
    return r.text


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
    tasks = []
    start_time = time.time()
    for url in urls:
        task = loop.run_in_executor(executor, myquery, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(f"用时{time.time() - start_time}")

结果

1
2
3
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}
用时0.11207175254821777

 

单个请求添加回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    print(f"请求所在线程:{threading.current_thread().name}")
    r = requests.get(url)
    return r.text


def myfuture(future):
    print(f"回调所在线程:{threading.current_thread().name}")
    print(future.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    url = "https://www.psvmc.cn/userlist.json"
    tasks = []
    start_time = time.time()
    task = loop.run_in_executor(executor, myquery, url)
    future = asyncio.ensure_future(task)
    future.add_done_callback(myfuture)
    loop.run_until_complete(future)
    print(f"用时{time.time() - start_time}")

 

多线程与多进程

多线程

引用模块

1
2
3
4
5
6
7
8
from threading import Thread

def func(num):
    return num

t = Thread(target=func, args=(100,))
t.start()
t.join()

数据通信

1
2
3
4
5
import queue

q = queue.Queue()
q.put(1)
item = q.get()

1
2
3
4
5
from threading import Lock

lock = Lock()
with lock:
    pass

 

池化技术

1
2
3
4
5
6
7
8
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ThreadPoolExecutor
import threading
import time


# 定义一个准备作为线程任务的函数
def action(num):
    print(threading.current_thread().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 创建一个包含3条线程的线程池
    with ThreadPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"单个任务返回:{future1.result()}")

        print('------------------------------')
        # 使用线程执行map计算
        results = pool.map(action, (1, 3, 5))
        for r in results:
            print(f"多个任务返回:{r}")

结果

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor-0_0
单个任务返回:103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
多个任务返回:101
多个任务返回:103
多个任务返回:105

 

多进程

引用模块

1
2
3
4
5
6
7
8
from multiprocessing import Process

def func(num):
    return num

t = Process(target=func, args=(100,))
t.start()
t.join()

 

数据通信

1
2
3
4
import multiprocessing
q = multiprocessing.Queue()
q.put(1)
item = q.get()

 

1
2
3
4
5
from multiprocessing import Lock

lock = Lock()
with lock:
    pass

 

池化技术

1
2
3
4
5
6
7
8
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

 

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


# 定义一个准备作为进程任务的函数
def action(num):
    print(multiprocessing.current_process().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 创建一个包含3条进程的进程池
    with ProcessPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"单个任务返回:{future1.result()}")

        print('------------------------------')
        # 使用线程执行map计算
        results = pool.map(action, [1, 3, 5])
        for r in results:
            print(f"多个任务返回:{r}")

结果

1
2
3
4
5
6
7
8
9
SpawnProcess-1
单个任务返回:103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
多个任务返回:101
多个任务返回:103
多个任务返回:105

 

多进程/多线程/协程对比

异步 IO(asyncio)、多进程(multiprocessing)、多线程(multithreading)

IO 密集型应用CPU等待IO时间远大于CPU 自身运行时间,太浪费;

常见的 IO 密集型业务包括:浏览器交互、磁盘请求、网络爬虫、数据库请求等

Python 世界对于 IO 密集型场景的并发提升有 3 种方法:多进程、多线程、多协程;

理论上讲asyncio是性能最高的,原因如下:

  1. 进程、线程会有CPU上下文切换
  2. 进程、线程需要内核态和用户态的交互,性能开销大;而协程对内核透明的,只在用户态运行
  3. 进程、线程并不可以无限创建,最佳实践一般是 CPU*2;而协程并发能力强,并发上限理论上取决于操作系统IO多路复用(linux下是 epoll)可注册的文件描述符的极限

那asyncio的实际表现是否如理论上那么强,到底强多少呢?我构建了如下测试场景:

请求10此,并sleep 1s模拟业务查询

最后的asyncio+uvloop和官方asyncio 最大不同是用 Cython+libuv 重新实现了asyncio 的事件循环(event loop)部分,

官方测试性能是 node.js的 2 倍,持平 golang。

Python语法-多进程、多线程、协程(异步IO)

 

顺序串行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    for h in range(10):
        query(h)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ProcessPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ThreadPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(num)) for num in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

asyncio+uvloop

注意

windows上不支持uvloop。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import uvloop
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(host)) for host in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    uvloop.install()
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

运行时间对比

方式

运行时间

串行

10.0750972s

多进程

1.1638731999999998s

多线程

1.0146456s

asyncio

1.0110082s

asyncio+uvloop

1.01s

 


可以看出: 无论多进程、多线程还是asyncio都能大幅提升IO 密集型场景下的并发,但asyncio+uvloop性能最高!

Python语法-多进程、多线程、协程(异步IO)

 

原文链接:
https://www.psvmc.cn/article/2021-11-24-python-async.html

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>