进程与线程

多任务的需求促使进程线程概念的诞生

而对于多核CPU, 多任务的最简单实现便是将每一个任务分别分配在不同的核心, 此时多个任务真正的同时运行, 这种模式称为并行; 而任务数量远远多于CPU的核心数量, 此时操作系统会让一个核心在多个任务中**“左右横跳”, 实现宏观层面的“同时”, 这种模式称为并发**

对于操作系统而言, 一个任务就是一个进程(Process), 两个进程之间不共享资源, 而在一个进程内部, 共享资源但行为不同的“子任务”便是线程

由此, 可以知道多任务的实现有3种模式:

  • 多进程模式
  • 多线程模式
  • 多进程 + 多线程模式

多进程

多进程的创建一般有以下三种方法

fork()

在Unix/Linux系统下(包括Mac系统)提供了一个fork()系统调用; 这个函数将会被调用一次, 但返回两次, 因为操作系统自动把当前进程(父进程)复制了一份(子进程), 然后分别在父进程和子进程中返回; 子进程永远返回0, 而父进程返回子进程的ID, 此时子进程可以调用getppid()获取父进程的ID(getpid()是获取当前进程的ID)

Python的os模块封装了常见的系统调用, 其中就包括fork; 使用fork调用, 一个进程在接到新任务时就可以复制出一个子进程来处理新任务, 常见的Apache服务器就是由父进程监听端口, 每当有新的http请求时, 就fork出子进程来处理新的http请求

: Windows系统下没有fork调用

multiprocessing

Python也提供了一个跨平台的多进程支持, 就是multiprocessing模块; 该模块提供了一个Process类来代表一个进程对象;
创建子进程时, 只需要传入一个执行函数和函数的参数, 创建一个Process实例, 用start()方法启动, 这样创建进程比fork()还要简单join()方法可以等待子进程结束后再继续往下运行, 通常用于进程间的同步

Pool

如果要启动大量的子进程, 可以用进程池的方式批量创建子进程

:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print("Run task {0} ({1})".format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print("Task {0} runs {1:0.2f} seconds".format(name, end - start))

if __name__ == '__main__':
print("Parent process {0}".format(os.getpid()))
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print("Waiting for all subprocesses done...")
p.close()
p.join()
print('All subprocesses done.')

代码解读: 对Pool对象调用join()方法会等待所有子进程执行完毕, 调用join()之前必须先调用close(), 调用close()之后就不能继续添加新的Process了; 开始立刻执行的进程数为创建Pool对象时传入的数值, 这是Pool有意设计的限制, 而非操作系统的限制, 而Pool的默认大小是CPU的最大线程数

子进程

很多时候, 子进程并不是自身, 而是一个外部进程, 在创建之后, 还需要控制子进程的输入和输出, 而subprocess模块可以方便的启动一个子进程, 并控制其输入和输出

  1. run()

    语法: subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)

    参数:

    • args: 执行的命令, 字符串/字符串列表
    • stdin/stdout/stderr: 子进程的标准输入, 输出和错误; 可取:
      • subprocess.PIPE 表示为子进程创建新的管道
      • subprocess.DEVNULL 表示使用os.devnull
      • 一个已经存在的文件描述符 / 已经打开的文件对象
      • None
    • timeout: 设置命令超时时间, 如果超时, 则子进程将被杀死, 并弹出TimeoutExpired异常
    • shell: 若为True, 则通过操作系统的shell执行指定的指令
    • check: 若为True, 则进程退出状态码不是0, 弹出CalledProcessError异常

    用法: run()方法调用方式返回CompletedProcess实例, 和直接Popen()差不多, 实现是一样的, 本质上也是调用Popen

  2. Popen类

    1
    class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

    参数:

    • args: shell 指令, 字符串/字符列表
    • bufsize: 缓冲区大小, 默认为-1
      • 0: 不使用缓冲区
      • 1: 表示行缓冲, 仅当universal_newlines=True时可用, 也就是文本模式
      • 正数: 表示缓冲区大小; 负数: 表示使用系统默认的缓冲区大小
    • stdin/stdout/stderr: 分别表示程序的标准输入/输出/错误句柄
    • preexec_fn: 只在Unix平台下有效, 用于指定一个可执行对象, 他将在子进程运行之前被调用
    • shell: 若为True, 则通过操作系统的shell执行指定的指令
    • cwd: 由于设置子进程的当前目录
    • env: 用于指定子进程的环境变量, 如果值为None, 则意味着从父进程中继承

    方法:

    • poll(): 检查进程是否终止, 如果终止返回 returncode, 否则返回 None
    • wait(timeout): 等待子进程终止
    • communicate(input, timeout): 和子进程交互, 发送和读取数据
    • send_signal(singnal): 发送信号到子进程
    • terminate(): 停止子进程(也就是发送SIGTERM信号到子进程)
    • kill(): 杀死子进程(发送SIGKILL信号到子进程)

还可以通过call(args[, timeout])函数来创建子进程, 本质上也是通过Popen()来实现

进程间通信

操作系统提供了许多机制来实现进程间的通信, Python的multiprocessing模块包装了底层的机制, 提供了Queue, Pipes等多种方式来交换数据

Queue为例, 在父进程中创建两个子进程, 一个往Queue里写数据, 一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

多线程

线程为操作系统直接支持的执行单元,同时Python的线程是真正的Posix Thread, 而不是模拟出来的线程

Python提供了两个模块来操作线程: _thread, threading; 其中_thread是低级模块, 而threading是高级模块, 对_thread进行了封装, 大部分情况下, 我们只需使用threading

启动线程就是将一个函数传入并创建Thread实例, 然后调用start()开始执行

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

Lock

原因: 多线程与多进程最大的不同就在于, 多进程中, 进程之间不共享资源, 而多线程中, 线程之间是共享资源的, 都在同一个进程之下

也正是因为这种特性导致如果在不同线程中同时修改同一个变量, 则可能出现不符合预期的结果, 因为在将改变的操作变为汇编语言时可能不是一个原子操作, 则可能修改后还未存回原变量便切换到另一个线程, 重新读取了修改前的值进行操作, 则每次程序运行后的结果都不相同

解决方法: 此时, 我们可以通过很多方式来解决这个问题; 从源头上解决可以将这个操作在编译时对应成一个原子操作, C++的新规范采取了这个方法; 但上述方法需要对程序的编译器或解释器修改来实现, 这对于平常情况来说不现实, 所以我们通常使用各种各样的锁, 信号量, 条件变量(*信号量和条件变量一般用于 *等方式来实现; 是其中最常用一种方法

threading模块中, 提供了一个函数来返回一个, 然后在线程运行时, 对该线程上锁, 然后在该线程运行完之后对其解锁; 例子如下:

1
2
3
4
5
6
7
8
balance = 0
lock = threading.Lock()
def run_thread(n):
lock.acquire() # 上锁
try:
chage_it(n)
finally:
lock.release() # 解锁

: 在线程运行结束后一定要释放锁, 否则其它线程将永远等带下去, 成为死线程, 所以一般使用try...finally...来确保锁一定会被释放

优缺点: 锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止

扩展:

锁的种类 功能
互斥锁 (Mutex) 加锁失败后, 线程会释放CPU给其它线程
自旋锁 (Spinlock) 加锁失败后, 线程会忙等待, 直到它拿到锁
读写锁 (Readers-Writer Lock) 读锁可以同时被多个读线程拥有, 但写锁只能被一个写线程拥有
悲观锁 (Pessimistic Lock) 每次修改资源前都会上锁; 以上提到的均是悲观锁
乐观锁 (Optimistic Lock) 先修改数据, 在验证是否发生资源竞争; 如果不存在竞争则操作完成; 否则放弃本次操作

GIL

虽然Python的线程是真正的线程, 但解释器执行代码时, 有一个GIL锁(Global Interpreter Lock), 在任何Python线程执行前, 必须先获取GIL锁, 然后, 每执行100条字节码, 解释器自动释放GIL锁; 因此多线程在Python中只能交替执行, 即使在100个核的CPU上也只能用到一个核

当然, 虽然无法通过多线程来调用多个核, 但依然可以通过多进程来实现多核任务, 每个Python进程有各自独立的GIL锁, 互不影响

ThreadLocal

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁

但局部变量在函数调用时传递起来很麻烦, 需要每次调用都把对象作为参数传入, 因此为了解决这个问题ThreadLocal出现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理

作用: ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接, HTTP请求, 用户身份信息等, 这样一个现场的所有调用到的处理函数都可以非常方便的访问这些资源

进程 VS 线程

Master-Worker模式

在多任务环境下, 通常会设计Master-Worker模式, 由Master负责分配任务, Worker负责执行任务, 因此通常是一个Master, 多个Worker

如果是多进程, 则主进程是Master, 其它进程就是Worker; 如果是多线程, 则主线程就是Master, 其它线程就是Worker

优缺点

多进程最大的优点就是稳定性高, 即使子进程崩溃了, 也不会影响主进程和其它子进程(但如果主进程崩溃, 则程序崩溃, 但一般主进程只负责分配任务, 崩溃的概率极低 )Apache最早就采用多进程模式
缺点便是创建进程的代价大, 在Unix/Linux系统下, 用fork()调用还行, 但在Windows系统下创建进程的开销巨大, 同时, 操作系统可以同时运行的进程数也是有限的

多线程优点便是通常比多进程快一点(但也快不到哪去 ); 而其缺点便是任何一个线程挂掉都有可能直接造成整个进程的崩溃

Windows下, 多线程的效率比多进程要高, 所以微软的IIS服务器默认采用了多线程模式, 又由于多线程存在稳定性的问题, IIS的稳定性就不如Apache; 为了缓解这个问题, IISApache现在又有了多进程+多线程的混合模式

进程切换

无论是多进程还是多线程, 只要数量一多, 那么效率肯定上不去, 因为在进程切换的过程中, 便需要耗费大量资源, 导致真正的任务无法完成

因此, 是否采用多任务处理的第一个考虑便是任务数量的多少, 如果任务数量过多, 则不建议使用多任务模型

计算密集型 vs IO密集型

是否采用多任务的第二个考虑便是任务的类型, 我们可以把任务简单地分为:

  • 计算密集型: 进行大量的计算, 消耗CPU的资源; 任务越多, 花在任务切换的时间就越多, CPU执行任务的效率就越低, 所以任务密集型任务同时进行的数量应当等于CPU的核心数(这类任务主要消耗CPU的资源, 因此代码运行效率至关重要, Python这类脚本语言运行效率很低, 不适合计算密集型任务, C才是最好的选择)
  • IO密集型: 涉及到网络, 磁盘IO的任务都是IO密集型任务; 这些任务对CPU的消耗很少, 大部分时间都在等待IO操作完成; 所以任务越多, CPU效率越高, 但也有一个限度(这类任务对于代码运行效率并无要求, 反而对开发效率有着要求, 所以Python这类的脚本语言是首选, 而C语言最差)

异步IO

考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行

而现代操作系统对IO操作进行了改进, 最大的特点便是支持异步IO, 通过这种支持, 便可以用单进程单线程模型来执行多任务, 这种全新的模型称为事件驱动模型

: Nginx就是支持异步IO的Web服务器, 它在单核CPU上采用单进程模型就可以高效的支持多任务, 在多核CPU上, 可以运行多个进程(数量和CPU核心数相同), 充分运用多核CPU

在Python中, 单线程的异步编程模型称为协程, 在协程的支持下, 就可以基于事件驱动编写高效的多任务程序

分布式进程

在Thread和Process, 应当优先选择Process, 因为Process更稳定, 同时Process可以分布到多台机器上, 而Thread最多只能分布到同一台机器的多个CPU上

Python的multiprocessing模块不但支持多进程, 其中的managers子模块还支持把多进程分布到多台机器上; 一个服务进程可以作为调度者, 将任务分布到其它多个进程中, 依靠网络通信由于managers模块封装的很好, 我们不必了解网络通信的细节, 就可以很容易的编写分布式多进程程序

:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 返回发送任务队列和接受结果队列的函数, 因为`pickle`无法序列化`lambda`函数, 则`QueueManager.register()`必须传入一个函数
def get_t_queue():
return task_queue
def get_r_queue():
return result_queue

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=get_t_queue)
QueueManager.register('get_result_queue', callable=get_r_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

通过两个在网络上注册(register)过的Queue通道进行信息的传递
: 在一台机器上编写多进程程序时, 创建的Queue可以直接拿来用, 但在分布式多进程环境下, 添加任务到Queue不可以直接对原始的task_queue进行操作, 这样就绕过了QueueManager的封装, 必须通过manager.get_task_queue()获得的Queue接口添加

然后在另一台机器上启动任务进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程, 所以要指定服务进程的IP

如果将worker的计算改成发送邮件, 就实现了邮件队列的异步发送

问题1: Queue对象存储在哪儿?
: Queue对象存储在task_master.py进程中, 而之所以可以通过网络访问, 则是通过QueueManager实现的

; Queue的作用是用来传递任务和接受结果的, 每个任务的描述数据量要尽量小, 比如发送一个处理日志文件的任务, 就不要发送日志文件本身, 而是发送日志文件存放的完整路径, 由Worker进程再去共享的磁盘读取文件