    Python parallel threads consuming watchdog queue events

      Date: 2023-09-29
                This post walks through a problem (and workaround) with Python parallel threads consuming watchdog queue events; it may be a useful reference if you run into the same issue.

                Problem description

                I have this code that should put an event in a queue each time an external program (TCPdump) creates a *.pcap file in my directory. My problem is that I always get an empty queue, although I do get the print from the process() function.

                What am I doing wrong? Is the queue correctly defined and shared between the two classes?
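As background: queue.Queue itself is thread-safe, so a single instance can be shared between a producer and a consumer thread without extra locking. A minimal sketch of that pattern (the file names are made-up stand-ins for watchdog event paths):

```python
import threading
from queue import Queue

q = Queue()
consumed = []

def producer(q):
    for i in range(3):
        q.put(f"file_{i}.pcap")   # stands in for event.src_path

def consumer(q):
    for _ in range(3):
        consumed.append(q.get())  # get() blocks until an item arrives

t_cons = threading.Thread(target=consumer, args=(q,))
t_prod = threading.Thread(target=producer, args=(q,))
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()
print(consumed)
```

Items come out in FIFO order, so sharing the queue between the Handler and a consumer is not, by itself, the problem.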

                EDIT-----------------
                I think I understand why I got an empty queue: I was printing the queue I initialized before the Handler class had filled it. I modified my code and created two workers that should consume the same queue, but now execution gets stuck on queue.put() and the ReadPcapFiles() thread stops running.

                Here is the updated code:

                import time
                import pyshark
                import concurrent.futures
                import threading
                import logging
                from queue import Queue
                from multiprocessing import Process
                from watchdog.observers import Observer, api
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):
                        #print(f'event type: {event.event_type}  path : {event.src_path}')   
                        self.queue.put(event.src_path)
                        logging.info(f"Storing message: {self.queue.qsize()}")
                        print("Producer queue: ", list(self.queue.queue))
                        #self.queue.get()
                
                    def on_created(self, event):
                        self.process(event)          
                
                
                def StartWatcher(watchdogq, event):
                    path = 'C:\...'
                    handler = Handler(watchdogq)
                    observer = Observer()
                    while not event.is_set():
                        observer.schedule(handler, path, recursive=False)
                        print("About to start observer")
                        observer.start()
                        try:
                            while True:
                                time.sleep(1)
                        except Exception as error:
                            observer.stop()
                            print("Error: " + str(error))
                        observer.join()
                
                
                def ReadPcapFiles(consumerq, event):
                    while not event.is_set() or not consumerq.empty():
                        print("Consumer queue: ", consumerq.get())
                        #print("Consumer queue: ", list(consumerq.queue))
                
                    # pcapfile = pyshark.FileCapture(self.queue.get())
                    #     for packet in pcapfile:
                    #         countPacket +=1 
                
                if __name__ == '__main__':
                    format = "%(asctime)s: %(message)s"
                    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
                    logging.getLogger().setLevel(logging.DEBUG)
                
                    queue = Queue()
                    event = threading.Event()
                    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
                        executor.submit(StartWatcher,queue, event)
                        executor.submit(ReadPcapFiles,queue, event)
                
                        time.sleep(0.1)
                        logging.info("Main: about to set event")
                        event.set()
                

                Old code:

                import time
                from queue import Queue
                from watchdog.observers import Observer
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):
                        print(f'event type: {event.event_type}  path : {event.src_path}')   
                        self.queue.put(event.src_path)
                
                    def on_created(self, event):
                        self.process(event)
                
                class Watcher():
                    def __init__(self, path):
                        self.queue = Queue()
                        self.observer = Observer()
                        self.handler = Handler(self.queue)
                        self.path = path
                
                    def start(self): 
                        self.observer.schedule(self.handler, self.path, recursive=True)
                        self.observer.start()
                        try:
                            while True:
                                time.sleep(1)
                                self.queue.get()
                                print(list(self.queue.queue))
                        except Exception as error:
                            self.observer.stop()
                            print("Error: " + str(error))
                        self.observer.join()  
                
                if __name__ == '__main__':
                    watcher = Watcher('C:\...')
                    watcher.start()
                

                Accepted answer

                This works for me (I got the main idea from this answer, thanks!), but note that I consider it a workaround, so if someone has a better solution or can better explain the reason for this behavior in Python, please do not hesitate to answer!

                My guess is that I had two main problems:
                - I was starting the Watchdog observer inside another thread (and that was somehow blocking my queue-consuming thread).
                - Python threads do not really run in parallel, so starting an independent process was necessary.
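As a side note on the consuming side: a consumer thread that blocks on get() with a timeout, instead of polling empty() in a tight loop, also avoids busy-waiting and shuts down cleanly. A minimal sketch of that pattern (separate from the code below):

```python
import threading
from queue import Queue, Empty

def consume(q, stop, results):
    # Block on get() with a short timeout instead of polling empty():
    # the thread sleeps while the queue is idle but still notices `stop`,
    # and drains any items that are left after stop is set.
    while not stop.is_set() or not q.empty():
        try:
            results.append(q.get(timeout=0.2))
        except Empty:
            continue

q = Queue()
stop = threading.Event()
results = []
worker = threading.Thread(target=consume, args=(q, stop, results))
worker.start()
for path in ("a.pcap", "b.pcap"):  # stand-ins for created file paths
    q.put(path)
stop.set()
worker.join()
print(results)
```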

                Here is my code:

                import time
                import pyshark
                import threading
                import logging
                import os
                from queue import Queue
                from multiprocessing import Pool
                from watchdog.observers import Observer
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):  
                        self.queue.put(event.src_path)
                        logging.info(f"Storing message: {self.queue.qsize()}")
                        print("Producer queue: ", list(self.queue.queue))
                
                
                    def on_created(self, event):
                        #wait that the transfer of the file is finished before processing it
                        file_size = -1
                        while file_size != os.path.getsize(event.src_path):
                            file_size = os.path.getsize(event.src_path)
                            time.sleep(1)
                
                        self.process(event)         
                
                def ConsumeQueue(consumerq):
                    # Create the pool once; building a new Pool on every
                    # iteration would leak worker processes.
                    pool = Pool()
                    while True:
                        if not consumerq.empty(): 
                            pool.apply_async(ReadPcapFiles, (consumerq.get(), ))
                        else:    
                            time.sleep(1)
                
                def ReadPcapFiles(get_event):        
                    createdFile = get_event
                    print(f"This is my event in ReadPacapFile {createdFile}")
                
                    countPacket = 0
                    bandwidth = 0
                    pcapfile = pyshark.FileCapture(createdFile)
                    for packet in pcapfile:
                        countPacket +=1
                        bandwidth = bandwidth + int(packet.length)
                    print(f"Packet nr {countPacket}")
                    print(f"Total bytes {bandwidth}")
                
                
                if __name__ == '__main__':
                
                    format = "%(asctime)s: %(message)s"
                    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
                    logging.getLogger().setLevel(logging.DEBUG)
                
                    queue = Queue()
                    path = 'C:\...'
                
                    worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
                    print("About to start worker")
                    worker.start()
                
                    event_handler = Handler(queue)
                    observer = Observer()
                    observer.schedule(event_handler, path, recursive=False)
                    print("About to start observer")
                    observer.start()
                
                    try:
                        while True:
                            time.sleep(1)
                    except Exception as error:
                        observer.stop()
                        print("Error: " + str(error))
                    observer.join()
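The file-size polling in on_created above can also be factored into a small standalone helper; this is a sketch of the same "wait until the size stops changing" idea (the function name is my own):

```python
import os
import time

def wait_until_stable(path, interval=1.0):
    """Return the file's size once it stops changing between two checks.

    While TCPdump (or any writer) is still appending to a capture file,
    its size keeps growing; two identical consecutive readings suggest
    the transfer has finished.
    """
    last = -1
    while True:
        size = os.path.getsize(path)
        if size == last:
            return size
        last = size
        time.sleep(interval)
```

on_created could then call wait_until_stable(event.src_path) before putting the path on the queue.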
                

                That wraps up this post on Python parallel threads consuming watchdog queue events; hopefully the answer above helps.

