[Tutor] [Python 3] Threads status, join() and Semaphore queue
Cameron Simpson
cs at cskk.id.au
Tue Nov 20 02:22:23 EST 2018
On 19Nov2018 23:52, Dimitar Ivanov <dimitarxivanov at gmail.com> wrote:
>I'm having a hard time getting my head around threads so I was hoping
>someone who has better understanding of their underlying functionality
>could lend me a helping hand, in particular how threads work with each
>other when using thread.join() and Semaphore set with maximum value. I'll
>try to keep it as clear and concise as possible, but please don't hesitate
>to ask if anything about my approach is unclear or, frankly, awful.
>
>I'm writing a script that performs a couple of I/O operations and CLI
>commands for each element in a list of IDs. The whole process takes a while
>and may vary based on the ID, hence the threading approach sounded like the
>best fit since next ID can start once space has freed up. I'm pasting an
>extract of my code below and will explain what I can't properly understand
>underneath.
I'm just going to scatter a few random remarks about your code in here
with your listing before addressing your other queries lower down...
>file1.py
>---------
>ids = [<IDs listed here>]
>threadsPool = []
>for id in ids:
The name "id" is unfortunate as it shadows the builtin id()
function. Maybe "element_id"? Wordier, but also clearer.
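A quick illustration of the clash, using two hypothetical functions: when a function uses "id" as its loop variable, the name is a local string after the loop, so calling id() there fails, while a function that uses "element_id" can still call the builtin.

```python
def shadowed(ids):
    for id in ids:  # "id" becomes a local variable of this function
        pass
    return id(ids)  # fails: "id" is now the last element, a str


def not_shadowed(ids):
    for element_id in ids:  # descriptive name, no clash
        pass
    return id(ids)  # the builtin id() is still reachable


ids = ["ID01", "ID02"]  # hypothetical sample IDs

shadow_error = None
try:
    shadowed(ids)
except TypeError as exc:
    shadow_error = exc
    print("shadowed id() fails:", exc)

print("builtin id() still works:", not_shadowed(ids) == id(ids))
```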
> thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id,))
> threadsPool.append(thread)
>for thread in threadsPool:
> thread.start()
You could start each thread right after creating it if you wished.
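A sketch of that variation, with a hypothetical run_stuff() standing in for file2.runStuff: each thread is started as soon as it is created, and the threads are still kept in a list so they can be joined later.

```python
import threading
import time


def run_stuff(element_id):
    # Hypothetical stand-in for file2.runStuff.
    time.sleep(0.01)


ids = ["ID01", "ID02", "ID03"]  # hypothetical sample IDs
threads_pool = []
for element_id in ids:
    thread = threading.Thread(target=run_stuff, name=element_id, args=(element_id,))
    thread.start()  # start immediately instead of in a second loop
    threads_pool.append(thread)

for thread in threads_pool:
    thread.join()

print([t.name for t in threads_pool])
```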
>for thread in threadsPool:
> print(thread.enumerate())
"enumerate" is a function from the threading module, not a method of a
Thread. So try:
    print(threading.enumerate())
Frankly I'm surprised that "thread.enumerate" works at all.
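Since the goal is to report how many threads are active, here is a minimal sketch of the two module-level functions involved: threading.enumerate() returns the Thread objects that are currently alive (including the main thread), and threading.active_count() returns how many there are. The worker() function and the Event that keeps the workers alive long enough to be counted are hypothetical scaffolding.

```python
import threading

release_workers = threading.Event()


def worker():
    # Block until the main thread allows the workers to finish.
    release_workers.wait()


workers = [threading.Thread(target=worker, name=f"ID{n:02d}") for n in range(1, 4)]
for t in workers:
    t.start()

# Every Thread that is alive right now, main thread included.
alive_names = sorted(t.name for t in threading.enumerate())
print("alive:", alive_names)
print("count:", threading.active_count())

release_workers.set()
for t in workers:
    t.join()
```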
> print("Queuing thread" + str(thread))
> thread.join()
>
>file2.py
>----------
>queue = threading.Semaphore(2)
I'd be disinclined to call this a "queue", which usually implies a FIFO
list of some variety: you put things onto it and pull things off it,
usually first in, first out. Maybe just "sem" or "thread_capacity" or
something?
>def runStuff(id):
> queue.acquire()
> print("Lock acquired for " + str(id))
> file3.doMoreStuff()
> file4.evenMoreStuff()
> queue.release()
>
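One more remark on runStuff as quoted: if doMoreStuff() or evenMoreStuff() raises an exception, queue.release() is never reached and that slot in the semaphore is lost for good. A threading.Semaphore can be used in a with statement, which releases it however the block exits. A minimal sketch, using the hypothetical name thread_capacity suggested above and time.sleep() in place of the real work; the running/max_running counters are extra instrumentation, only there to show the limit of 2 holding.

```python
import threading
import time

# Hypothetical name, instead of calling the semaphore "queue".
thread_capacity = threading.Semaphore(2)

running = 0  # instrumentation only: workers currently inside the semaphore
max_running = 0  # instrumentation only: the highest value "running" reached
counter_lock = threading.Lock()


def run_stuff(element_id):
    global running, max_running
    # "with" acquires the semaphore on entry and releases it on exit,
    # even if the work inside raises an exception.
    with thread_capacity:
        with counter_lock:
            running += 1
            max_running = max(max_running, running)
        print("Lock acquired for " + str(element_id))
        time.sleep(0.02)  # stand-in for file3.doMoreStuff() and friends
        with counter_lock:
            running -= 1


threads = [threading.Thread(target=run_stuff, args=(f"ID{n:02d}",)) for n in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("most threads inside the semaphore at once:", max_running)
```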
>Onto my confusion - as long as I don't try to print information about the
>thread that's being queued or the total amount of threads using
>.enumerate(), the script is working absolutely flawlessly, each thread that
>doesn't have a lock is waiting until it acquires it and then moves on. I
>decided it'd be nice to be able to provide more information about which
>thread starts next and how many threads are active right now (each can take
>a different amount of time), however, when I tried to do that, my log was
>showing me some pretty funky output which at first made me believe I've
>messed up all my threads, example:
>
>
><< 2018-11-19 15:01:38,094 file2 [ID09] INFO - Lock acquired for
>ID09 <---- this is from file2.py
>------ some time later and other logs in here ---------
>[<_MainThread(MainThread, started 140431033562880)>, <Thread(ID09, started
>140430614177536)>] <---- output from thread.enumerate(), file1.py
><< 2018-11-19 15:01:38,103 file1 [MainThread] DEBUG - Queuing thread -
><Thread(ID09, started 140430614177536)> <---- output from print() right
>after thread.enumerate()
>
>After some head scratching, I believe I've finally tracked down the reason
>for my confusion:
>
>The .start() loop starts the threads and the first 2 acquire a lock
>immediately and start running, later on the .join() queue puts the rest in
>waiting for lock, that's fine, what I didn't realize, of course, is that
>the .join() loop goes through threads that have already been instantly
>kicked off by the .start() loop (the first 2 threads since Semaphore allows
>2 locks) and then my print in that loop is telling me that those threads
>are being queued, except they aren't since they are already running, it's
>just my text is telling me that, since I wasn't smart enough to realize
>what's about to happen, as seen below:
>
><< 2018-11-19 15:01:33,094 file1.py [MainThread] DEBUG - Queuing thread -
><Thread(ID02, stopped 140430666626816)> <--- makes it clear the thread has
>already even finished
Yes. The .join() has NO EFFECT on the Thread itself: it doesn't start it
or stop it. It just waits for the Thread to complete. So yes, your log
message is misleading.
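A small demonstration of that point, a sketch with a hypothetical short_job(): joining a thread that has already finished simply returns, and join() will not start a thread for you; calling it on a thread that was never started raises RuntimeError.

```python
import threading
import time


def short_job():
    time.sleep(0.01)


t = threading.Thread(target=short_job, name="ID02")
t.start()
time.sleep(0.1)  # give the job ample time to finish on its own

print(t, "alive before join:", t.is_alive())
t.join()  # only waits; it neither starts nor stops the thread

join_error = None
unstarted = threading.Thread(target=short_job)
try:
    unstarted.join()  # join() does not start the thread
except RuntimeError as exc:
    join_error = exc
    print("join on an unstarted thread:", exc)
```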
>Which finally gets me to my cry for help - I know I can't modify the
>threadsPool list to remove the threads already created on the fly, so I can
>have only the ones pending to be queued in the 2nd loop, but for the life
>of me I can't think of a proper way to try and extract some information
>about what threads are still going (or rather, have finished since
>thread.enumerate() shows both running and queued).
Well, what you'd probably _like_ is a way to be told about each Thread
as it completes, and report them then. Which you can do using a Queue,
getting each Thread to report its completion to the Queue as it happens.
Untested example:
    from queue import Queue
    import threading

    q = Queue()
    threadsPool = []
    for id in ids:
        thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
        threadsPool.append(thread)
and modify runStuff thus:
    def runStuff(id, q):
        sem.acquire()
        ...
        sem.release()
        q.put(id)    # last step: report that this id is done
After the threads are started, collect completed ids:
    for count in range(len(ids)):
        id = q.get()
        print("completed work on id %r" % (id,))
You'll notice no .join() there. Getting the id off the queue "q" implies
that the Thread has finished its work, since q.put() is the last thing
runStuff does.
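Here is the Queue idea as a self-contained sketch that can be run as is. The IDs, their delays and run_stuff() are hypothetical stand-ins (time.sleep() plays the part of the real work), and it uses "with sem:" rather than explicit acquire()/release() calls.

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # at most two workers doing real work at once


def run_stuff(element_id, q, delay):
    # Hypothetical stand-in for file2.runStuff; "delay" simulates IDs
    # that take different amounts of time.
    with sem:
        time.sleep(delay)
    q.put(element_id)  # last step: report completion


delays = {"ID01": 0.05, "ID02": 0.01, "ID03": 0.03}  # hypothetical work times
q = Queue()
threads_pool = []
for element_id, delay in delays.items():
    thread = threading.Thread(target=run_stuff, name=element_id, args=(element_id, q, delay))
    threads_pool.append(thread)
    thread.start()

completed = []
for count in range(len(delays)):
    element_id = q.get()  # blocks until some worker reports
    completed.append(element_id)
    print("completed work on id %r" % (element_id,))
```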
>I have the feeling I'm using a very wrong approach in trying to extract
>that information in the .join() loop, since it only goes back to it once a
>thread has finished, but at the same time it feels like the perfect timing.
You're waiting on specific threads, one at a time. If other threads
complete earlier than the one you're currently waiting on, they don't
get reported until your loop reaches them. You're reporting threads in
the order you made them, not in the order they complete. Using a Queue
and not worrying about the threads themselves lets you get ids as
they're done, in whatever order.
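For comparison, the standard library also has a higher-level tool that the thread does not use: concurrent.futures.ThreadPoolExecutor. The sketch below is an alternative, not a correction: max_workers=2 takes the place of Semaphore(2), and concurrent.futures.as_completed() hands back each future as its work finishes, so results arrive in completion order. run_stuff() and the delays are hypothetical stand-ins.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_stuff(element_id, delay):
    # Hypothetical stand-in for the real per-ID work.
    time.sleep(delay)
    return element_id


delays = {"ID01": 0.2, "ID02": 0.01, "ID03": 0.05}  # hypothetical work times

completed = []
# max_workers=2 plays the role of Semaphore(2): at most two IDs run at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(run_stuff, element_id, delay) for element_id, delay in delays.items()]
    for future in as_completed(futures):
        # Futures arrive in the order the work finishes, not submission order.
        element_id = future.result()
        completed.append(element_id)
        print("completed work on id %r" % (element_id,))
```

A side benefit of this design: future.result() re-raises any exception from the worker in the thread that calls it, instead of the exception only being printed from inside the worker thread.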
>And just in case you are wondering why I have my threads starting in
>file1.py and my Semaphore queue in file2.py, it's because I wanted to split
>the runStuff(id) function in a separate module due to its length. I don't
>know if it's a good way to do it, but thankfully the Python interpreter is
>smart enough to see through my ignorance.
I usually split modules based on function, not size. Put related things
in the same module. Often in classes, but let us not go there yet.
If you want to get a little funky, separate the runStuff code which
works on the id from the control code (the semaphore use). You could
then run the queue collection in its own thread. Have the main control
code also manage the semaphore:
    from queue import Queue
    import threading

    sem = threading.Semaphore(2)

    def collect(q, ids):
        for count in range(len(ids)):
            id = q.get()
            print("completed work on id %r" % (id,))
            sem.release()

    q = Queue()
    threadsPool = []
    for id in ids:
        thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
        threadsPool.append(thread)
    collector = threading.Thread(target=collect, args=(q, ids))
    collector.start()
    for thread in threadsPool:
        sem.acquire()
        thread.start()
    # wait for collection to complete
    collector.join()
so that you acquire the semaphore before starting each thread, and
release the semaphore as threads complete and report their id values.
In this arrangement runStuff no longer touches the semaphore at all: it
just does its work and then calls q.put(id).
Because these things need to happen in parallel (acquire, start versus
q.get, release) you run the collector in its own thread.
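Here is that arrangement as a self-contained sketch that can be run as is. run_stuff() is a hypothetical stand-in for file2.runStuff that only does (simulated) work and reports its id, and the completed list is an addition so the results can be inspected afterwards.

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # owned by the control code, not the workers


def run_stuff(element_id, q):
    # Hypothetical stand-in for file2.runStuff: pure work, no semaphore.
    time.sleep(0.01)
    q.put(element_id)


def collect(q, ids, completed):
    for count in range(len(ids)):
        element_id = q.get()
        completed.append(element_id)
        sem.release()  # a worker has finished, so let the next one start


ids = ["ID%02d" % n for n in range(1, 6)]  # hypothetical sample IDs
q = Queue()
completed = []
threads_pool = [
    threading.Thread(target=run_stuff, name=element_id, args=(element_id, q))
    for element_id in ids
]
collector = threading.Thread(target=collect, args=(q, ids, completed))
collector.start()
for thread in threads_pool:
    sem.acquire()  # blocks while two workers are already running
    thread.start()
collector.join()  # returns once every id has been reported
print("completed:", completed)
```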
Does this clear anything up?
Cheers,
Cameron Simpson <cs at cskk.id.au>