UPDATED 9/5/2013: now uses Thread::Queue’s dequeue_timed() instead of sleep.
I’ve been doing a lot of large-system coding with Perl ithreads lately. The thread-pool model below has scaled very well on high-throughput operations, combined with well-placed threads->yield() calls in the thread handler. This isn’t rocket science, but it handles some subtle gotchas I’ve run into with other pool models I’ve seen described.
The snippet below creates a pair of queues: one for work and one for return values. You can get a thread’s return value by joining it, but if we want our threads to be relatively immortal, the best way to process the returns is to push them onto a queue that the master thread can dequeue from (or ignore). You could also put an object in the queue so that the work unit and its return value come back together.
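Here is a minimal single-threaded sketch of that last idea. Recent versions of Thread::Queue make a shared copy of a hash ref when you enqueue it, so the work unit and its result travel as one item. The field names unit and result, and the sample values, are hypothetical.

```perl
use strict;
use warnings;
use Thread::Queue;

my $retq = Thread::Queue->new();

# Inside a worker: enqueue the work unit and its result together.
# The field names 'unit' and 'result' are hypothetical.
my $todo        = 'report.txt';
my $returnValue = 42;
$retq->enqueue({ unit => $todo, result => $returnValue });

# In the master thread: both pieces come back as a single item.
my $item = $retq->dequeue();
print "$item->{unit} => $item->{result}\n";
```

Because each item carries its own work unit, the master thread doesn’t have to rely on results arriving in the same order the work was queued.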
We use a symbol that will never be a valid work unit (‘EXIT’ in this example) as a flag telling the threads they can exit. One ‘EXIT’ is queued per thread, after all of the real work. Each thread grinds through its work at its own pace (some work units will inevitably be faster than others) and finishes gracefully when it dequeues an ‘EXIT’.
Inside the thread, don’t neglect eval {} blocks. If your thread handler calls out to another module or relies on some other body of code, and that code dies or croaks, your worker thread dies with it and you irrecoverably lose the work unit it was grinding on.
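To see why the eval matters, here is a small sketch without threads. risky_parse() is a hypothetical stand-in for a call into another module that dies on some inputs. Wrapped in eval, the die is caught, the error lands in $@, and the work unit stays in hand so it can be retried.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a call into another module that may die.
sub risky_parse {
    my ($unit) = @_;
    die "cannot parse '$unit'\n" if $unit =~ /bad/;
    return length $unit;
}

my @failed;    # work units we still hold after a failure
my %results;

for my $unit (qw(alpha bad_input gamma)) {
    my $result;
    # eval returns 1 only if the block completes; a die inside it is
    # caught and its message left in $@ instead of killing the caller.
    my $ok = eval { $result = risky_parse($unit); 1 };
    if ($ok) {
        $results{$unit} = $result;
    }
    else {
        warn "Work unit '$unit' failed: $@";
        push @failed, $unit;    # the unit survives and can be requeued
    }
}
```

The "eval { ...; 1 }" form is used rather than testing $@ directly, because $@ can be clobbered by destructors that run while the stack unwinds.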
The method I show below is a simple rescheduler: it removes the last ‘EXIT’ from the work queue, then enqueues the failed work unit followed by the ‘EXIT’ it removed. You may prefer to hand failures off to a formal scheduler thread. Beware of infinite work loops, where an invalid unit keeps getting requeued forever. You may also want to install a signal handler that queues up ‘EXIT’ symbols and/or dumps the work queue contents.
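One way to break the infinite requeue loop is to cap the number of attempts. The sketch below is an assumption layered on top of the rescheduler, not part of the original code. Each work unit is wrapped in a hash ref with a hypothetical tries counter, and $MAX_TRIES and requeue_or_drop() are illustrative names. It runs in a single thread so the queue behavior is easy to follow.

```perl
use strict;
use warnings;
use Thread::Queue;

my $MAX_TRIES = 3;    # hypothetical retry limit
my $workq     = Thread::Queue->new();

# Requeue a failed unit ahead of the trailing 'EXIT', unless it has
# already failed $MAX_TRIES times. Returns true if it was requeued.
sub requeue_or_drop {
    my ($q, $unit) = @_;
    $unit->{tries}++;
    if ($unit->{tries} >= $MAX_TRIES) {
        warn "Giving up on '$unit->{data}' after $unit->{tries} tries\n";
        return 0;
    }
    $q->extract(-1);               # remove the last 'EXIT'
    $q->enqueue($unit, 'EXIT');    # the unit first, then the 'EXIT' back
    return 1;
}

# Simulate a single worker whose processing always fails for this unit.
$workq->enqueue({ data => 'bad_unit', tries => 0 }, 'EXIT');
my @log;
while (defined(my $item = $workq->dequeue_nb())) {
    last if !ref($item) && $item eq 'EXIT';
    push @log, requeue_or_drop($workq, $item) ? 'requeued' : 'dropped';
}
print "@log\n";
```

The unit is requeued twice and dropped on its third failure, after which the worker reaches its ‘EXIT’ and stops. In a real pool you would likely log the dropped unit somewhere more durable than a warning.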
```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $THREADS = 10;                    # Number of threads
my $retq    = Thread::Queue->new();  # Thread return values (if you care about them)
my $workq   = Thread::Queue->new();  # Work to do

my @stufftodo = (1 .. 100);          # Placeholder: your real work units go here

$workq->enqueue(@stufftodo);               # Queue up some work to do
$workq->enqueue('EXIT') for 1 .. $THREADS; # And tell them when they're done

threads->create('Handle_Work') for 1 .. $THREADS; # Spawn our workers

# Process returns while the threads are running.
# Alternatively, if we just want to wait until they're all done:
#   sleep 10 while threads->list(threads::running);
while (threads->list(threads::running)) {
    # Blocking dequeue with a 5-second timeout
    if (defined(my $data = $retq->dequeue_timed(5))) {
        # Work on $data
    }
}

# When we get here, there are no more running threads. Reap them, then
# take one more run through the return queue: a worker may have queued
# its last result just before the loop above noticed it had finished.
$_->join() for threads->list();
while (defined(my $data = $retq->dequeue_nb())) {
    # Work on $data
}

sub Handle_Work {
    # defined() so that a work unit of 0 or '' doesn't end the loop early
    while (defined(my $todo = $workq->dequeue())) {
        last if $todo eq 'EXIT'; # All done

        my $returnValue;
        # ...Do work here...

        # If that work might die and take the thread down with it,
        # wrap it in eval; the block returns 1 only if it completes.
        my $ok = eval {
            # do dangerous work here, setting $returnValue
            1;
        };
        if (!$ok) {
            warn "Work unit '$todo' failed: $@";
            # If we want to requeue this work to do later
            # (e.g. a temporary failure):
            $workq->extract(-1);            # Removes the last 'EXIT' from the queue
            $workq->enqueue($todo, 'EXIT'); # Queue this unit back up, then the 'EXIT' we stripped
            next;                           # Do the next thing
        }

        # ...Do more work here, perhaps...

        $retq->enqueue($returnValue) if defined $returnValue;
        threads->yield(); # Give other threads a turn
    }
    # We're all done with this thread. It is not detached: a detached
    # thread drops out of threads->list() while still running, so the
    # master thread join()s it instead.
    return;
}
```
Thanks for the post; it’s just what I’ve been looking for. A lot of the posts on the subject haven’t been based on real-world use.
I’m modifying a directory monitoring script, and I believe the sample you’ve provided will form a useful structure to work with.
Thanks
Chris
Hi Matthew,
The handling of the returned results is dangerous. If a thread sends something on $retq and exits while the joining thread is in its sleep 10, you’ll lose the last result. You must check the queue again once you notice that the threads have exited.
With the new version of Thread::Queue, you can avoid the pending()/sleep combination by using $retq->dequeue_timed($seconds).
Regards
Alex S.
Thanks for the pointer! I did a little testing to make sure it worked as expected, and updated the post.