Python进程和线程

19次阅读
没有评论

共计 9147 个字符,预计需要花费 23 分钟才能阅读完成。

对操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开两个记事本就启动了两个记事本进程。

有些进程不止同时干一件事,比如 Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

进程之间是内存隔离的,即不同的进程拥有各自的内存空间。这就类似于不同的公司拥有不同的办公场所。

线程之间是内存共享的,线程是属于进程的,一个进程内的多个线程之间是共享这个进程所拥有的内存空间的。这就好比,公司员工之间共享公司的办公场所。

进程是操作系统分配资源的最小单位,而线程是程序执行(CPU 调度)的最小单位。

进程

multiprocessing 模块是跨平台版本的多进程模块,提供了一个 Process 类来代表一个进程对象。

import os
from multiprocessing import Process

# 子进程要执行的代码
def run_proc(name):
    print(" 子进程 %s (%s)" % (os.getpid(), name))
    print(" 父进程 %s" % os.getppid())  # getppid 获取当前父进程编号

if __name__ == "__main__":
    # getpid 获取当前进程编号
    print(" 父进程 %s" % os.getpid())
    p = Process(target=run_proc, args=("test",))
    p2 = Process(target=run_proc, kwargs={"name": "test2"})
    print("Child process start")
    p.start()
    p2.start()
    p.join()
    p2.join()
    print("Child process end")

创建子进程时,只需要传入一个执行函数和参数,创建一个 Process 实例,用 start()方法(仅仅只是给操作系统发送了一个信号)启动。

join()方法:主进程等待子进程结束后再继续往下运行,通常用于进程间的同步。

进程间是不共享全局变量的。实际上创建一个子进程就是把主进程的资源进行拷贝产生了一个新进程,这里主进程和子进程是互相独立的。

守护主进程

实际上,主进程会等待所有子进程执行结束再结束。通过设置守护主进程,来保证主进程执行完毕,子进程随之销毁。

import time
from multiprocessing import Process

def work():
    for i in range(10):
        print(" 工作中...")
        time.sleep(0.2)

if __name__ == "__main__":
    work_process = Process(target=work)
    work_process.daemon = True  # 方式一 设置守护主进程
    work_process.start()
    time.sleep(1)
    # work_process.terminate()  # 方式二 直接销毁子进程
    print(" 主进程执行结束 ")

互斥锁

from multiprocessing import Process, Lock
import time

def task(name, mutex):
    mutex.acquire()  # 上锁
    print(f"{name} 1")
    time.sleep(1)
    print(f"{name} 2")
    mutex.release()  # 解锁

if __name__ == "__main__":
    mutex = Lock()
    for i in range(3):
        p = Process(target=task, args=(f" 进程{i}", mutex))
        p.start()

进程池 Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

import os
import time
import random
from multiprocessing import Pool

def long_time_task(name):
    print("Run task %s (%s)..." % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print("Task %s runs %0.2f seconds." % (name, (end - start)))

if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print("Waiting for all subprocesses done...")
    p.close()
    p.join()
    print("All subprocesses done.")

对 Pool 对象调用 join()方法会等待所有子进程执行完毕,调用 join()之前必须先调用 close(),调用 close()之后就不能继续添加新的 Process 了。

注意输出的结果,task 0,1,2,3 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 的默认大小是 CPU 核数,这里设置为 4 最多同时执行 4 个进程。

如果改成 p = Pool(5) 就可以同时跑 5 个进程。

子进程

很多时候,子进程并不是自身,而是一个外部进程。创建了子进程后,还需要控制子进程的输入和输出。

subprocess 模块可以非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在 Python 代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print("$ nslookup www.python.org")
r = subprocess.call(["nslookup", "www.python.org"])
print("Exit code:", r)

如果子进程还需要输入,则可以通过 communicate()方法输入:

import subprocess

print("$ nslookup")
p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"set q=mxnpython.orgnexitn")
print(output.decode("gbk"))  # Windows 默认编码是 gbk
print("Exit code:", p.returncode)

上面的代码相当于在命令行执行命令 nslookup,然后手动输入:

set q=mx
python.org
exit

输入 set q=mx 检查某个 DNS 域名的 MX 记录,然后输入想要检查的 DNS 域名,输入之后,将显示 DNS 服务器的 MX 记录及对应的 IP 地址。

进程间通信

multiprocessing 模块包装了底层机制,提供了 Queue、Pipes 等多种方式来交换数据。

以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:

import os
import time
import random
from multiprocessing import Process, Queue

# 写数据进程执行的代码
def write(q):
    print("Process to write: %s" % os.getpid())
    for value in ["A", "B", "C"]:
        print("Put %s to queue..." % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码
def read(q):
    print("Process to read: %s" % os.getpid())
    while True:
        value = q.get(True)
        print("Get %s from queue." % value)

if __name__ == "__main__":
    # 父进程创建 Queue,并传给各个子进程:q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw,写入
    pw.start()
    # 启动子进程 pr,读取
    pr.start()
    # 等待 pw 结束
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

线程

Python 的线程是真正的 Posix Thread,而不是模拟出来的线程。利用 Python 的标准库 threading,可以轻松实现多线程任务。

启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start()开始执行:

import time
import threading

def sing(msg):
    n = 0
    while n < 5:
        n += 1
        print(f"{threading.current_thread().name} is running...{msg}")
        time.sleep(1)

def dance(msg):
    n = 0
    while n < 5:
        n += 1
        print(f"{threading.current_thread().name} is running...{msg}")
        time.sleep(1)

if __name__ == "__main__":
    # 以元组方式传参
    sing_thread = threading.Thread(target=sing, args=(" 我在唱歌,啦啦啦 ",))
    # 以字典方式传参
    dance_thread = threading.Thread(target=dance, kwargs={"msg": " 我在跳舞,呱呱呱 "})

    sing_thread.start()
    dance_thread.start()

任何进程默认就会启动一个线程,把该线程称为主线程,主线程又可以启动新的线程,Python 的 threading 模块有个 current_thread()函数,它永远返回当前线程的实例。

主线程实例的名字叫 MainThread,子线程的名字可在创建时指定,如果不起名字 Python 会自动给线程命名为 Thread-1 Thread-2 …

守护主线程

实际上,主线程会等待所有子线程执行结束再结束。通过设置守护主线程,来保证主线程执行完毕,子线程随之销毁。

from threading import Thread
import time

def task(name):
    time.sleep(1)
    print(f" 你好,{name}")

if __name__ == "__main__":
    t = Thread(target=task, args=(" 张三 ",))
    t.setDaemon(True)  # 守护线程,必须在 start() 之前设置
    # t.daemon = True  # 另一种设置方法
    t.start()
    print(" 主线程 ")

锁 Lock

多线程都在同个进程中,多线程使用的资源都是同一个进程中的资源,因此多线程间是共享全局变量的。因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

来看看多个线程同时操作一个变量怎么把内容给改乱了:

import threading

# 假定这是你的银行存款
balance = 0

def change_it(n):
    # 先存后取,结果应该为 0
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(2000000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

变量 balance 理论上结果应该为 0,但是,由于线程的调度是由操作系统决定的,当 t1、t2 交替执行时,只要循环次数足够多,balance 的结果就不一定是 0 了。

原因是因为高级语言的一条语句在 CPU 执行时是若干条语句,即使一个简单的计算 balance = balance + n 也分两步:

  1. 计算 balance + n,存入临时变量中
  2. 将临时变量的值赋给 balance

可看成是:

x = balance + n
balance = x

因为修改 balance 需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

要确保 balance 计算正确,就要给 change_it()上一把锁(互斥锁 mutex),当某个线程开始执行 change_it()时,该线程因为获得了锁,其他线程就不能同时执行 change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。

创建一个锁通过 threading.Lock()来实现:

import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存后取,结果应该为 0
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(2000000):
        # 先要获取锁
        lock.acquire()
        try:
            # 放心地改吧
            change_it(n)
        finally:
            # 改完了一定要释放锁,否则会产生死锁
            lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

用 try…finally 模式来确保锁一定会被释放。也可以使用 with lock 模式:

import threading

lock = threading.Lock()

class Account:
    def __init__(self, balance):
        self.balance = balance

def draw(account, amount):
    with lock:
        if account.balance >= amount:
            print(" 取钱成功 ")
            account.balance -= amount
            print(" 余额 ", account.balance)
        else:
            print(" 取钱失败,余额不足 ")

account = Account(1000)
t1 = threading.Thread(target=draw, args=(account, 600))
t2 = threading.Thread(target=draw, args=(account, 800))

t1.start()
t2.start()

GIL 全局锁

Python 的线程虽是真正的线程,但解释器执行代码时,有一个 GIL 锁:Global Interpreter Lock,任何 Python 线程执行前,必须先获得 GIL 锁,然后,每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行。这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个核。

所以,在 Python 中,可以使用多线程,但不要指望能有效利用多核。

不过,Python 虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个 Python 进程有各自独立的 GIL 锁,互不影响。

ThreadLocal

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

但局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。

ThreadLocal 解决了参数在一个线程中各个函数之间互相传递的问题。一个 ThreadLocal 变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。

import threading

# 创建全局 ThreadLocal 对象
local_school = threading.local()

def process_student():
    # 获取当前线程关联的 student
    std = local_school.student
    print("Hello, %s (in %s)" % (std, threading.current_thread().name))

def process_thread(name):
    # 绑定 ThreadLocal 的 student
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=("Alice",), name="Thread-A")
t2 = threading.Thread(target=process_thread, args=("Bob",), name="Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()

全局变量 local_school 是一个 ThreadLocal 对象,每个 Thread 对它都可以读写 student 属性,但互不影响。可以把 local_school 看成全局变量,但每个属性如 local_school.student 都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal 内部会处理。

可以理解为全局变量 local_school 是一个 dict,不但可以用 local_school.student,还可以绑定其他变量,如 local_school.teacher 等。

ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

在爬虫中的应用

import requests
import os
import bs4
import threading

os.makedirs("xkcd", exist_ok=True)  # store comics in ./xkcd

def downloadXkcd(startComic, endComic):
    for urlNumber in range(startComic, endComic):
        # Download the page.
        print(f"Downloading page https://xkcd.com/{urlNumber}...")
        res = requests.get(f"https://xkcd.com/{urlNumber}")
        res.raise_for_status()

        soup = bs4.BeautifulSoup(res.text, "html.parser")

        # Find the URL of the comic image.
        comicElem = soup.select("#comic img")
        if comicElem == []:
            print("Could not find comic image.")
        else:
            comicUrl = comicElem[0].get("src")
            # Download the image.
            print(f"Downloading image {comicUrl}...")
        res = requests.get("https:" + comicUrl)
        res.raise_for_status()

        # Save the image to ./xkcd.
        imageFile = open(os.path.join("xkcd", os.path.basename(comicUrl)), "wb")
        for chunk in res.iter_content(100000):
            imageFile.write(chunk)
        imageFile.close()

downloadThreads = []  # a list of all the Thread objects
for i in range(0, 140, 10):  # loops 14 times, creates 14 threads
    start = i
    end = i + 9
    if start == 0:
        start = 1  # There is no comic 0, so set it to 1.
    downloadThread = threading.Thread(target=downloadXkcd, args=(start, end))
    downloadThreads.append(downloadThread)
    downloadThread.start()

for downloadThread in downloadThreads:
    downloadThread.join()
print("Done.")

多线程数据通信

queue.Queue 可用于多线程间的、线程安全的数据通信。

import queue

q = queue.Queue()
# 添加元素
q.put("100")
q.put("200")
# 获取元素
item = q.get()
print(item)  # 100
item2 = q.get()
print(item2)  # 200
# 查看元素多少
print(q.qsize())  # 0
# 判断是否为空
print(q.empty())  # True
# 判断是否已满
print(q.full())  # False

信号量

from threading import Thread, Semaphore, currentThread
import time
import random

semaphore = Semaphore(3)

def task():
    with semaphore:
        print(currentThread().getName())
        time.sleep(random.randint(1, 3))

if __name__ == "__main__":
    for i in range(10):
        t = Thread(target=task)
        t.start()

正文完
post-qrcode
 0
三毛
版权声明:本站原创文章,由 三毛 于2023-08-17发表,共计9147字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)