导读
真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。当然,真正地同时执行多线程需要多核CPU才可能实现。
我们前面编写的所有的Python程序,都是执行单任务的进程,也就是只有一个线程。
线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。
多进程和多线程的程序涉及到同步、数据共享的问题,编写起来更复杂。
多线程
添加线程
# 导入模块 |
join功能
使用join对控制多个线程的执行顺序非常关键。join功能是等待调用线程完成再继续下面的操作。推荐如下这种1221的V型排布。
示例代码def T1_job():
print("T1 start\n")
for i in range(10):
time.sleep(0.1)
print("T1 finish\n")
def T2_job():
print("T2 start\n")
print("T2 finish\n")
thread_1 = threading.Thread(target=T1_job, name='T1')
thread_2 = threading.Thread(target=T2_job, name='T2')
----------------------------------------------------------------------
thread_1.start() # start T1
thread_2.start() # start T2
thread_2.join() # join for T2
thread_1.join() # join for T1
print("all done\n")
"""
T1 start
T2 start
T2 finish
T1 finish
all done
"""
setDaemon功能
setDaemon()方法。主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.这就是setDaemon方法的含义,这基本和join是相反的。
示例代码import threading
import time
class MyThread(threading.Thread):
def __init__(self,id):
threading.Thread.__init__(self)
def run(self):
time.sleep(5)
print "This is " + self.getName()
if __name__ == "__main__":
t1=MyThread(999)
t1.setDaemon(True)
t1.start()
print "I am the father thread."
"""
I am the father thread.
"""
# 可以看出,子线程t1中的内容并未打出。
线程锁Lock
lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。
示例代码import threading
def job1():
global A,lock
lock.acquire()
for i in range(10):
A+=1
print('job1',A)
lock.release()
def job2():
global A,lock
lock.acquire()
for i in range(10):
A+=10
print('job2',A)
lock.release()
if __name__== '__main__':
lock=threading.Lock()
A=0
t1=threading.Thread(target=job1)
t2=threading.Thread(target=job2)
t1.start()
t2.start()
t1.join()
t2.join()
"""
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110
"""
其他常用操作
import threading |
GIL
Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.
GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
所以Python的多线程就是假的。。。。
多进程
Python提供多进程的原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 GIL.
添加进程
示例代码# 多进程
import multiprocessing as mp
def job(a,d):
print('aaaaa')
print(mp.current_process())
if __name__=='__main__':
p1 = mp.Process(target=job,args=(1,2))
print(mp.current_process())
p1.start()
p1.join()
进程池
进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题
示例代码import multiprocessing as mp
def job(x):
return x*x
# 定义一个pool
pool = mp.Pool()
# 有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
# 接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核
res = pool.map(job, range(10))
print(res)
自定义核数量
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
示例代码def multicore():
pool = mp.Pool(processes=3) # 定义CPU核数量为3
res = pool.map(job, range(10))
print(res)
apply_async
示例代码import multiprocessing as mp
def job(x):
return x*x
pool = mp.Pool()
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res])
# 和map差不多
共享内存
我们可以通过使用Value数据存储在一个共享的内存表中。import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。更多的形式请查看此表.| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。
进程锁
为了解决不同进程抢共享资源的问题,我们可以用加进程锁来解决。
示例代码import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
"""
1
2
3
4
5
8
11
14
17
20
"""
# 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行