Python进程和线程

编程 · 2023-08-17 · 195 人浏览

对操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开两个记事本就启动了两个记事本进程。

有些进程不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

进程之间是内存隔离的,即不同的进程拥有各自的内存空间。这就类似于不同的公司拥有不同的办公场所。

线程之间是内存共享的,线程是属于进程的,一个进程内的多个线程之间是共享这个进程所拥有的内存空间的。这就好比,公司员工之间共享公司的办公场所。

进程

multiprocessing模块是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象。

import os
from multiprocessing import Process

# 子进程要执行的代码
def run_proc(name):
    print("子进程 %s (%s)" % (os.getpid(), name))
    print("父进程 %s" % os.getppid())  # getppid 获取当前父进程编号

if __name__ == "__main__":
    # getpid 获取当前进程编号
    print("父进程 %s" % os.getpid())
    p = Process(target=run_proc, args=("test",))
    print("Child process will start")
    p.start()
    p.join()
    print("Child process end")

创建子进程时,只需要传入一个执行函数和参数,创建一个Process实例,用start()方法(仅仅只是给操作系统发送了一个信号)启动。

join()方法:主进程等待子进程结束后再继续往下运行,通常用于进程间的同步。

守护主进程

import time
from multiprocessing import Process

def work():
    for i in range(10):
        print("工作中...")
        time.sleep(0.2)

if __name__ == "__main__":
    work_process = Process(target=work)
    work_process.daemon = True  # 方式一 设置守护主进程
    work_process.start()
    time.sleep(1)
    work_process.terminate()  # 方式二 直接销毁子进程
    print("主进程执行结束")

互斥锁

from multiprocessing import Process, Lock
import time

def task(name, mutex):
    mutex.acquire()  # 上锁
    print(f"{name} 1")
    time.sleep(1)
    print(f"{name} 2")
    mutex.release()  # 解锁

if __name__ == "__main__":
    mutex = Lock()
    for i in range(3):
        p = Process(target=task, args=(f"进程{i}", mutex))
        p.start()

进程池Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

import os
import time
import random
from multiprocessing import Pool

def long_time_task(name):
    print("Run task %s (%s)..." % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print("Task %s runs %0.2f seconds." % (name, (end - start)))

if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print("Waiting for all subprocesses done...")
    p.close()
    p.join()
    print("All subprocesses done.")

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小是CPU核数,这里设置为4最多同时执行4个进程。

如果改成p = Pool(5)就可以同时跑5个进程。

子进程

很多时候,子进程并不是自身,而是一个外部进程。创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print("$ nslookup www.python.org")
r = subprocess.call(["nslookup", "www.python.org"])
print("Exit code:", r)

如果子进程还需要输入,则可以通过communicate()方法输入:

import subprocess

print("$ nslookup")
p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"set q=mxnpython.orgnexitn")
print(output.decode("gbk"))  # Windows默认编码是gbk
print("Exit code:", p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

输入set q=mx检查某个DNS域名的MX记录,然后输入想要检查的DNS域名,输入之后,将显示DNS服务器的MX记录及对应的IP地址。

进程间通信

multiprocessing模块包装了底层机制,提供了Queue、Pipes等多种方式来交换数据。

以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

import os
import time
import random
from multiprocessing import Process, Queue

# 写数据进程执行的代码
def write(q):
    print("Process to write: %s" % os.getpid())
    for value in ["A", "B", "C"]:
        print("Put %s to queue..." % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码
def read(q):
    print("Process to read: %s" % os.getpid())
    while True:
        value = q.get(True)
        print("Get %s from queue." % value)

if __name__ == "__main__":
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入
    pw.start()
    # 启动子进程pr,读取
    pr.start()
    # 等待pw结束
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

线程

Python的线程是真正的Posix Thread,而不是模拟出来的线程。利用Python的标准库threading,可以轻松实现多线程任务。

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:

import time
import threading

def sing(msg):
    n = 0
    while n < 5:
        n += 1
        print(f"{threading.current_thread().name} is running...{msg}")
        time.sleep(1)

def dance(msg):
    n = 0
    while n < 5:
        n += 1
        print(f"{threading.current_thread().name} is running...{msg}")
        time.sleep(1)

if __name__ == "__main__":
    # 以元组方式传参
    sing_thread = threading.Thread(target=sing, args=("我在唱歌,啦啦啦",))
    # 以字典方式传参
    dance_thread = threading.Thread(target=dance, kwargs={"msg": "我在跳舞,呱呱呱"})

    sing_thread.start()
    dance_thread.start()

任何进程默认就会启动一个线程,把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。

主线程实例的名字叫MainThread,子线程的名字可在创建时指定,如果不起名字Python会自动给线程命名为Thread-1 Thread-2 ...

锁Lock

多线程和多进程最大不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

来看看多个线程同时操作一个变量怎么把内容给改乱了:

import threading

# 假定这是你的银行存款
balance = 0

def change_it(n):
    # 先存后取,结果应该为0
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(2000000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

变量balance理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。

原因是因为高级语言的一条语句在CPU执行时是若干条语句,即使一个简单的计算balance = balance + n也分两步:

  1. 计算balance + n,存入临时变量中
  2. 将临时变量的值赋给balance

可看成是:

x = balance + n
balance = x

因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,该线程因为获得了锁,其他线程就不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。

创建一个锁通过threading.Lock()来实现:

import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存后取,结果应该为0
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(2000000):
        # 先要获取锁
        lock.acquire()
        try:
            # 放心地改吧
            change_it(n)
        finally:
            # 改完了一定要释放锁
            lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

用try...finally模式来确保锁一定会被释放。也可以使用with lock模式:

import threading

lock = threading.Lock()

class Account:
    def __init__(self, balance):
        self.balance = balance

def draw(account, amount):
    with lock:
        if account.balance >= amount:
            print("取钱成功")
            account.balance -= amount
            print("余额", account.balance)
        else:
            print("取钱失败,余额不足")

account = Account(1000)
t1 = threading.Thread(target=draw, args=(account, 600))
t2 = threading.Thread(target=draw, args=(account, 800))

t1.start()
t2.start()

GIL全局锁

Python的线程虽是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。

不过,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

ThreadLocal

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

但局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。

ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。

import threading

# 创建全局ThreadLocal对象
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student
    std = local_school.student
    print("Hello, %s (in %s)" % (std, threading.current_thread().name))

def process_thread(name):
    # 绑定ThreadLocal的student
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=("Alice",), name="Thread-A")
t2 = threading.Thread(target=process_thread, args=("Bob",), name="Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()

全局变量local_school是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等。

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

在爬虫中的应用

import requests
import os
import bs4
import threading

os.makedirs("xkcd", exist_ok=True)  # store comics in ./xkcd

def downloadXkcd(startComic, endComic):
    for urlNumber in range(startComic, endComic):
        # Download the page.
        print(f"Downloading page https://xkcd.com/{urlNumber}...")
        res = requests.get(f"https://xkcd.com/{urlNumber}")
        res.raise_for_status()

        soup = bs4.BeautifulSoup(res.text, "html.parser")

        # Find the URL of the comic image.
        comicElem = soup.select("#comic img")
        if comicElem == []:
            print("Could not find comic image.")
        else:
            comicUrl = comicElem[0].get("src")
            # Download the image.
            print(f"Downloading image {comicUrl}...")
        res = requests.get("https:" + comicUrl)
        res.raise_for_status()

        # Save the image to ./xkcd.
        imageFile = open(os.path.join("xkcd", os.path.basename(comicUrl)), "wb")
        for chunk in res.iter_content(100000):
            imageFile.write(chunk)
        imageFile.close()

downloadThreads = []  # a list of all the Thread objects
for i in range(0, 140, 10):  # loops 14 times, creates 14 threads
    start = i
    end = i + 9
    if start == 0:
        start = 1  # There is no comic 0, so set it to 1.
    downloadThread = threading.Thread(target=downloadXkcd, args=(start, end))
    downloadThreads.append(downloadThread)
    downloadThread.start()

for downloadThread in downloadThreads:
    downloadThread.join()
print("Done.")

多线程数据通信

queue.Queue可用于多线程间的、线程安全的数据通信。

import queue

q = queue.Queue()
# 添加元素
q.put("100")
q.put("200")
# 获取元素
item = q.get()
print(item)  # 100
item2 = q.get()
print(item2)  # 200
# 查看元素多少
print(q.qsize())  # 0
# 判断是否为空
print(q.empty())  # True
# 判断是否已满
print(q.full())  # False

守护线程

from threading import Thread
import time

def task(name):
    time.sleep(1)
    print(f"你好,{name}")

if __name__ == "__main__":
    t = Thread(target=task, args=("张三",))
    t.setDaemon(True)  # 守护线程,必须在 start() 之前设置
    # t.daemon = True  # 另一种设置方法
    t.start()
    print("主线程")

信号量

from threading import Thread, Semaphore, currentThread
import time
import random

semaphore = Semaphore(3)

def task():
    with semaphore:
        print(currentThread().getName())
        time.sleep(random.randint(1, 3))

if __name__ == "__main__":
    for i in range(10):
        t = Thread(target=task)
        t.start()
Python
Theme Jasmine by Kent Liao