Please enable Javascript to view the contents

Python 中的进程、线程、协程

 ·  ☕ 6 分钟

1. 进程

进程是正在运行的程序实例,是内核分配资源的最基本的单元。进程拥有自己独立的堆和栈,独立的地址空间,资源句柄。进程由 OS 调度,调度开销较大,在并发的切换过程效率较低。

Python 提供了一个跨平台的多进程模块 multiprocessing,模块中使用 Process 类来代表一个进程对象。

1.1 多进程示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import os
from multiprocessing import Process

# 子进程执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))  # target 指定要执行的函数,args 指定参数
    print('Child process will start.')
    p.start() #启动 Process 实例
    p.join() #等待子进程结束后,继续往下执行
    print('Child process end.')
Parent process 274.
Child process will start.
Run child process test (298)...
Child process end.

1.2 进程池示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import os, time
from multiprocessing import Pool

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(2) # 创建对象池,并设置进程池大小,默认大小是 CPU 核数
    for i in range(5):
        p.apply_async(long_time_task, args=(i,)) # 设置每个进程要执行的函数和参数,异步执行
    print('Waiting for all subprocesses done...')
    p.close() # 关闭进程池,不允许继续添加新的 Process
    p.join() # 等待全部子进程执行完毕
    print('All subprocesses done.')
Parent process 274.
Run task 1 (431)...
Run task 0 (430)...
Waiting for all subprocesses done...
Task 1 runs 3.00 seconds.
Run task 2 (431)...
Task 0 runs 3.00 seconds.
Run task 3 (430)...
Task 2 runs 3.00 seconds.
Task 3 runs 3.00 seconds.
Run task 4 (431)...
Task 4 runs 3.00 seconds.
All subprocesses done.

1.3 进程间通信

multiprocessing 模块封装了底层的通信机制,提供了 Queue、Pipes 等多种方式来交换数据。以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os, time, random
from multiprocessing import Process, Queue

def write(q): # 写数据进程执行的代码
    print("Process to write: %s" % os.getpid())
    for value in ['A', 'B', 'C']:
        print("Put %s to queue..." % value)
        q.put(value)
        time.sleep(random.random())

def read(q): # 读数据进程执行的代码
    print("Process to read: %s" % os.getpid())
    while True:
        value = q.get(True)
        print("Get %s from queue." % value)

if __name__ == '__main__':
    q = Queue() # 父进程创建Queue,并传给各个子进程
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start() # 启动子进程pw,写入
    pr.start() # 启动子进程pr,读取
    pw.join()  # 等待pw结束
    pr.terminate() # pr进程里的死循环,无法等待结束,只能强制终止
Process to write: 211
Put A to queue...
Process to read: 212
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

2. 线程

线程是一种轻量进程,是 CPU 调度和分派的基本单元。线程并不产生新的地址空间和资源描述符表,而是复用父进程的。线程只拥有程序计数器、一组寄存器和栈,同一进程的线程共享其他全部资源。

线程由 OS 调度,相较于进程,线程调度的成本非常小。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。

2.1 解释器

在谈 Python 的线程之前,先了解下 Python 的几个解释器版本:

  • CPython ,Python 的官方版本,使用 C 语言实现,使用最为广泛,大部分人使用的都是这个版本。
  • Jython,Python 的 Java 实现,相比于 CPython,与 Java 语言之间的互操作性要远远高于 CPython 和 C 语言之间的互操作性。
  • Python for .NET,CPython 实现的 .NET 托管版本,与 .NET 库和程序代码有很好的互操作性。
  • IronPython,不同于 Python for .NET,它是 Python 的 C# 实现,并且它将 Python 代码编译成 C# 中间代码(与 Jython 类似),与.NET语言的互操作性也非常好。
  • PyPy,Python 的 Python 实现版本。PyPy 运行在 CPython(或者其它实现)之上,用户程序运行在 PyPy 之上。目标是成为 Python 语言自身的试验场,可以很容易地修改 PyPy 解释器的实现(因为是使用Python写的)。
  • Stackless,Stackless Python 是 CPython 的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。

2.2 全局锁 GIL

GIL 是 CPython 中特有的全局解释器锁(其它 Python 版本解释器,有自己的线程调度机制,没有GIL机制)。本质上,GIL 就是 Python 进程中的一把超大锁,在解释器进程中是全局有效。GIL 主要锁定的是 CPU 执行资源,实现线程独占。

在 CPython 解释器中,当一个线程需要使用 CPU 资源时,首先得获取 GIL,直到遇到 I/O 操作时,才会释放 GIL。

如果是 I/O 密集型线程,多线程能比单线程显著提高性能;如果是 CPU 密集型线程,多线程并不能提高性能,因为等待 GIL,多线程也只能依次按顺序执行。

在单核 CPU 中,同一时刻仅有一个线程占用 CPU,GIL 不会对 CPU 的使用率产生影响。但是在多核 CPU 中,由于 GIL 的存在,同一时刻,不同核的线程会竞争 GIL。获取到 GIL 的线程能够占用 CPU,而其他线程将处于闲置状态,即使这些线程有空闲的 CPU 资源。

在 Python 3 中 GIL 也没有去掉,因为有大量的第三方库依赖 GIL。去掉 GIL 之后,需要引入复杂的锁机制保护众多全局状态。

2.3 多线程示例

Python 的标准库提供了两个模块:thread 和 threading,thread 是低级模块,threading 是高级模块,对 thread 进行了封装。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time, os, threading

start = time.time()
def doubler(number):
    print(threading.currentThread().getName())
    print('Parent process %s.' % os.getpid())
    print(number * 2)
    time.sleep(2)# 或者 IO 请求
    print('thread run %0.2f s end'% (time.time() - start))

if __name__ == '__main__':
    
    for i in range(3):
        my_thread = threading.Thread(target=doubler, args=(i,))
        my_thread.start()
        #my_thread.join()
Thread-98
Parent process 426.
0
Thread-99
Parent process 426.
2
Thread-100
Parent process 426.
4
thread run 2.00 s end
thread run 2.01 s end
thread run 2.01 s end    

由于线程中执行了 sleep ,释放了 CPU 资源,其他线程得以执行。如果新增注释部分的代码 my_thread.join(), 那么线程将串行执行:

Thread-101
Parent process 426.
0
thread run 2.01 s end
Thread-102
Parent process 426.
2
thread run 4.01 s end
Thread-103
Parent process 426.
4
thread run 6.02 s end

2.4 multiprocessing.dummy

multiprocessing.dummy 模块与 multiprocessing 模块的区别: dummy 模块是多线程,而 multiprocessing 是多进程, 调用方式相同。

1
2
from multiprocessing import Pool
from multiprocessing.dummy import Pool

与 multiprocessing 类似,dummy 模块提供了多线程池,可以很方便将代码在多线程和多进程之间切换。dummy 模块在大量的开源项目中有所应用,十分推荐使用。

3. 协程

协程是一种轻量级的线程。协程拥有独立的寄存器上下文和栈,同一个线程,共享堆。协程不由 OS 调度,OS 对于协程的一无所知,完全由程序员编码进行控制。

具体点就是,执行函数 A 时,可以随时中断,去执行函数 B,接着中断 B ,继续执行函数A。而这些切换完全由程序吱声控制。协程调度实际上是在同一线程中,进行程序函数的切换,没有切换线程带来的开销。

协程比较适合处理 IO 密集型的任务。

3.1 Gevent

Gevent 是第三方库,通过 Greenlet 实现协程,其基本实现原理是:

当一个 Greenlet 遇到 IO 操作时,比如访问网络,就自动切换到其他的 Greenlet,等到 IO 操作完成,再在适当的时候切换回来继续执行。由于 IO 操作非常耗时,经常使程序处于等待状态,有了 Gevent 为我们自动切换协程,就保证总有Greenlet 在运行,而不是等待 IO。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import gevent
import time, os, threading
from gevent import monkey;
monkey.patch_all() # 将默认阻塞的模块替换成非阻塞

start = time.time()
def doubler(number):
    print('Parent process %s.' % os.getpid())
    print(number * 2)
    time.sleep(2)
    print('run %0.2f s end'% (time.time() - start))

if __name__ == '__main__':
    tasks=[gevent.spawn(doubler,i) for i in range(3)] # gevent.spawn 启动协程,参数为函数名称和参数名称
    gevent.joinall(tasks) # gevent.joinall 等待执行完毕
Parent process 871.
0
Parent process 871.
2
Parent process 871.
4
run 2.00 s end
run 2.00 s end
run 2.00 s end

从结果来看,Python 中多线程和多协程的效果类似,在当前执行阻塞时,切换执行流程。不同的是,多线程切换的是线程,而协程切换的是正在执行的函数上下文。

使用 Gevent,可以获得极高的并发性能,但 Gevent 只能在 Unix/Linux下运行,在 Windows 下不保证正常安装和运行。

3.2 Django

在 Django 中也会使用 Gevent 来增强并发能力,特别是对于 IO 密集型的请求较多时:

1
2
3
4
# 使用 uwsgi 部署
uwsgi --gevent 100 --gevent-monkey-patch --http :8000 -M  --processes 4 --wsgi-file wsgi.py
# 使用 gunicorn 部署
gunicorn --worker-class=gevent wsgi:application -b 0.0.0.0:8000

3.3 Celery

Celery 支持几种并发模式,有 prefork,threading,协程(gevent,eventlet)。在 Celery 中使用并发模式,能显著提高处理效率,特别是 IO 操作较多时。

1
celery worker -A celery_worker.celery -P gevent -c 10 -l INFO

-P 选项指定 pool,默认是 prefork,这里指定为 gevent, -c 设置并发数。

4. 最佳实践

  • IO 密集型的任务(例如,网络调用等)中使用线程和协程
  • CPU 密集的任务,需要使用多个进程,绕开 GIL 限制,充分利用多核 CPU ,提高效率
  • 为了充分利用 CPU ,可以结合多进程+协程进行部署,多个进程,每个进程中多个协程。

5. 参考


微信公众号
作者
微信公众号