[Tutor] [Python 3] Threads status, join() and Semaphore queue

Cameron Simpson cs at cskk.id.au
Tue Nov 20 02:22:23 EST 2018


On 19Nov2018 23:52, Dimitar Ivanov <dimitarxivanov at gmail.com> wrote:
>I'm having a hard time getting my head around threads so I was hoping
>someone who has better understanding of their underlying functionality
>could lend me a helping hand, in particular how threads work with each
>other when using thread.join() and Semaphore set with maximum value. I'll
>try to keep it as clear and concise as possible, but please don't hesitate
>to ask if anything about my approach is unclear or, frankly, awful.
>
>I'm writing a script that performs a couple of I/O operations and CLI
>commands for each element in a list of IDs. The whole process takes a while
>and may vary based on the ID, hence the threading approach sounded like the
>best fit since next ID can start once space has freed up. I'm parsing an
>extract of my code below and will explain what I can't properly understand
>underneath.

I'm just going to scatter a few random remarks about your code in here 
with your listing before addressing your other queries lower down...

>file1.py
>---------
>ids = [<IDs listed here>]
>threadsPool = []
>for id in ids:

The name "id" is unfortunate as it shadows the id() builtin 
function. Maybe "element_id"? Wordier, but also clearer.

>  thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id,
>))
>  threadsPool.append(thread)
>for thread in threadsPool:
>  thread.start()

You could start each thread right after creating it if you wished.

>for thread in threadsPool:
>  print(thread.enumerate())

"enumerate" is a function from the threading module, not a method of a 
Thread. So try:

  print(threading.enumerate())

Frankly I'm surprised that "thread.enumerate" works at all.
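
A minimal sketch of the difference, using throwaway names (worker, t, 
alive_names) that are mine and not from your script:

```python
import threading
import time

def worker():
    # Hypothetical stand-in for the real work.
    time.sleep(0.1)

t = threading.Thread(target=worker, name="ID01")
t.start()

# enumerate() is a function of the threading module. It returns a list of
# every Thread object that is currently alive, the main thread included.
alive_names = [th.name for th in threading.enumerate()]
print(alive_names)

# A Thread object has no "enumerate" attribute of its own.
print(hasattr(t, "enumerate"))

t.join()
```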

>  print("Queuing thread" + str(thread))
>  thread.join()
>
>file2.py
>----------
>queue = threading.Semaphore(2)

I'd be disinclined to call this a "queue", which usually implies a FIFO 
list of some variety: put things onto it, and pull things off it, 
first in, first out. Maybe just "sem" or "thread_capacity" or 
something?
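
Whatever you call it, a Semaphore also works in a "with" statement, which 
releases it even if the work raises an exception. A minimal sketch; 
run_stuff, thread_capacity, the counters and the sleep are placeholders of 
mine rather than your real code:

```python
import threading
import time

thread_capacity = threading.Semaphore(2)  # at most 2 workers inside at once
counter_lock = threading.Lock()           # protects the two counters below
active = 0
peak = 0

def run_stuff(element_id):
    global active, peak
    # "with" acquires the semaphore on entry and releases it on exit,
    # including when the body raises.
    with thread_capacity:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # hypothetical stand-in for the real I/O work
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=run_stuff, args=(n,)) for n in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrent workers:", peak)
```

The counters are only there to show that no more than two workers are ever 
inside the "with" block at the same time.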

>def runStuff(id):
>  queue.acquire()
>  print("Lock acquired for " + str(id))
>  file3.doMoreStuff()
>  file4.evenMoreStuff()
>  queue.release()
>
>Onto my confusion - as long as I don't try to print information about the
>thread that's being queued or the total amount of threads using
>.enumerate(), the script is working absolutely flawlessly, each thread that
>doesn't have a lock is waiting until it acquires it and then moves on. I
>decided it'd be nice to be able to provide more information about which
>thread starts next and how many threads are active right now (each can take
>a different amount of time), however, when I tried to do that, my log was
>showing me some pretty funky output which at first made me believe I've
>messed up all my threads, example:
>
>
><<  2018-11-19 15:01:38,094 file2 [ID09] INFO - Lock acquired for
>ID09                 <---- this is from file2.py
>------ some time later and other logs in here ---------
>[<_MainThread(MainThread, started 140431033562880)>, <Thread(ID09, started
>140430614177536)>] <---- output from thread.enumerate(), file1.py
><<  2018-11-19 15:01:38,103 file1 [MainThread] DEBUG - Queuing thread -
><Thread(ID09, started 140430614177536)> <---- output from print() right
>after thread.enumerate()
>
>After some head scratching, I believe I've finally tracked down the reason
>for my confusion:
>
>The .start() loop starts the threads and the first 2 acquire a lock
>immediately and start running, later on the .join() queue puts the rest in
>waiting for lock, that's fine, what I didn't realize, of course, is that
>the .join() loop goes through threads that have already been instantly
>kicked off by the .start() loop (the first 2 threads since Semaphore allows
>2 locks) and then my print in that loop is telling me that those threads
>are being queued, except they aren't since they are already running, it's
>just my text is telling me that, since I wasn't smart enough to realize
>what's about to happen, as seen below:
>
><<  2018-11-19 15:01:33,094 file1.py [MainThread] DEBUG - Queuing thread -
><Thread(ID02, stopped 140430666626816)> <--- makes it clear the thread has
>already even finished

Yes. The .join() has NO EFFECT on the Thread itself: it doesn't start it 
or stop it. It just waits for the Thread to complete. So yes, your log 
message is misleading.
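
A small sketch of that, with made-up names (worker, t): the thread runs to 
completion without anyone calling .join(), and a later .join() just returns 
at once. Thread.is_alive() is what tells you whether it is still going:

```python
import threading
import time

def worker():
    time.sleep(0.05)  # hypothetical stand-in for real work

t = threading.Thread(target=worker, name="ID02")
t.start()

# Poll instead of joining, purely to show the thread finishes by itself.
while t.is_alive():
    time.sleep(0.01)
finished_without_join = not t.is_alive()

t.join()  # nothing left to wait for, so this returns immediately
print(t.name, "finished without join:", finished_without_join)
```

So a log line in the .join() loop could check thread.is_alive() first and 
say "waiting for" or "already finished" accordingly.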

>Which finally gets me to my cry for help - I know I can't modify the
>threadsPool list to remove the threads already created on the fly, so I can
>have only the ones pending to be queued in the 2nd loop, but for the life
>of me I can't think of a proper way to try and extract some information
>about what threads are still going (or rather, have finished since
>thread.enumerate() shows both running and queued).

Well, what you'd probably _like_ is a way to be told about each Thread 
as it completes, and report them then. Which you can do using a Queue, 
getting each Thread to report its completion to the Queue as it happens.

Untested example:

  from queue import Queue
  q = Queue()
  threadsPool = []
  for id in ids:
    thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
    threadsPool.append(thread)

and modify runStuff thus:

  def runStuff(id, q):
    sem.acquire()
    ...
    sem.release()
    q.put(id)

After the threads are started, collect completed ids:

  for count in range(len(ids)):
    id = q.get()
    print("completed work on id %r" % (id,))

You'll notice no .join() there. Getting the id off the queue "q" implies 
that the Thread has finished its work, since q.put() is the last thing 
runStuff does.
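
Put together as something you can run on its own, the pieces above might 
look like the sketch below. The ids, the sleep and the names run_stuff and 
completed are invented for illustration; your real runStuff would do its I/O 
where the sleep is:

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # at most 2 workers doing real work at once

def run_stuff(element_id, q):
    try:
        with sem:
            time.sleep(0.01 * element_id)  # hypothetical stand-in for the work
    finally:
        # Report completion even if the work raised, otherwise the
        # collecting loop below would wait forever for this id.
        q.put(element_id)

ids = [3, 1, 2]  # made-up ids
q = Queue()
for element_id in ids:
    threading.Thread(target=run_stuff, name=str(element_id),
                     args=(element_id, q)).start()

completed = []
for _ in range(len(ids)):
    element_id = q.get()  # blocks until some worker reports in
    print("completed work on id %r" % (element_id,))
    completed.append(element_id)
```

The try/finally is my addition: without it a worker that dies with an 
exception never reports, and q.get() blocks forever.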

>I have the feeling I'm using a very wrong approach in trying to extract
>that information in the .join() loop, since it only goes back to it once a
>thread has finished, but at the same time it feels like the perfect timing.

You're collecting specific threads. If other threads complete earlier 
than that specific thread, they don't get reported immediately. You're 
reporting threads in the order you made them, not in the order they 
complete. Using a Queue and not worrying about the threads themselves 
lets you get ids as they're done, in whatever order.

>And just in case you are wondering why I have my threads starting in
>file1.py and my Semaphore queue in file2.py, it's because I wanted to split
>the runStuff(id) function in a separate module due to its length. I don't
>know if it's a good way to do it, but thankfully the Python interpreter is
>smart enough to see through my ignorance.

I usually split modules based on function, not size. Put related things 
in the same module. Often in classes, but let us not go there yet.

If you want to get a little funky, separate the runStuff code which 
works on the id from the control code (the semaphore use). You could 
then run the queue collection in its own thread. Have the main control 
code also manage the semaphore:

  q = Queue()
  threadsPool = []
  for id in ids:
    thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
    threadsPool.append(thread)

  # define collect() before the collector Thread refers to it
  def collect(q, ids):
    for count in range(len(ids)):
      id = q.get()
      sem.release()

  collector = threading.Thread(target=collect, args=(q, ids))
  collector.start()

  for thread in threadsPool:
    sem.acquire()
    thread.start()

  # wait for collection to complete
  collector.join()

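so that you acquire the semaphore before starting each thread, and 
release the semaphore as threads complete and report their id values. 
In this arrangement runStuff no longer touches the semaphore itself; it 
just does its work and then q.put(id).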

Because these things need to happen in parallel (acquire, start versus 
q.get, release) you run the collector in its own thread.
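
A self-contained sketch of this arrangement, in case it helps to see it 
run. The ids, the sleep, and the names run_stuff and done are assumptions of 
mine; the worker here does not touch the semaphore at all:

```python
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)  # number of workers allowed in flight

def run_stuff(element_id, q):
    # No semaphore use here: the control code owns it in this design.
    try:
        time.sleep(0.02)  # hypothetical stand-in for the real work
    finally:
        q.put(element_id)  # always report, even if the work raised

def collect(q, expected, done):
    for _ in range(expected):
        element_id = q.get()  # wait for any worker to finish
        sem.release()         # free a slot so the main loop can start another
        done.append(element_id)

ids = ["ID01", "ID02", "ID03", "ID04", "ID05"]  # made-up ids
q = Queue()
done = []

collector = threading.Thread(target=collect, args=(q, len(ids), done))
collector.start()

for element_id in ids:
    sem.acquire()  # blocks while 2 workers are already running
    threading.Thread(target=run_stuff, name=element_id,
                     args=(element_id, q)).start()

collector.join()  # returns once every id has been reported
print("finished in this order:", done)
```

Note that the semaphore is acquired in the main thread and released in the 
collector thread. That is fine for a Semaphore, which is just a counter and 
has no notion of an owning thread.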

Does this clear anything up?

Cheers,
Cameron Simpson <cs at cskk.id.au>

