|
| 1 | +##使用多线程解决斐波那契序列多输入问题 |
| 2 | +接下来我们将实践python多线程的使用。任务是并行处理斐波那契序列多输入值。为了把问题说清楚,我们将把输入分成四部分,四个线程分别处理一个输入数据。算法描述如下: |
| 3 | + |
| 4 | +1. 首先使用一个列表存储四个待输入值,这些值将被放入对于线程来说互相锁定的数据结构。 |
| 5 | +2. 输入值被放入可被锁定的数据结构之后,负责处理斐波那契序列的线程将被告知可以被执行。这时,我们可以使用python线程的同步机制Condition模块(Condition模块对共享变量提供线程之间的同步操作),模块详情请参考:[http://docs.python.org/3/library/threading.html#threading.Condition](http://docs.python.org/3/library/threading.html#threading.Condition)。 |
| 6 | +3. 当每个线程结束斐波那契序列的计算后,分别把结果存入一个字典。 |
| 7 | + |
| 8 | +接下来我们将列出代码,并且讲述其中有趣的地方: |
| 9 | + |
| 10 | +代码开始处我们加入了对编码额外的支持,导入logging, threading和queue模块。此外,我们还定义了我们例子中用到主要数据结构。一个字典,被命名为fibo_dict,将用来存储输入输出数据,输入数据为key,计算结果(输出数据)为值。我们同样定义了一个队列对象,该对象中存储线程间的共享数据(包括读写)。我们把该对象命名为shared\_queue。最后我们定义一个列表模拟程序的四个输入值。代码如下: |
| 11 | + |
| 12 | +```python |
| 13 | + |
| 14 | + #coding: utf-8 |
| 15 | + import logging, threading |
| 16 | + from queue import Queue |
| 17 | + logger = logging.getLogger() |
| 18 | + logger.setLevel(logging.DEBUG) |
| 19 | + formatter = logging.Formatter('%(asctime)s - %(message)s') |
| 20 | + ch = logging.StreamHandler() |
| 21 | + ch.setLevel(logging.DEBUG) |
| 22 | + ch.setFormatter(formatter) |
| 23 | + logger.addHandler(ch) |
| 24 | + fibo_dict = {} |
| 25 | + shared_queue = Queue() |
| 26 | + input_list = [3, 10, 5, 7] |
| 27 | +``` |
| 28 | + |
| 29 | +接下来的一行代码,我们从threading模块中定义了一个Condition对象,该对象根据一定的条件同步线程资源。 |
| 30 | + |
| 31 | +```python |
| 32 | + |
| 33 | + queue_condition = threading.Condition() |
| 34 | +``` |
| 35 | + |
| 36 | +使用Condition对象用于控制线程的创建队列。 |
| 37 | + |
| 38 | +下一块代码定义了一个很多线程都需要调用的方法,我们把它命名为fibonacci\_task。fibonacci\_task方法接收condition对象作为线程获取shared\_queue中值的协议。方法中,我们只用了with语句(关于更多with语句的用法,请参考[http://docs.python.org/3/reference/compound_stmts.html#with](http://docs.python.org/3/reference/compound_stmts.html#with))简化控制内容。如果没有with语句,我们则需要显式的使用锁,并且最后释放锁。有了with操作,代码隐式的在代码最开始获得锁,并在代码最后释放锁。fibonacci方法中接下来是逻辑处理相关代码,告诉当前线程,当shared\_queue为空时,等待。wait()方法是condition中的主要方法之一。线程将一直等待,直到被通知shared\_queue可以被使用。一旦满足shared\_queue可以被使用的条件,当前线程将接收shared\_queue中的值作为输入计算斐波那契序列的值,最后把输入和输出作为key和value存入fibo\_dict字典。最后,我们调用task_done()方法,通知某一个任务已经被分离并执行。代码如下: |
| 39 | + |
| 40 | +```python |
| 41 | + |
| 42 | + def fibonacci_task(condition): |
| 43 | + with condition: |
| 44 | + while shared_queue.empty(): |
| 45 | + logger.info("[%s] - waiting for elements in queue.." |
| 46 | + % threading.current_thread().name) |
| 47 | + condition.wait() |
| 48 | + else: |
| 49 | + value = shared_queue.get() |
| 50 | + a, b = 0, 1 |
| 51 | + for item in range(value): |
| 52 | + a, b = b, a + b |
| 53 | + fibo_dict[value] = a |
| 54 | + shared_queue.task_done() |
| 55 | + logger.debug("[%s] fibonacci of key [%d] with |
| 56 | + result [%d]" % |
| 57 | + (threading.current_thread().name, value, |
| 58 | + fibo_dict[value])) |
| 59 | +``` |
| 60 | + |
| 61 | +我们定义的第二个函数是queue_task,该函数被负责计算shared\_queue的值的线程所调用。我们看到condition对象作为获得shared\_queue的协议。input_list中的每一个值都将被插入到shared_queue中去。当所有的值都被插入到shared\_queue中后,告知负责计算斐波那契序列的方法shared\_queue已经可以使用。 |
| 62 | + |
| 63 | +```python |
| 64 | + |
| 65 | + def queue_task(condition): |
| 66 | + logging.debug('Starting queue_task...') |
| 67 | + with condition: |
| 68 | + for item in input_list: |
| 69 | + shared_queue.put(item) |
| 70 | + logging.debug("Notifying fibonacci_task threads |
| 71 | + that the queue is ready to consume..") |
| 72 | + condition.notifyAll() |
| 73 | +``` |
| 74 | + |
| 75 | +接下来我们将创建四个线程等待shared\_queue可以被使用条件。线程将执行target参数作为回调函数,代码如下: |
| 76 | + |
| 77 | +```python |
| 78 | + |
| 79 | + threads = [threading.Thread( |
| 80 | + daemon=True, target=fibonacci_task, |
| 81 | + args=(queue_condition,)) for i in range(4)] |
| 82 | +``` |
| 83 | + |
| 84 | +接着我们使用thread对象的start方法开始线程: |
| 85 | + |
| 86 | +```python |
| 87 | + |
| 88 | + [thread.start() for thread in threads] |
| 89 | +``` |
| 90 | + |
| 91 | +然后我们创建一个线程处理shared\_queue,然后执行该线程。代码如下: |
| 92 | + |
| 93 | +```python |
| 94 | + |
| 95 | + prod = threading.Thread(name='queue_task_thread', daemon=True, |
| 96 | + target=queue_task, args=(queue_condition,)) |
| 97 | + prod.start() |
| 98 | +``` |
| 99 | + |
| 100 | +最后,我们调用join()方法,调用计算斐波那契序列的所有线程,使用join()方法的目的是,让主线程等待子线程的调用,直到所有子线程执行完毕之后才结束子线程。 |
| 101 | + |
| 102 | +```python |
| 103 | + |
| 104 | + [thread.join() for thread in threads] |
| 105 | +``` |
| 106 | + |
| 107 | +程序的执行结果如下: |
| 108 | + |
| 109 | +注意到,第一个fibonacci_task线程被创建和初始化后,它们进入等待状态。同时,queue_task线程被创建并且生成shared\_queue队列。最后,queue\_task方法告知fibonacci_task线程可以执行它们的任务。 |
| 110 | + |
| 111 | +注意到,程序每次执行的过程都不一样,这也是多线程的特性之一。 |
0 commit comments