查看原文
其他

python的生产者消费者模型,看这篇就够了

猿媛牧场 Python专栏 2018-10-28




首先先来解释下,什么是「生产者消费者模型」生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
以上是wiki百科对于模型的解释,用我的话总结一下就是:一个愿打,一个愿挨

从上图中可以看到生产者消费者之间用中间类似一个队列一样的东西串起来。
这个队列可以想像成一个存放产品的“仓库”,生产者只需要关心这个“仓库”,并不需要关心具体的消费者,对于生产者而言甚至都不知道有这些消费者存在。
对于消费者而言他也不需要关心具体的生产者,到底有多少生产者也不是他关心的事情,他只要关心这个“仓库”中还有没有东西。这种模型是一种松耦合模型

那么接下来就用代码来演示一下

class Consumer(threading.Thread):
  def __init__(self, queue):
      # 从python3 开始,继承有了极大的改善
      super().__init__()
      # 我喜欢用单下划线去标识私有变量
      self._queue = queue

  # 继承了Thread类后,需要重写run方法来自定义事件
  def run(self):
      while True:
          # msg即是我们说的消息,也是仓库中的货物
          msg = self._queue.get()
          # 我会在生产者中加入quit关键字,保证程序能够自动退出
          if isinstance(msg, str) and msg == 'quit':
              break
          print(f"I'm a thread, and I received {msg}!!")
      print('Bye byes!')


def producer():
  # 生产者生产一个新的队列,用于存放消息
  queue = Queue.Queue()
  # 初始化一个消费者实例
  worker = Consumer(queue)

  # 开启消费者线程
  worker.start()  
  start_time = time.time()
  # 退出条件
  while time.time() - start_time < 5:
      queue.put('something at %s' % time.time())
      time.sleep(1)
  queue.put('quit')
  worker.join()


if __name__ == '__main__':
  producer()


那么如果在爬虫的时候,我们就可以把仓库中的货物想像成是一个个的url,生产者产生url链接,消费者获取url连接并从中得到数据,在队列的帮助下可以使用多线程加快爬虫速度。

import time, threading, Queue
import requests

class Consumer(threading.Thread):
   def __init__(self, queue):
       super().__init__()
       self._queue = queue

   def run(self):
       while True:
           _content = self._queue.get()
           print(_content)
           if isinstance(_content, str) and _content == 'quit':
               break
           _res = requests.get(content)
       print('Bye byes!')

def Producer():
   _urls = ['url1', 'url2', 'url3', 'url4']
   _q = Queue.Queue()
   # 一次打开4个队列
   _workers = build_worker_pool(_q, 4)
   _start_time = time.time()
   for _url in _urls:
       _q.put(_url)
   for _w in _workers:
       _q.put('quit')
   for _w in _workers:
       # join的作用就是为了保证所有的子线程都结束了,再结束父线程
       _w.join()
   _t = time.time() - start_time
   print(f"Done! Time taken: {_t}")

def build_worker_pool(queue, size):
   _workers = []
   for _ in range(size):
       _worker = Consumer(queue)
       _worker.start()
       _workers.append(_worker)
   return workers

if __name__ == '__main__':
   Producer()


好啦~今天的内容就到这里差不多咯~


如果你对今天的内容还感兴趣的话,何不点个赞再走呢?
如果感兴趣到想赞赏我,就不要犹豫啦~




目前我开了2个主群,我邀请了一些我的BAT伙伴前来助阵。定期也会在群里组织抽奖、送书等活动。更有各种资源分享。

目前2个主群都以过百,想要加入的小伙伴,可以加我微信,我拉你们,或者公众号回复关键“关注作者”。


另外高级群」已经升级啦!如果你错过了种子轮,难道还要错过天使轮吗?群内不定期组织红包接龙,每天中午1小时的随即话题讨论没有广告只聊技术生活,这样的群上哪找?


推荐阅读:

写了这么多年的python,tuple竟然是可变的?
你真的会用python写mongodb的URI吗?



    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存