Python Concurrent Programming

Concurrency vs. Parallelism

First, we need to tell concurrency and parallelism apart. The two concepts are easily confused.

Concurrency

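In Python, concurrency does not mean that multiple operations (threads, tasks) run at the same instant. At any given moment only one operation is executing. The threads or tasks take turns, switching back and forth until they have all finished.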

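With threads, the operating system has full information about every thread, so it decides on its own when to switch between them (preemptive switching). The upside is that the code is simple to write, because you never switch threads yourself. The downside is that a switch can happen in the middle of executing a statement, which easily leads to race conditions.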

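With coroutines, by contrast, the main program can switch away from a task only after that task signals that it can be switched out (in asyncio, by reaching an await). This avoids that kind of race condition.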
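The sketch below is not part of the original article; it makes the thread race-condition risk concrete. Several threads update one shared counter. `counter += 1` is a read-modify-write sequence, so a preemptive switch between the read and the write could lose an update. Here a `threading.Lock` serializes each update. The names `counter`, `lock` and `add_many` are illustrative only.

```python
import threading

counter = 0                 # shared state touched by every thread
lock = threading.Lock()     # guards every update to `counter`

def add_many(n):
    """Increment the shared counter n times, holding the lock for each update."""
    global counter
    for _ in range(n):
        # Without the lock, the OS could switch threads between reading
        # `counter` and writing it back, and an increment would be lost.
        with lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```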

Parallelism

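Parallelism, on the other hand, means that things happen at literally the same moment, which in Python corresponds to multi-processing. If your computer has a 6-core processor, a program can run 6 processes at the same time. The principle is shown in the figure below: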

[Figure: how parallel execution works]

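  • Concurrency is mostly used in I/O-heavy scenarios, such as downloading files, where the time spent on I/O can far exceed the CPU time.
  • Parallelism is mostly used in CPU-heavy scenarios, such as parallel computation.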

Concurrent Programming in Python: Futures

Single-Threaded vs. Multi-Threaded Performance

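Suppose the task is to download several web pages and print how many bytes were read from each. A single-threaded implementation looks like this: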

```python
import requests
import time

def download_one(url):
    # Blocking HTTP GET: returns only after the whole response has arrived
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    # Download the sites strictly one after another
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()
```

Output:

```
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 93347 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 2.464231112999869 seconds
```

The code works as follows:

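  • Iterate over the list of sites;
  • Download the current site;
  • Wait for that download to finish before doing the same for the next site, until the list is exhausted.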

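This single-threaded program is simple and clear, but inefficient: it spends most of its time waiting on I/O. Each download can start only after the previous one has finished, so a program like this is not usable in real production.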
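You can see the cost of waiting without any network access. The following sketch simulates it with `time.sleep()`. `fake_download` and the `example.com` URLs are hypothetical stand-ins for `requests.get()` and the Wikipedia pages. Because the calls run one after another, the total time is roughly the number of calls multiplied by the per-call delay.

```python
import time

def fake_download(url, delay=0.1):
    """Hypothetical stand-in for requests.get(): sleeps to simulate network wait."""
    time.sleep(delay)
    return len(url)

def download_all_sequential(urls):
    # Each call starts only after the previous one has returned,
    # so the waits add up: about len(urls) * delay in total.
    return [fake_download(u) for u in urls]

urls = [f'https://example.com/page{i}' for i in range(5)]
start = time.perf_counter()
sizes = download_all_sequential(urls)
elapsed = time.perf_counter() - start
print(f'{len(sizes)} pages in {elapsed:.2f} s')  # roughly 0.5 s
```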

The multi-threaded version is as follows:


```python
import concurrent.futures
import requests
import time

def download_one(url):
    # Blocking HTTP GET; several of these run at once in the worker threads
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    # A pool of 5 worker threads shares the downloads between them
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()
```

Output:

```
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 91533 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.19936635800002023 seconds
```

The time drops dramatically, from about 2.46 seconds to about 0.2 seconds.

The key difference in the code is:

```python
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(download_one, sites)
```

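This code creates a thread pool with 5 worker threads and uses executor.map() to call download_one() on every element of sites, spreading the calls across the pool. The requests.get() call inside download_one() is thread-safe here. Each call creates and closes its own session, so the threads share no state and no race condition arises.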
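The section is named after Futures. Under the hood, `executor.map()` is a convenience layer over `Future` objects. The sketch below is an illustration, not the article's code. It uses the lower-level API directly: `executor.submit()` schedules one call and immediately returns a `Future`, and `concurrent.futures.as_completed()` yields each future as soon as its call finishes. `fake_download` and the URLs are hypothetical placeholders that simulate network waits with `time.sleep()`.

```python
import concurrent.futures
import time

def fake_download(url):
    """Hypothetical stand-in for a network call: waits, then returns a size."""
    time.sleep(0.1)
    return len(url)

urls = [f'https://example.com/page{i}' for i in range(5)]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # submit() returns a Future right away; the call runs in a worker thread
    future_to_url = {executor.submit(fake_download, u): u for u in urls}
    results = {}
    # as_completed() yields futures in completion order, not submission order
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        results[url] = future.result()  # re-raises any exception from the call

print(results)
```

One design difference: `executor.map()` hands back results in input order, while `as_completed()` lets you react to whichever download finishes first.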

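More threads are not always better, though. With too many threads, the program keeps switching back and forth between them, and creating, destroying and maintaining threads also has a cost.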
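`max_workers` caps how many threads the pool uses. Calls beyond that limit queue up until a worker is free. This minimal sketch, an illustration rather than code from the article, records how many calls are running at once. `work`, `active` and `peak` are illustrative names, and `time.sleep()` simulates I/O.

```python
import concurrent.futures
import threading
import time

active = 0               # calls currently running inside work()
peak = 0                 # highest value `active` ever reached
lock = threading.Lock()  # protects the two counters above

def work(i):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)     # simulated I/O wait
    with lock:
        active -= 1
    return i * i

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() returns results in input order, whatever order the calls finish in
    squares = list(executor.map(work, range(10)))

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(peak)     # at most 3, the pool size
```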

Alternatively, you can use parallelism by replacing the thread pool with a process pool:

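```python
# Instead of ThreadPoolExecutor(max_workers=5), use a process pool;
# ProcessPoolExecutor's max_workers defaults to the number of CPUs on the machine.
with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(download_one, sites)
```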

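However, multi-processing is meant for CPU-heavy workloads. In an I/O-heavy workload most of the time is spent waiting, so multiple processes bring no speed-up over multithreading.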

Only One Thread Runs at a Time

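At any given moment, only one thread in a Python (CPython) process executes Python code, so Python's multithreaded concurrency works by switching between threads. The reason is that the CPython interpreter is not thread-safe internally. To prevent the race conditions this would cause, CPython uses a Global Interpreter Lock (GIL), which lets only one thread run at a time. When a thread blocks on an I/O operation, it releases the GIL so that another thread can continue running.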
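This is why threads still help with I/O despite the GIL. The sketch below is illustrative and not from the article. It uses `time.sleep()` as a stand-in for a blocking I/O call, and CPython releases the GIL while a thread sleeps. Five threads that each "wait" 0.2 s therefore finish in about 0.2 s in total rather than 1 s.

```python
import threading
import time

def blocking_io():
    # time.sleep() stands in for a blocking I/O call; CPython releases the
    # GIL while the thread is blocked, so the other threads can run meanwhile.
    time.sleep(0.2)

threads = [threading.Thread(target=blocking_io) for _ in range(5)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f'{elapsed:.2f} s')  # close to 0.2 s rather than 1.0 s
```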

Concurrent Programming in Python: Asyncio

Multithreading has the following drawbacks:

  • A running thread can be interrupted at almost any point, which can lead to race conditions;
  • Switching between threads has a cost.

What Is Asyncio?

Sync (synchronous) vs. Async (asynchronous):

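  • Sync means operations run one after another: the next operation must wait until the current one has finished;
  • Async means different operations can interleave: if one of them blocks, the program switches to another operation that can run and continues with that one.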
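A minimal illustration of the async case follows, with `asyncio.sleep()` standing in for a blocking I/O operation. The coroutine name `op` is illustrative. Run synchronously, the log would read "A start, A end, B start, B end". Here, each `await` hands control back while the operation waits, so both operations start before either ends.

```python
import asyncio

log = []

async def op(name):
    log.append(f'{name} start')
    # await hands control back to the event loop while this operation "waits"
    await asyncio.sleep(0.1)
    log.append(f'{name} end')

async def main():
    # run both operations concurrently on the same thread
    await asyncio.gather(op('A'), op('B'))

asyncio.run(main())
print(log)  # both starts come before both ends
```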

How Asyncio Works

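Asyncio is single-threaded: there is only the main thread, yet it can run many different tasks. A task is a special kind of future object. The tasks are coordinated by an object called the event loop.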
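You can check the "task is a special future" relationship directly. In asyncio, `asyncio.Task` is a subclass of `asyncio.Future`, so a task can be awaited and queried like any future. The coroutine `compute` in this short sketch is illustrative.

```python
import asyncio

async def compute():
    await asyncio.sleep(0)   # yield to the event loop once
    return 42

async def main():
    task = asyncio.create_task(compute())    # schedule the coroutine as a task
    print(isinstance(task, asyncio.Task))     # True
    print(isinstance(task, asyncio.Future))   # True: Task subclasses Future
    result = await task                       # wait for it like any future
    print(task.done(), result)                # True 42
    return result

value = asyncio.run(main())
```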

For simplicity, assume a task can be in only two states:

  • Ready: the task is idle but can run at any moment;
  • Waiting: the task has already run but is waiting for an external operation to complete.

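The event loop keeps two lists, one for each state. It picks a task from the ready list and runs it until the task hands control back to the event loop. In this simplified model, the choice might depend on how long a task has been waiting, the resources it uses, and so on. asyncio's own event loop simply runs ready tasks in first-in, first-out order.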

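When a task hands control back, the event loop puts it on the waiting list if it is now waiting for something; a task that has finished simply leaves the loop. The event loop then walks the waiting list and checks each task:

  • if the operation the task was waiting for has completed, the task moves to the ready list;
  • if not, the task stays on the waiting list.

Tasks that were already on the ready list stay where they are, since they have not run yet.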

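Once every task is on the appropriate list, a new iteration begins. The event loop again picks a task from the ready list and runs it, and this repeats until all tasks have finished.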
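The ready/waiting-list model above can be made concrete with a toy event loop built from plain generators. This is a teaching sketch under simplifying assumptions, not asyncio's actual implementation. Each hypothetical `toy_task` "waits" by yielding a number of seconds, and `toy_event_loop` moves tasks between a ready list and a waiting list exactly as described.

```python
import time
from collections import deque

def toy_task(name, wait, log):
    """A task written as a generator: `yield wait` hands control back to the loop."""
    log.append(f'{name} start')
    yield wait                     # "I am now waiting `wait` seconds for some I/O"
    log.append(f'{name} done')

def toy_event_loop(tasks):
    ready = deque(tasks)           # ready list: tasks that can run right now
    waiting = []                   # waiting list: (wake_up_time, task) pairs
    while ready or waiting:
        # Walk the waiting list: tasks whose wait is over move to the ready list.
        now = time.monotonic()
        still_waiting = []
        for wake_up, task in waiting:
            if wake_up <= now:
                ready.append(task)
            else:
                still_waiting.append((wake_up, task))
        waiting = still_waiting
        if not ready:
            time.sleep(0.001)      # nothing runnable yet; avoid a busy spin
            continue
        task = ready.popleft()     # pick a ready task (here simply FIFO)
        try:
            delay = next(task)     # run it until it hands control back
        except StopIteration:
            continue               # the task has finished; drop it
        waiting.append((time.monotonic() + delay, task))

log = []
toy_event_loop([toy_task('A', 0.05, log), toy_task('B', 0.01, log)])
print(log)  # ['A start', 'B start', 'B done', 'A done']
```

B finishes first because its wait is shorter, even though A started first. The whole run takes about as long as the longest wait, not the sum of both.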

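Asyncio tasks are never interrupted from outside while they run; a switch can happen only where a task reaches an await. The race conditions caused by a thread being preempted in the middle of a statement therefore cannot occur, and thread safety is not a concern.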

Usage

Taking the website download again as the example:


```python
import asyncio
import aiohttp
import time

async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # content_length is the Content-Length response header: the size of the
            # body as sent (typically gzip-compressed), or None if the header is absent.
            # The body is never read, so this is not the same figure as len(resp.content).
            print('Read {} from {}'.format(resp.content_length, url))

async def download_all(sites):
    # Wrap each download in a task so they all run concurrently on the event loop
    tasks = [asyncio.create_task(download_one(site)) for site in sites]
    await asyncio.gather(*tasks)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    asyncio.run(download_all(sites))
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()
```

Output:

```
Read 63153 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 31461 from https://en.wikipedia.org/wiki/Portal:Society
Read 23965 from https://en.wikipedia.org/wiki/Portal:Biography
Read 36312 from https://en.wikipedia.org/wiki/Portal:History
Read 25203 from https://en.wikipedia.org/wiki/Portal:Arts
Read 15160 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 28749 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 29587 from https://en.wikipedia.org/wiki/Portal:Technology
Read 79318 from https://en.wikipedia.org/wiki/PHP
Read 30298 from https://en.wikipedia.org/wiki/Portal:Geography
Read 73914 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 62218 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 22318 from https://en.wikipedia.org/wiki/Portal:Science
Read 36800 from https://en.wikipedia.org/wiki/Node.js
Read 67028 from https://en.wikipedia.org/wiki/Computer_science
Download 15 sites in 0.062144195078872144 seconds
```

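The keywords async and await mark code as non-blocking, which ties in with the event-loop model. async def declares a coroutine. await suspends the current task until the awaited operation completes and hands control back to the event loop. While a task is waiting, it sits on the waiting list, and the event loop goes on running tasks from the ready list.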

In main(), asyncio.run(coro) creates a new event loop, runs the coroutine coro until it completes, and then closes the loop. asyncio.run() was added in Python 3.7. In older versions the equivalent is:

```python
# Pre-3.7 equivalent of asyncio.run(coro)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(coro)
finally:
    loop.close()
```

The core of download_all() is:

```python
tasks = [asyncio.create_task(download_one(site)) for site in sites]
await asyncio.gather(*tasks)
```

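asyncio.create_task(coro) wraps the coroutine coro in a task, schedules it to run on the event loop, and returns the Task object. asyncio.gather(*tasks) runs all of the given awaitables concurrently and waits until every one has finished. It returns their results as a list in the same order as the arguments.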
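The ordering guarantee of `asyncio.gather()` is easy to check. In this illustrative sketch, `fetch` is a hypothetical coroutine that uses `asyncio.sleep()` in place of a network request. The tasks finish in the order fast, medium, slow, but the results come back in argument order.

```python
import asyncio

async def fetch(name, delay):
    # asyncio.sleep() stands in for a slow network request
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.create_task(fetch(n, d))
             for n, d in [('slow', 0.2), ('fast', 0.05), ('medium', 0.1)]]
    # gather() waits for all tasks; results follow the argument order,
    # not the order in which the tasks finished
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)  # ['slow', 'fast', 'medium']
```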

Drawbacks of Asyncio

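  • It may require libraries built to support it (for example aiohttp instead of requests, whose calls block);
  • Task scheduling needs care: a task gives up control only at an await, so a task that runs long CPU work or calls a blocking function stalls every other task.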
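The scheduling pitfall shows up clearly in the sketch below, which is illustrative and not from the article. `blocking_call` stands for any function from a library without async support. Called directly inside a coroutine, it freezes the event loop, so three calls run one by one. Offloading it with `asyncio.to_thread()` (available in Python 3.9+) runs each call in a worker thread, so they overlap. The helper names `bad`, `good` and `timed` are hypothetical.

```python
import asyncio
import time

def blocking_call():
    time.sleep(0.2)   # a blocking function, e.g. from a library without async support

async def bad():
    blocking_call()   # blocks the whole event loop: no other task can run meanwhile

async def good():
    await asyncio.to_thread(blocking_call)  # run in a worker thread (Python 3.9+)

async def timed(coro_fn, n=3):
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn() for _ in range(n)))
    return time.perf_counter() - start

bad_time = asyncio.run(timed(bad))    # about 3 x 0.2 s: the calls run one by one
good_time = asyncio.run(timed(good))  # about 0.2 s: the calls overlap
print(f'{bad_time:.2f} s vs {good_time:.2f} s')
```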

Multithreading or Asyncio?

Follow these guidelines:

```python
def choose_model(io_bound, io_slow, cpu_bound):
    if io_bound:
        # I/O-heavy
        if io_slow:
            # each I/O operation takes a long time
            print('Use Asyncio')
        else:
            print('Use multi-threading')
    elif cpu_bound:
        # CPU-heavy
        print('Use multi-processing')
```

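Unlike multithreading, Asyncio is single-threaded, but its internal event loop lets it run many different tasks concurrently. It also gives the programmer more control than multithreading does, because tasks switch only where the code says await.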

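Asyncio tasks are not interrupted while they run, which makes Asyncio the better fit for I/O-heavy workloads where each I/O operation is slow. Switching between tasks costs far less than switching between threads, and Asyncio can run far more tasks at once than a program can run threads.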
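To get a feel for how cheap tasks are, this illustrative sketch starts 10,000 concurrent tasks. Each waits 0.1 s via `asyncio.sleep()`, a stand-in for a slow I/O call. `fake_request` is a hypothetical name. All of them finish in a small fraction of the 1,000 s a sequential loop would take. The exact time depends on the machine, so no figure is claimed here.

```python
import asyncio
import time

async def fake_request(i):
    await asyncio.sleep(0.1)   # simulated slow I/O
    return i

async def main(n):
    # One task per request; tasks are lightweight objects on a single thread
    return await asyncio.gather(*(fake_request(i) for i in range(n)))

start = time.perf_counter()
results = asyncio.run(main(10_000))
elapsed = time.perf_counter() - start
print(len(results), f'{elapsed:.2f} s')  # far below 10_000 x 0.1 s
```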
