[Tutor] Race condition, PyCharm handling, or something else with Queues and multiprocessing ThreadPool?
Balthazar
StrategicWiseKing at protonmail.com
Fri Nov 15 09:08:58 EST 2024
Hi, I've been trying to understand ThreadPool and am having a funny time with it. The code runs partway through once and then finishes, even though it is meant to keep looping. The only way I've managed to get it to work is by running the code in debug mode in PyCharm and stepping over each line.
So why do I think it is a race condition? Because there are still items in the queue when it exits, yet visually everything seems to be in order, so this comes down to a gap in my understanding of what is really happening.
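(To check that, I dropped a quick size log into thread_worker just before the pool is closed, something like this:)

---- start here ----
# added temporarily at the end of thread_worker, right before self.pool.close():
self.logger.info(f"Queue size at exit: {self.queue.qsize()}")
---- end here ----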
---- start here ----
import logging
import time
from queue import Queue, Empty
from multiprocessing.pool import ThreadPool
from multiprocessing import Event


class MultithreadedPoolAndQueue:
    def __init__(self, available_modules):
        self.logger = logging.getLogger("Test_Program.MultithreadedPoolAndQueue")
        self.available_modules = available_modules
        self.logger.info("Multithreaded Pool and Queue Management Initialised")
        self.queue = Queue()
        self.pool = ThreadPool(processes=len(self.available_modules))
        self.all_tasks_completed = Event()

    def unpack_results(self, results):
        filtered_results = []
        for key, val in results.items():
            if isinstance(val, list):
                for item in val:
                    filtered_results.append({key: item})
            else:
                filtered_results.append({key: val})
        return filtered_results

    def fill_queue(self, data):
        for d in data:
            # Each item will be {DATA_TYPE: Data}
            self.queue.put(d)

    def parse_results(self, results):
        """
        This function needs the filtered results. It puts the data onto the
        queue like this: {DATA_TYPE: Data}
        :param results: Results from the worker function.
        :return:
        """
        self.logger.info(f"Entering parse_results with: {results}")
        if results is None:
            pass
        else:
            unpacked_results = self.unpack_results(results)
            for result in unpacked_results:
                self.logger.info(f"Result added: {result}")
                self.queue.put(result)

    def error_call(self, e):
        self.logger.error(e)

    def thread_worker(self):
        max_check_count = 10
        check = 0
        loop_around = 0
        while self.queue.empty() is False or loop_around < 2 and check < max_check_count:
            try:
                item = self.queue.get_nowait()
                # Assuming item is in the format {data_type: data}
                data_type = next(iter(item))  # Get the first (and only) key from the dictionary
                query = item[data_type]
                # Then for each module in available modules, apply an asynchronous task
                for module in self.available_modules:
                    for module_name, module_class in module.items():
                        # Check if the module can handle this data type
                        if data_type in module_class.accepted_data:
                            module_instance = module_class()
                            self.pool.apply_async(
                                module_instance.event,
                                args=(query, data_type),
                                callback=self.parse_results,
                                error_callback=self.error_call,
                            )
                loop_around = 0
                check = 0  # Reset check counter on successful item processing
            except Empty:
                # If the queue is empty, wait for a short time before checking again
                time.sleep(0.1)
                check += 1
                loop_around += 1
            except Exception as e:
                self.logger.exception(e)
                loop_around += 1
        # Wait for all async tasks to complete
        self.pool.close()
        self.pool.join()
        self.all_tasks_completed.set()

    def wait_for_completion(self):
        self.all_tasks_completed.wait()
        self.logger.info(f"Queue item: {self.queue.get()}")
        self.logger.info("All tasks have been completed.")
---- end here ----
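One thing I double-checked while writing this up: how the while condition in thread_worker actually parses. Since `and` binds tighter than `or` in Python, I believe it groups like this (a quick sanity-check snippet I ran, not part of the real code):

---- start here ----
from queue import Queue

queue = Queue()  # empty, so queue.empty() is True
loop_around = 0
check = 0
max_check_count = 10

# Python parses "A or B and C" as "A or (B and C)", so the loop
# condition is equivalent to:
condition = (queue.empty() is False) or (loop_around < 2 and check < max_check_count)
print(condition)  # prints True here: the second clause keeps the loop alive
---- end here ----

I'm not sure whether that grouping is related to what I'm seeing, but I wanted to mention it.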
Here's how I've been using the code.
---- start here ----
threaded_core = worker_core.MultithreadedPoolAndQueue(self.available_modules)
threaded_core.fill_queue(self.targets)
threaded_core.thread_worker()
threaded_core.wait_for_completion()
---- end here ----
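And in case it helps to reproduce, here is a minimal standalone sketch of the wiring, assuming the class definition above is in the same file. DummyModule and the sample target are stand-ins I made up for this post; my real modules are bigger but have the same shape (an accepted_data class attribute and an event(query, data_type) method that returns a results dict):

---- start here ----
import logging

logging.basicConfig(level=logging.INFO)


class DummyModule:
    # Hypothetical stand-in for one of my real modules
    accepted_data = ["hostname"]

    def event(self, query, data_type):
        # Pretend to do some work and return results in the
        # {DATA_TYPE: Data} shape that parse_results expects
        return {"ip": f"ip-for-{query}"}


available_modules = [{"DummyModule": DummyModule}]
targets = [{"hostname": "example.com"}]

threaded_core = MultithreadedPoolAndQueue(available_modules)
threaded_core.fill_queue(targets)
threaded_core.thread_worker()
threaded_core.wait_for_completion()  # note: this calls queue.get(), which blocks if the queue is already empty here
---- end here ----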
If you require more code, let me know. Thank you in advance for any advice.