[Tutor] [Python 3] Threads status, join() and Semaphore queue

Dimitar Ivanov dimitarxivanov at gmail.com
Sat Nov 24 11:08:32 EST 2018


Hi Cameron,

Massive apologies for the delayed answer!

Your explanation definitely clears up quite a bit of my misunderstanding,
thank you for that!

There was a reason why I shied away from using Queue, but for the life of me
I can't remember right now what that reason was. I will modify my code using
your example and give it another try, and I will let you know if I run into
any issues or have further questions. :)

Regards,
Dimitar

On Tue, 20 Nov 2018 at 08:39, Cameron Simpson <cs at cskk.id.au> wrote:

> On 19Nov2018 23:52, Dimitar Ivanov <dimitarxivanov at gmail.com> wrote:
> >I'm having a hard time getting my head around threads so I was hoping
> >someone who has better understanding of their underlying functionality
> >could lend me a helping hand, in particular how threads work with each
> >other when using thread.join() and Semaphore set with maximum value. I'll
> >try to keep it as clear and concise as possible, but please don't hesitate
> >to ask if anything about my approach is unclear or, frankly, awful.
> >
> >I'm writing a script that performs a couple of I/O operations and CLI
> >commands for each element in a list of IDs. The whole process takes a while
> >and may vary based on the ID, hence the threading approach sounded like the
> >best fit since next ID can start once space has freed up. I'm parsing an
> >extract of my code below and will explain what I can't properly understand
> >underneath.
>
> I'm just going to scatter a few random remarks about your code in here
> with your listing before addressing your other queries lower down...
>
> >file1.py
> >---------
> >ids = [<IDs listed here>]
> >threadsPool = []
> >for id in ids:
>
> The name "id" is unfortunate as it conflicts with the id() builtin
> function. Maybe "element_id"? Wordier, but also more clear.
>
> >  thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id,))
> >  threadsPool.append(thread)
> >for thread in threadsPool:
> >  thread.start()
>
> You could start each thread right after creating it if you wished.
>
> >for thread in threadsPool:
> >  print(thread.enumerate())
>
> "enumerate" is a function from the threading module, not a method of a
> Thread. So try:
>
>   print(threading.enumerate())
>
> Frankly I'm surprised that "thread.enumerate" works at all.
>
> >  print("Queuing thread" + str(thread))
> >  thread.join()
> >
> >file2.py
> >----------
> >queue = threading.Semaphore(2)
>
> I'd be disinclined to call this a "queue", which usually implies a FIFO
> list of some variety: put things onto it, and pull things off it,
> usually first in, first out. Maybe just "sem" or "thread_capacity" or
> something?
>
> >def runStuff(id):
> >  queue.acquire()
> >  print("Lock acquired for " + str(id))
> >  file3.doMoreStuff()
> >  file4.evenMoreStuff()
> >  queue.release()
> >
> >Onto my confusion - as long as I don't try to print information about the
> >thread that's being queued or the total amount of threads using
> >.enumerate(), the script is working absolutely flawlessly, each thread that
> >doesn't have a lock is waiting until it acquires it and then moves on. I
> >decided it'd be nice to be able to provide more information about which
> >thread starts next and how many threads are active right now (each can take
> >a different amount of time), however, when I tried to do that, my log was
> >showing me some pretty funky output which at first made me believe I've
> >messed up all my threads, example:
> >
> >
> ><<  2018-11-19 15:01:38,094 file2 [ID09] INFO - Lock acquired for
> >ID09                 <---- this is from file2.py
> >------ some time later and other logs in here ---------
> >[<_MainThread(MainThread, started 140431033562880)>, <Thread(ID09, started
> >140430614177536)>] <---- output from thread.enumerate(), file1.py
> ><<  2018-11-19 15:01:38,103 file1 [MainThread] DEBUG - Queuing thread -
> ><Thread(ID09, started 140430614177536)> <---- output from print() right
> >after thread.enumerate()
> >
> >After some head scratching, I believe I've finally tracked down the reason
> >for my confusion:
> >
> >The .start() loop starts the threads and the first 2 acquire a lock
> >immediately and start running, later on the .join() queue puts the rest in
> >waiting for lock, that's fine, what I didn't realize, of course, is that
> >the .join() loop goes through threads that have already been instantly
> >kicked off by the .start() loop (the first 2 threads since Semaphore allows
> >2 locks) and then my print in that loop is telling me that those threads
> >are being queued, except they aren't since they are already running, it's
> >just my text is telling me that, since I wasn't smart enough to realize
> >what's about to happen, as seen below:
> >
> ><<  2018-11-19 15:01:33,094 file1.py [MainThread] DEBUG - Queuing thread -
> ><Thread(ID02, stopped 140430666626816)> <--- makes it clear the thread has
> >already even finished
>
> Yes. The .join() has NO EFFECT on the Thread itself: it doesn't start it
> or stop it. It just waits for the Thread to complete. So yes, your log
> message is misleading.
>
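
To illustrate the point that .join() only waits, here is a minimal sketch.
The worker function and the thread name "ID02" are made up for the example
and are not taken from the original script.

```python
import threading
import time

def work():
    # Hypothetical stand-in for the real per-ID work.
    time.sleep(0.1)

t = threading.Thread(target=work, name="ID02")
t.start()

# Wait, without join(), until the thread has finished on its own.
while t.is_alive():
    time.sleep(0.01)
finished_before_join = not t.is_alive()

# join() neither starts nor stops anything: on a finished thread it
# returns at once, and calling it again is harmless.
t.join()
t.join()
print(t)  # the repr reports the thread as stopped
```

So a message printed just before .join() says nothing about whether the
thread is waiting, running or already done; t.is_alive() is the call that
answers that.
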
> >Which finally gets me to my cry for help - I know I can't modify the
> >threadsPool list to remove the threads already created on the fly, so I can
> >have only the ones pending to be queued in the 2nd loop, but for the life
> >of me I can't think of a proper way to try and extract some information
> >about what threads are still going (or rather, have finished since
> >thread.enumerate() shows both running and queued).
>
> Well, what you'd probably _like_ is a way to be told about each Thread
> as it completes, and report them then. Which you can do using a Queue,
> getting each Thread to report its completion to the Queue as it happens.
>
> Untested example:
>
>   from queue import Queue
>   q = Queue()
>   threadsPool = []
>   for id in ids:
>     thread = threading.Thread(target=file2.runStuff, name=str(id),
>                               args=(id, q))
>     threadsPool.append(thread)
>
> and modify runStuff thus:
>
>   def runStuff(id, q):
>     sem.acquire()
>     ...
>     sem.release()
>     q.put(id)
>
> After the threads are started, collect completed ids:
>
>   for count in range(len(ids)):
>     id = q.get()
>     print("completed work on id %r" % (id,))
>
> You'll notice no .join() there. Getting the id off the queue "q" implies
> that the Thread has completed.
>
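
Put together, the Queue approach described above might look like the sketch
below. It is only an illustration: run_stuff, element_id, the sample ids and
the sleep standing in for the real work are all assumptions, and the
semaphore is used through "with" so it is released even if the work raises.

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # at most 2 workers do real work at once

def run_stuff(element_id, q):
    # Hypothetical worker: the sleep stands in for the I/O and CLI calls.
    with sem:  # acquire on entry, release on exit
        time.sleep(0.05)
    q.put(element_id)  # report completion to the collecting thread

ids = ["ID01", "ID02", "ID03", "ID04"]  # sample values for the sketch
q = Queue()

for element_id in ids:
    thread = threading.Thread(target=run_stuff, name=element_id,
                              args=(element_id, q))
    thread.start()

# Collect one report per id, in whatever order the workers finish.
completed = []
for _ in ids:
    element_id = q.get()  # blocks until some worker reports
    print("completed work on id %r" % (element_id,))
    completed.append(element_id)
```

The order of "completed" can differ from the order of "ids" between runs,
which is the point: reports arrive as the work finishes.
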
> >I have the feeling I'm using a very wrong approach in trying to extract
> >that information in the .join() loop, since it only goes back to it once a
> >thread has finished, but at the same time it feels like the perfect
> timing.
>
> You're collecting specific threads. If other threads complete earlier
> than that specific thread, they don't get reported immediately. You're
> reporting threads in the order you made them, not in the order they
> complete. Using a Queue and not worrying about the threads themselves
> lets you get ids as they're done, in whatever order.
>
> >And just in case you are wondering why I have my threads starting in
> >file1.py and my Semaphore queue in file2.py, it's because I wanted to split
> >the runStuff(id) function in a separate module due to its length. I don't
> >know if it's a good way to do it, but thankfully the Python interpreter is
> >smart enough to see through my ignorance.
>
> I usually split modules based on function, not size. Put related things
> in the same module. Often in classes, but let us not go there yet.
>
> If you want to get a little funky, separate the runStuff code which
> works on the id from the control code (the semaphore use). You could
> then run the queue collection in its own thread. Have the main control
> code also manage the semaphore:
>
>   q = Queue()
>   threadsPool = []
>   for id in ids:
>     thread = threading.Thread(target=file2.runStuff, name=str(id),
>                               args=(id, q))
>     threadsPool.append(thread)
>
>   # collect() must be defined before the thread that runs it is created
>   def collect(q, ids):
>     for count in range(len(ids)):
>       id = q.get()
>       sem.release()
>
>   collector = threading.Thread(target=collect, args=(q, ids))
>   collector.start()
>
>   for thread in threadsPool:
>     sem.acquire()
>     thread.start()
>
>   # wait for collection to complete
>   collector.join()
>
> so that you acquire the semaphore before starting each thread, and
> release the semaphore as threads complete and report their id values.
>
> Because these things need to happen in parallel (acquire, start versus
> q.get, release) you run the collector in its own thread.
>
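
A self-contained sketch of that collector arrangement follows, under the
assumption that the worker no longer touches the semaphore itself. The names
run_stuff, element_id and completed, the sample ids and the sleep are
hypothetical, added only to make the example runnable.

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # capacity managed by the control code only

def run_stuff(element_id, q):
    # Hypothetical worker: does its work and reports, no semaphore use.
    try:
        time.sleep(0.05)  # stand-in for the real work
    finally:
        q.put(element_id)  # always report, so the slot gets released

def collect(q, ids, completed):
    # Runs in its own thread: one report per id, freeing a slot each time.
    for _ in ids:
        element_id = q.get()
        completed.append(element_id)
        sem.release()

ids = ["ID01", "ID02", "ID03", "ID04"]  # sample values for the sketch
q = Queue()
completed = []

collector = threading.Thread(target=collect, args=(q, ids, completed))
collector.start()

for element_id in ids:
    sem.acquire()  # blocks here until a slot is free
    threading.Thread(target=run_stuff, name=element_id,
                     args=(element_id, q)).start()

collector.join()  # wait for collection to complete
print(completed)
```

Because a thread is only started after sem.acquire() succeeds, a log line
printed right after the acquire reflects a thread that is genuinely about to
run rather than one that is still waiting.
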
> Does this clear anything up?
>
> Cheers,
> Cameron Simpson <cs at cskk.id.au>
>

