python多线程中,主线程中如果捕获子线程的异常,笔者查阅了相关资料,有一种方式是使用队列(queue)将子线程的异常写入队列,然后主进程中去遍历异常消息队列,这种方式不近需要额外引入一个q对象,要同时遍历队列和判断线程状态,实现上上非常丑陋,后来发现如下方式,通过继承threading.Thread后,重写run和join方法,优雅地实现了线程方法的异常“上抛”,可以在主线程中轻松捕获子线程的异常信息。

 

以下是译文:

https://www.geeksforgeeks.org/handling-a-threads-exception-in-the-caller-thread-in-python/
Python中的多线程可以通过使用threading库来实现。为了调用线程,调用者线程创建一个线程对象并在其上调用 start 方法。一旦调用了 join 方法,就会启动它的执行并执行类对象的 run 方法。

对于异常处理,使用 try-except 块来捕获在 try 块中引发的异常,并在 except 块中进行相应处理
比如:

# Importing the modules
import threading
import sys
 
# Custom Thread Class
class MyThread(threading.Thread):
    def someFunction(self):
        print("Hello World")
    def run(self):
          self.someFunction()
    def join(self):
        threading.Thread.join(self)
 
# Driver function
def main():
    t = MyThread()
    t.start()
    t.join()
 
# Driver code
if __name__ == '__main__':
    main()
#输出:Hello World

为了在调用者线程中捕获和处理线程的异常,我们使用一个变量来存储被调用线程中引发的异常(如果有),当被调用线程被连接时,连接函数检查 exc 的值是否为 None,
如果是则不生成异常,否则再次引发存储在 exc 中的生成异常。这发生在调用者线程中,因此可以在调用者线程本身中处理。

示例:
该示例创建了一个 MyThread 类型的线程 t,该线程的 run() 方法调用了 someFunction() 方法,该方法引发了 MyException,因此每当线程运行时,它都会引发异常。
为了在调用者线程中捕获异常,我们维护了一个单独的变量 exc,它设置为当被调用线程引发异常时引发的异常。
最后在 join() 方法中检查此 exc,如果不是 None,则 join 只会引发相同的异常。因此,在调用者线程中引发了捕获的异常,因为 join 在调用者线程(这里是主线程)中返回,因此被相应地处理。

# Importing the modules
import threading
import sys
 
# Custom Exception Class
class MyException(Exception):
    pass
 
# Custom Thread Class
class MyThread(threading.Thread):
     
  # Function that raises the custom exception
    def someFunction(self):
        name = threading.current_thread().name
        raise MyException("An error in thread "+ name)
 
    def run(self):
        # Variable that stores the exception, if raised by someFunction
        self.exc = None           
        try:
            self.someFunction()
        except BaseException as e:
            self.exc = e
       
    def join(self):
        threading.Thread.join(self)
        # Since join() returns in caller thread
        # we re-raise the caught exception
        # if any was caught
        if self.exc:
            raise self.exc
 
# Driver function
def main():
   
    # Create a new Thread t
    # Here Main is the caller thread
    t = MyThread()
    t.start()
     
    # Exception handled in Caller thread
    try:
        t.join()
    except Exception as e:
        print("Exception Handled in Main, Details of the Exception:", str(e))
 
# Driver code
if __name__ == '__main__':
    main()

#输出:Exception Handled in Main, Details of the Exception: An error in thread Thread-1 

 

 

译者补充:

 

补充1:理解重新run和join方法的目的

按照常规的方式,通过继承threading.Thread重写run方法不作任何异常处理,如下:执行该代码,主线程中无法捕获子线程的异常,也即主线程中没有任何反应。
由此可以体会到重写run和join方法的作用。

# Importing the modules
import threading
import sys


# Custom Exception Class
class MyException(Exception):
    pass


# Custom Thread Class
class MyThread(threading.Thread):

    # Function that raises the custom exception
    def someFunction(self):
        name = threading.current_thread().name
        raise MyException("An error in thread " + name)

    def run(self):
        self.someFunction()

# Driver function
def main():
    # Create a new Thread t
    # Here Main is the caller thread
    t = MyThread()
    t.start()

    # Exception handled in Caller thread
    try:
        t.join()
    except Exception as e:
        print("Exception Handled in Main, Details of the Exception:",str(e))
) # Driver code if __name__ == '__main__': main()

 

补充2:上述case只有一个线程,如果开启了多个线程,且不希望某个线程异常之后打断主线程,该怎么处理?此时需要在线程join的时候依次捕获器异常,而不是在线程join的时候捕获异常。

# Importing the modules
import concurrent
import threading
import sys


# Custom Exception Class
import time
from concurrent.futures import ThreadPoolExecutor


class MyException(Exception):
    pass


# Custom Thread Class
class MyThread(threading.Thread):

    # Function that raises the custom exception
    def someFunction(self):
        name = threading.current_thread().name
        raise MyException("An error in thread " + name)

    def run(self):
        self.someFunction()



# Driver function
def main():
    # Create a new Thread t
    # Here Main is the caller thread
    t1 = MyThread()
    t2 = MyThread()

    list_thread = [t1,t2]
for t in list_thread:
        t.start()
    # Exception handled in Caller thread
    for t in list_thread:
        try:
            t.join()
        except Exception as e:
            print("Exception Handled in Main, Details of the Exception:",str(e))
) # Driver code if __name__ == '__main__': main()

 

 

补充3:如果使用线程池的模式开启多线程,不需要重写线程类,同时也就也不需要重写run和join方法,只需要遍历ThreadPoolExecutor的submit方法返回的futuer对象即可。可见ThreadPoolExecutor对象替我们做了很多工作。

# Importing the modules
import concurrent
import threading
import sys


# Custom Exception Class
import time
from concurrent.futures import ThreadPoolExecutor


class MyException(Exception):
    pass


# Custom Thread Class
class MyThread():
    # Function that raises the custom exception
    def someFunction(self):
        time.sleep(1)
        name = threading.current_thread().name
        raise MyException("An error in thread " + name)



# Driver function
def main():
    # Create a new Thread t
    # Here Main is the caller thread
    t1 = MyThread()
    t2 = MyThread()

    list_thread = [t1,t2]
    list_task = []

    '''
    for t in list_thread:
        t.start()
    # Exception handled in Caller thread
    for t in list_thread:
        try:
            t.join()
        except Exception as e:
            print("Exception Handled in Main, Details of the Exception:", str(e))
    '''

    with ThreadPoolExecutor(max_workers=3) as executor:
        for obj in list_thread:
            future = executor.submit(obj.someFunction)
            list_task.append(future)


    for future in list_task:
        try:
            future.result()
        except Exception as e:
            print("Exception Handled in Main, Details of the Exception:", str(e))



# Driver code
if __name__ == '__main__':
    main()

 

 

补充4:如何让子线程在异常的时候打断,或者不打断主线程?

其实上述代码中已经写明了,在异常捕获的中,如果想要子线程不打断主线程,需要在单个子线程join的时候进行异常处理,这种情况下单个子线程正常与否,不影响主线程。、
如果try except在线程join的外层(线程池中的list_task外层),那么任何一个线程的异常将中断主线程

    
    for t in list_thread:
        try:
            t.join()
        except Exception as e:
            print("Exception Handled in Main, Details of the Exception:", str(e))


    for future in list_task:
        try:
            future.result()
        except Exception as e:
            print("Exception Handled in Main, Details of the Exception:", str(e))