对操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开两个记事本就启动了两个记事本进程。
有些进程不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。
进程之间是内存隔离的,即不同的进程拥有各自的内存空间。这就类似于不同的公司拥有不同的办公场所。
线程之间是内存共享的,线程是属于进程的,一个进程内的多个线程之间是共享这个进程所拥有的内存空间的。这就好比,公司员工之间共享公司的办公场所。
进程是操作系统分配资源的最小单位,而线程是程序执行(CPU调度)的最小单位。
multiprocessing模块是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象。 import os from multiprocessing import Process
def run_proc(name): print("子进程 %s (%s)" % (os.getpid(), name)) print("父进程 %s" % os.getppid()) # getppid 获取当前父进程编号
if name == "main":
print("父进程 %s" % os.getpid()) p = Process(target=run_proc, args=("test",)) p2 = Process(target=run_proc, kwargs={"name": "test2"}) print("Child process start") p.start() p2.start() p.join() p2.join() print("Child process end")
创建子进程时,只需要传入一个执行函数和参数,创建一个Process实例,用start()方法(仅仅只是给操作系统发送了一个信号)启动。
join()方法:主进程等待子进程结束后再继续往下运行,通常用于进程间的同步。
进程间是不共享全局变量的。实际上创建一个子进程就是把主进程的资源进行拷贝产生了一个新进程,这里主进程和子进程是互相独立的。
实际上,主进程会等待所有子进程执行结束再结束。通过设置守护主进程,来保证主进程执行完毕,子进程随之销毁。 import time from multiprocessing import Process
def work(): for i in range(10): print("工作中...") time.sleep(0.2)
if name == "main": work_process = Process(target=work) work_process.daemon = True # 方式一 设置守护主进程 work_process.start() time.sleep(1)
print("主进程执行结束")
守护进程必须是子进程,且不允许再创建新的子进程。主进程结束,守护进程也会随之结束。守护进程必须在start之前,start之后不能再设置daemon。
from multiprocessing import Process, Lock import time
def task(name, mutex): mutex.acquire() # 上锁 print(f"{name} 1") time.sleep(1) print(f"{name} 2") mutex.release() # 解锁
if name == "main": mutex = Lock() for i in range(3): p = Process(target=task, args=(f"进程{i}", mutex)) p.start()
Lock不能多次上锁,推荐使用RLock可以多次上锁。
如果要启动大量的子进程,可以用进程池的方式批量创建子进程: import os import time import random from multiprocessing import Pool
def long_time_task(name): print("Run task %s (%s)..." % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print("Task %s runs %0.2f seconds." % (name, (end - start)))
if name == "main": print("Parent process %s." % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print("Waiting for all subprocesses done...") p.close() p.join() print("All subprocesses done.")
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小是CPU核数,这里设置为4最多同时执行4个进程。
如果改成p = Pool(5)就可以同时跑5个进程。
很多时候,子进程并不是自身,而是一个外部进程。创建了子进程后,还需要控制子进程的输入和输出。
subprocess模块可以非常方便地启动一个子进程,然后控制其输入和输出。
下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:
import subprocess print("$ nslookup www.python.org") r = subprocess.call(["nslookup", "www.python.org"]) print("Exit code:", r)
如果子进程还需要输入,则可以通过communicate()方法输入:
import subprocess print("$ nslookup") p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b"set q=mxnpython.orgnexitn") print(output.decode("gbk")) # Windows默认编码是gbk print("Exit code:", p.returncode)
上面的代码相当于在命令行执行命令nslookup,然后手动输入:
set q=mx python.org exit
输入set q=mx检查某个DNS域名的MX记录,然后输入想要检查的DNS域名,输入之后,将显示DNS服务器的MX记录及对应的IP地址。
multiprocessing模块包装了底层机制,提供了Queue、Pipes等多种方式来交换数据。
以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
import os import time import random from multiprocessing import Process, Queue # 写数据进程执行的代码 def write(q): print("Process to write: %s" % os.getpid()) for value in ["A", "B", "C"]: print("Put %s to queue..." % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码 def read(q): print("Process to read: %s" % os.getpid()) while True: value = q.get(True) print("Get %s from queue." % value) if name == "main": # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入 pw.start() # 启动子进程pr,读取 pr.start() # 等待pw结束 pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止。使用terminate终止进程,不会引起finally执行! pr.terminate()
Python的线程是真正的Posix Thread,而不是模拟出来的线程。利用Python的标准库threading,可以轻松实现多线程任务。
启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:
import time import threading def sing(msg): n = 0 while n < 5: n += 1 print(f"{threading.current_thread().name} is running...{msg}") time.sleep(1) def dance(msg): n = 0 while n < 5: n += 1 print(f"{threading.current_thread().name} is running...{msg}") time.sleep(1) if name == "main": # 以元组方式传参 sing_thread = threading.Thread(target=sing, args=("我在唱歌,啦啦啦",)) # 以字典方式传参 dance_thread = threading.Thread(target=dance, kwargs={"msg": "我在跳舞,呱呱呱"}) sing_thread.start() dance_thread.start()
任何进程默认就会启动一个线程,把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。
主线程实例的名字叫MainThread,子线程的名字可在创建时指定,如果不起名字Python会自动给线程命名为Thread-1 Thread-2 ...
实际上,主线程会等待所有子线程执行结束再结束。通过设置守护主线程,来保证主线程执行完毕,子线程随之销毁。 from threading import Thread import time
def task(name): time.sleep(1) print(f"你好,{name}")
if name == "main": t = Thread(target=task, args=("张三",)) t.setDaemon(True) # 守护线程,必须在 start() 之前设置
t.start() print("主线程")
多线程都在同个进程中,多线程使用的资源都是同一个进程中的资源,因此多线程间是共享全局变量的。因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
来看看多个线程同时操作一个变量怎么把内容给改乱了:
import threading # 假定这是你的银行存款 balance = 0 def change_it(n): # 先存后取,结果应该为0 global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(2000000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
变量balance理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。
原因是因为高级语言的一条语句在CPU执行时是若干条语句,即使一个简单的计算balance = balance + n也分两步:
可看成是: x = balance + n balance = x因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。
要确保balance计算正确,就要给change_it()上一把锁(互斥锁mutex),当某个线程开始执行change_it()时,该线程因为获得了锁,其他线程就不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。
创建一个锁通过threading.Lock()来实现:
import threading balance = 0 lock = threading.Lock() def change_it(n): # 先存后取,结果应该为0 global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(2000000): # 先要获取锁 lock.acquire() try: # 放心地改吧 change_it(n) finally: # 改完了一定要释放锁,否则会产生死锁 lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
用try...finally模式来确保锁一定会被释放。也可以使用with lock模式:
import threading lock = threading.Lock() class Account: def init(self, balance): self.balance = balance def draw(account, amount): with lock: if account.balance >= amount: print("取钱成功") account.balance -= amount print("余额", account.balance) else: print("取钱失败,余额不足") account = Account(1000) t1 = threading.Thread(target=draw, args=(account, 600)) t2 = threading.Thread(target=draw, args=(account, 800)) t1.start() t2.start()
Python的线程虽是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。
不过,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。
ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。
import threading # 创建全局ThreadLocal对象 local_school = threading.local() def process_student(): # 获取当前线程关联的student std = local_school.student print("Hello, %s (in %s)" % (std, threading.current_thread().name)) def process_thread(name): # 绑定ThreadLocal的student local_school.student = name process_student() t1 = threading.Thread(target=process_thread, args=("Alice",), name="Thread-A") t2 = threading.Thread(target=process_thread, args=("Bob",), name="Thread-B") t1.start() t2.start() t1.join() t2.join()
全局变量local_school是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
import requests import os import bs4 import threading
os.makedirs("xkcd", exist_ok=True) # store comics in ./xkcd
def downloadXkcd(startComic, endComic): for urlNumber in range(startComic, endComic):
print(f"Downloading page https://xkcd.com/{urlNumber}...") res = requests.get(f"https://xkcd.com/{urlNumber}") res.raise_for_status()
soup = bs4.BeautifulSoup(res.text, "html.parser") # Find the URL of the comic image. comicElem = soup.select("#comic img") if comicElem == []: print("Could not find comic image.") else: comicUrl = comicElem[0].get("src") # Download the image. print(f"Downloading image {comicUrl}...") res = requests.get("https:" + comicUrl) res.raise_for_status() # Save the image to ./xkcd. imageFile = open(os.path.join("xkcd", os.path.basename(comicUrl)), "wb") for chunk in res.iter_content(100000): imageFile.write(chunk) imageFile.close()
downloadThreads = [] # a list of all the Thread objects for i in range(0, 140, 10): # loops 14 times, creates 14 threads start = i end = i + 9 if start == 0: start = 1 # There is no comic 0, so set it to 1. downloadThread = threading.Thread(target=downloadXkcd, args=(start, end)) downloadThreads.append(downloadThread) downloadThread.start()
for downloadThread in downloadThreads: downloadThread.join() print("Done.")
queue.Queue可用于多线程间的、线程安全的数据通信。 import queue
q = queue.Queue()
q.put("100") q.put("200")
item = q.get() print(item) # 100 item2 = q.get() print(item2) # 200
print(q.qsize()) # 0
print(q.empty()) # True
print(q.full()) # False
from threading import Thread, Semaphore, currentThread import time import random
semaphore = Semaphore(3)
def task(): with semaphore: print(currentThread().getName()) time.sleep(random.randint(1, 3))
if name == "main": for i in range(10): t = Thread(target=task) t.start()
本文作者:a
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!