• <small id='RO6Yk'></small><noframes id='RO6Yk'>

    <tfoot id='RO6Yk'></tfoot>
    1. <i id='RO6Yk'><tr id='RO6Yk'><dt id='RO6Yk'><q id='RO6Yk'><span id='RO6Yk'><b id='RO6Yk'><form id='RO6Yk'><ins id='RO6Yk'></ins><ul id='RO6Yk'></ul><sub id='RO6Yk'></sub></form><legend id='RO6Yk'></legend><bdo id='RO6Yk'><pre id='RO6Yk'><center id='RO6Yk'></center></pre></bdo></b><th id='RO6Yk'></th></span></q></dt></tr></i><div id='RO6Yk'><tfoot id='RO6Yk'></tfoot><dl id='RO6Yk'><fieldset id='RO6Yk'></fieldset></dl></div>

      1. <legend id='RO6Yk'><style id='RO6Yk'><dir id='RO6Yk'><q id='RO6Yk'></q></dir></style></legend>
          <bdo id='RO6Yk'></bdo><ul id='RO6Yk'></ul>
      2. 同步异步队列

        时间:2023-09-29
          <tbody id='or8CI'></tbody>
        <tfoot id='or8CI'></tfoot>
          <bdo id='or8CI'></bdo><ul id='or8CI'></ul>

                <legend id='or8CI'><style id='or8CI'><dir id='or8CI'><q id='or8CI'></q></dir></style></legend>
              • <small id='or8CI'></small><noframes id='or8CI'>

                  <i id='or8CI'><tr id='or8CI'><dt id='or8CI'><q id='or8CI'><span id='or8CI'><b id='or8CI'><form id='or8CI'><ins id='or8CI'></ins><ul id='or8CI'></ul><sub id='or8CI'></sub></form><legend id='or8CI'></legend><bdo id='or8CI'><pre id='or8CI'><center id='or8CI'></center></pre></bdo></b><th id='or8CI'></th></span></q></dt></tr></i><div id='or8CI'><tfoot id='or8CI'></tfoot><dl id='or8CI'><fieldset id='or8CI'></fieldset></dl></div>

                  本文介绍了同步异步队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

                  问题描述

                  我计划使用基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要.所以这里是它的代码片段:

                  I am planning to have an asyncio Queue based producer-consumer implementation for a processing of realtime data where sending out data in correct time order is vital. So here is the code snippet of it :

                  async def produce(Q, n_jobs):
                      for i in range(n_jobs):
                          
                          print(f"Producing :{i}")
                          await Q.put(i)
                  
                  
                  async def consume(Q):
                      while True:
                          n = await Q.get()
                          
                          print(f"Consumed :{n}")
                         
                         x = do_sometask_and_return_the_result(n)
                         print(f"Finished :{n} and Result: {x}")
                  
                  
                  async def main(loop):
                      Q = asyncio.Queue(loop=loop, maxsize=3)
                      await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
                      print("Done")
                  

                  这里生产者产生数据并将其放入异步队列.我有多个消费者来消费和处理数据.在查看输出时,在打印Consumed :{n}"时保持顺序.(如 1,2,3,4... 等等),这完全没问题.但是,由于函数 do_sometask_and_return_the_result(n) 需要可变时间来返回结果,因此在 n Finished :{n}"的下一次打印中不会保持顺序.(如 2,1,4,3,5,...).

                  Here the producer produces data and puts it into the asyncio Queue. I have multiple consumers to consume and process the data. While seeing the outputs, the order is maintained while printing "Consumed :{n}" (as in 1,2,3,4... and so on) , this is completely fine. but, since the function do_sometask_and_return_the_result(n) takes variable time to return the result, the order is not maintained in the next print of n "Finished :{n}" (as in 2,1,4,3,5,...).

                  有什么方法可以同步这些数据,因为我需要保持打印结果的顺序?即使在 do_sometask_and_return_the_result(n) 之后,我也希望看到 1,2,3,4,.. 的 'n' 连续打印.

                  Is there any way to synchronize this data as I need to maintain the order of printing the results? I want to see 1,2,3,4,.. sequential prints for 'n' even after do_sometask_and_return_the_result(n).

                  推荐答案

                  您可以使用优先队列系统(使用 python heapq 库)在作业完成后重新排序作业.像这样.

                  You could use a priority queue system (using the python heapq library) to reorder your jobs after they are complete. Something like this.

                  # add these variables at class/global scope
                  priority_queue = []
                  current_job_id = 1
                  job_id_dict = {}
                  
                  async def produce(Q, n_jobs):
                      # same as above
                  
                  async def consume(Q):
                      while True:
                          n = await Q.get()
                          
                          print(f"Consumed :{n}")
                         
                         x = do_sometask_and_return_the_result(n)
                         await process_result(n, x)
                  
                  
                  async def process_result(n, x):
                      heappush(priority_queue, n)
                      job_id_dict[n] = x
                      while current_job_id == priority_queue[0]:
                          job_id = heappop(priority_queue)
                          print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
                          current_job_id += 1
                       
                  
                  
                  async def main(loop):
                      Q = asyncio.Queue(loop=loop, maxsize=3)
                      await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
                      print("Done")
                  
                  

                  有关 heapq 模块的更多信息:https://docs.python.org/3/library/heapq.html

                  For more information on the heapq module: https://docs.python.org/3/library/heapq.html

                  这篇关于同步异步队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

                  上一篇:Python CGI 队列 下一篇:需要帮助 Discord 机器人队列

                  相关文章

                    <bdo id='pQJ7X'></bdo><ul id='pQJ7X'></ul>
                  <tfoot id='pQJ7X'></tfoot>

                  <small id='pQJ7X'></small><noframes id='pQJ7X'>

                  1. <legend id='pQJ7X'><style id='pQJ7X'><dir id='pQJ7X'><q id='pQJ7X'></q></dir></style></legend>
                    <i id='pQJ7X'><tr id='pQJ7X'><dt id='pQJ7X'><q id='pQJ7X'><span id='pQJ7X'><b id='pQJ7X'><form id='pQJ7X'><ins id='pQJ7X'></ins><ul id='pQJ7X'></ul><sub id='pQJ7X'></sub></form><legend id='pQJ7X'></legend><bdo id='pQJ7X'><pre id='pQJ7X'><center id='pQJ7X'></center></pre></bdo></b><th id='pQJ7X'></th></span></q></dt></tr></i><div id='pQJ7X'><tfoot id='pQJ7X'></tfoot><dl id='pQJ7X'><fieldset id='pQJ7X'></fieldset></dl></div>