In computer science, a thread is the smallest unit of execution that can be scheduled independently, with its own sequence of instructions. In simple terms, it is a separate flow of execution. Threading lets you run different parts of a program concurrently and can make the design of the program simpler.
With threading, several threads run within a single program and each one performs an independent task, so their work can overlap in time. However, in CPython these threads do not execute Python code on several processors at the same time. If you want true parallelism, you need to write that part of your code in a different language or use the multiprocessing module.
In CPython, the reference implementation of Python, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time. Good candidates for threading are therefore tasks that spend much of their time waiting for external events, such as network or disk I/O. This limitation applies to code written in Python; extensions written in C can release the GIL and run truly concurrently.
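To make the point about waiting concrete, here is a minimal sketch in which time.sleep() stands in for waiting on an external event such as a network reply (an assumption for illustration; fake_io_task() is a hypothetical name). A thread that is sleeping or blocked on I/O does not hold the GIL, so the three waits overlap instead of running one after another.

```python
import threading
import time

def fake_io_task(results, index):
    # time.sleep() stands in for waiting on the network or disk;
    # a sleeping thread releases the GIL, so other threads can run
    time.sleep(0.5)
    results[index] = "done"

results = [None] * 3
threads = [threading.Thread(target=fake_io_task, args=(results, i)) for i in range(3)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The waits overlap, so this takes roughly 0.5 s rather than 1.5 s
print(f"elapsed: {elapsed:.2f} s")
```

The same three calls made sequentially would take about three times as long; CPU-bound work, by contrast, would not speed up this way because of the GIL.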
Structuring your program around threads can also make its design clearer and easier to reason about. Let us see how to start a thread in Python.
The Python Standard Library contains the threading module, which provides the basic building blocks for working with threads. It encapsulates threads in the Thread class and provides a clean interface for working with them.
To start a thread, first create a Thread instance and then call its .start() method:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting...", name)
    time.sleep(2)
    logging.info("Thread %s: finishing...", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main : before creating thread...")
    t = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main : before running thread...")
    t.start()
    logging.info("Main : wait for the thread to finish...")
    # t.join()  # left commented out, so the main section does not wait here
    logging.info("Main : all done...")
Notice that the main section creates and starts the thread:
t = threading.Thread(target=thread_function, args=(1,))
t.start()
When you create a Thread, you pass it the function to run (target) and a tuple of arguments for that function (args). In the example above, the thread runs thread_function() and passes it 1 as its argument. The function itself simply logs two messages with a time.sleep() call in between.
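The args=(1,) needs the trailing comma because (1) on its own is just the integer 1, not a tuple. Thread also accepts keyword arguments through kwargs, and because it discards the target's return value, results have to be stored somewhere the main thread can read them. Here is a minimal sketch of both; greet() and the messages list are hypothetical names used only for illustration.

```python
import threading

def greet(name, punctuation="!", out=None):
    # A thread's target cannot return a value to the caller,
    # so this sketch appends its result to a list passed in via kwargs
    out.append(f"Hello, {name}{punctuation}")

messages = []
t = threading.Thread(
    target=greet,
    args=("thread 1",),                            # positional arguments: a tuple
    kwargs={"punctuation": "?", "out": messages},  # keyword arguments: a dict
)
t.start()
t.join()          # wait so that messages is filled in before we read it
print(messages)   # ['Hello, thread 1?']
```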
The output of the code above will be displayed as:
$ ./single_thread.py
Main : before creating thread...
Main : before running thread...
Thread 1: starting...
Main : wait for the thread to finish...
Main : all done...
Thread 1: finishing...
Notice that the thread finishes only after the main section has printed its last message.
In computer science, a daemon is a program that runs as a background process. In Python's threading module, a daemon thread is a thread that runs in the background and that you do not have to shut down explicitly: it is stopped abruptly when the program exits. If a program is still running non-daemon threads, however, it waits for those threads to complete before it terminates.
In the example code above, you might have noticed a pause of about 2 seconds after the main section printed the "all done" message and before the thread finished. This is because Python waits for the non-daemon thread to complete.
When the program exits, Python calls the internal function threading._shutdown(), which waits for every non-daemon thread to finish. You can see this for yourself in the source code of the threading module.
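Instead of relying on that implicit wait at exit, you can wait for a thread explicitly with .join(), which is what the commented-out t.join() line in the first example would do. A minimal sketch, with a hypothetical worker() that just sleeps, showing the thread's state before and after joining:

```python
import threading
import time

def worker():
    time.sleep(0.2)   # simulate some work

t = threading.Thread(target=worker)
t.start()
alive_before = t.is_alive()   # True: worker() is still sleeping
t.join()                      # block here until worker() returns
alive_after = t.is_alive()    # False: the thread has finished

print(alive_before, alive_after)  # True False
```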
Let us rerun the earlier example with a daemon thread by adding the daemon=True flag:
t = threading.Thread(target=thread_function, args=(1,), daemon=True)
Now if you run your program, the output will be as follows:
$ ./daemon_thread.py
Main : before creating thread...
Main : before running thread...
Thread 1: starting...
Main : wait for the thread to finish...
Main : all done...
The difference is that the final line of output is missing. When the main section reached the end of the code, the program exited without waiting for the daemon thread, which was killed before it could log its "finishing" message.
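Marking a thread as a daemon does not stop you from waiting for it. If its work must complete, call .join() on it as you would on any other thread; the daemon flag only decides what happens if the program reaches its end while the thread is still running. A minimal sketch, where background_task() is a hypothetical function:

```python
import threading
import time

finished = []

def background_task():
    time.sleep(0.2)
    finished.append(True)

t = threading.Thread(target=background_task, daemon=True)
print(t.daemon)   # True
t.start()
t.join()          # wait explicitly, so the daemon thread is not cut off at exit
print(finished)   # [True]
```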
Running multiple threads concurrently is called multithreading. It can improve the performance of programs that spend much of their time waiting on I/O, and the threading module makes it fairly easy to do in Python.
Let us start understanding multithreading using the example we used earlier:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting...", name)
    time.sleep(2)
    logging.info("Thread %s: finishing...", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main : create and start thread %d...", index)
        t = threading.Thread(target=thread_function, args=(index,))
        threads.append(t)
        t.start()

    # Wait for every thread, in the order in which they were started
    for index, thread in enumerate(threads):
        logging.info("Main : before joining thread %d...", index)
        thread.join()
        logging.info("Main : thread %d done...", index)
Each thread is started the same way as before: create a Thread object and call its .start() method. The program keeps a list of the Thread objects and then waits for each of them with .join(). Running this code produces output like the following, although the exact order can differ from run to run:
$ ./multiple_threads.py
Main : create and start thread 0...
Thread 0: starting...
Main : create and start thread 1...
Thread 1: starting...
Main : create and start thread 2...
Thread 2: starting...
Main : before joining thread 0...
Thread 2: finishing...
Thread 1: finishing...
Thread 0: finishing...
Main : thread 0 done...
Main : before joining thread 1...
Main : thread 1 done...
Main : before joining thread 2...
Main : thread 2 done...
In this run, the threads finished in the opposite order from the one in which they were started. The "Thread x: finishing" messages show when each thread is done. The order in which threads run is determined by the operating system and can change from run to run, so your algorithm must not assume that threads execute in any particular order.
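A quick way to see that finishing order follows timing rather than start order: give each thread a different delay and record when it finishes. In this sketch the delays and the task() helper are hypothetical; a lock guards the shared list while each thread appends to it.

```python
import threading
import time

finish_order = []
order_lock = threading.Lock()

def task(index, delay):
    time.sleep(delay)
    with order_lock:                 # protect the shared list while appending
        finish_order.append(index)

# Thread 0 sleeps longest (0.3 s), thread 2 shortest (0.1 s)
threads = [threading.Thread(target=task, args=(i, 0.3 - 0.1 * i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Typically [2, 1, 0] here, but real code should never depend on the order
print(finish_order)
```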
Using a ThreadPoolExecutor is an easier way to start up a group of threads. It is part of the Python Standard Library, in the concurrent.futures module. You can use it as a context manager with a with statement, which takes care of creating the pool and shutting it down.
Here is an example that uses a ThreadPoolExecutor (only the main section is shown):
import concurrent.futures
import logging

# thread_function() is the same function defined in the earlier examples

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
The code above creates a ThreadPoolExecutor, tells it how many worker threads to keep in the pool, and then uses .map() to call thread_function() once for each item in range(3). When the with block ends, the executor is shut down and joins each of the threads in the pool. It is recommended to use ThreadPoolExecutor whenever possible so that you never forget to .join() the threads.
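Besides running a function on each item, .map() also collects the return values, yielding them in input order no matter which worker finishes first. A minimal sketch, using a hypothetical square() helper with a short sleep standing in for I/O:

```python
import concurrent.futures
import time

def square(n):
    time.sleep(0.1)   # pretend this waits on I/O
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() returns an iterator whose results come back in input order
    results = list(executor.map(square, range(3)))

print(results)  # [0, 1, 4]
```

Consuming the iterator also matters for error handling: if the function raises an exception, it is re-raised when the corresponding result is retrieved. In the example above, where the result of .map() is never read, an exception inside thread_function() would go unnoticed.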
The output of the code will look as follows:
$ ./executor.py
Thread 0: starting...
Thread 1: starting...
Thread 2: starting...
Thread 1: finishing...
Thread 0: finishing...
Thread 2: finishing...
A race condition can occur when two or more threads access a shared piece of data or resource and at least one of them modifies it. Race conditions produce confusing results, often show up only rarely, and are very difficult to debug.
Let us try to understand a race condition using a class that simulates a database:
import concurrent.futures
import logging
import time

class FalseDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update...", name)
        local_copy = self.value    # "read" from the database
        local_copy += 1            # "compute" a new value
        time.sleep(0.1)            # give other threads a chance to run
        self.value = local_copy    # "write" the result back
        logging.info("Thread %s: finishing update...", name)
The class FalseDatabase holds the shared data, .value, on which the race condition will occur. Its .__init__() method simply initializes .value to zero.
The .update() method simulates reading a value from a database, doing some computation on it, and writing a new value back. Here, reading from the database just means copying .value to a local variable; the computation is adding one and then calling .sleep() for a little while; and the value is written back by copying the local value back to .value.
The main section of the program:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    dtb = FalseDatabase()
    logging.info("Testing update. Starting value is %d...", dtb.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(dtb.update, index)
    logging.info("Testing update. Ending value is %d...", dtb.value)
The program creates a ThreadPoolExecutor with two worker threads and calls .submit() twice, telling each thread to run dtb.update().
.submit() takes the function to run followed by any positional and keyword arguments, which are passed on to that function in the thread:
.submit(function, *args, **kwargs)
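.submit() also returns a Future object, whose .result() method waits for the call to finish and returns its value (or re-raises its exception). This sketch passes both a positional and a keyword argument; power() is a hypothetical helper.

```python
import concurrent.futures

def power(base, exponent=2):
    return base ** exponent

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # 3 is passed positionally, exponent=3 as a keyword argument
    future = executor.submit(power, 3, exponent=3)
    print(future.result())  # 27; .result() blocks until the call has finished
```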
The output will look as follows:
$ ./racecond.py
Testing update. Starting value is 0...
Thread 0: starting update...
Thread 1: starting update...
Thread 0: finishing update...
Thread 1: finishing update...
Testing update. Ending value is 1...
Let us look, in a simplified way, at what the threads are doing here.
When we tell the ThreadPoolExecutor to run each thread, we tell it which function to run and which parameters to pass: executor.submit(dtb.update, index). As a result, each thread in the pool calls dtb.update(index). Here dtb is a reference to the FalseDatabase object that was created in the main section.
Each thread has a reference to the same database object and its own unique index value, which makes the log statements readable. Each thread also has its own version of all the data local to the function; in .update(), this is local_copy. Because of this, the local variables of a function are thread-safe.
Back to the race condition: the two threads run concurrently. Each points to the same dtb object and has its own version of local_copy. The shared dtb object is what causes the problem.
Thread 1 starts by running .update(). It copies dtb.value (zero) into its local_copy, increments it, and then calls time.sleep(), which lets another thread take its place and start running. Thread 2 now performs the same operations as Thread 1: it also copies dtb.value into its own local_copy, but dtb.value has not been updated yet, so it still reads zero.
When Thread 2 in turn goes to sleep, the shared dtb.value still contains zero, and both versions of local_copy have the value one. Thread 1 then wakes up, saves its local_copy to dtb.value, and terminates, which gives Thread 2 a chance to run again. Thread 2 is unaware of what Thread 1 did and of the updated dtb.value, so it also stores its version of local_copy, which is one, into dtb.value.
This is a race condition: Thread 1 and Thread 2 interleave their access to a single shared object and overwrite each other's results, so the ending value is 1 instead of 2. Race conditions can also occur when one thread releases memory or closes a file handle before another thread has finished using it.
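The interleaving described above depends on timing, which is why the example relies on time.sleep(). As a sketch, a threading.Barrier can force both threads to read the value before either of them writes it, which makes the lost update reproducible on every run. The Counter class and its unsafe_increment() method are hypothetical names.

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
        # Both threads must reach the barrier before either can continue
        self._barrier = threading.Barrier(2)

    def unsafe_increment(self):
        local_copy = self.value       # read
        self._barrier.wait()          # force both reads to happen before any write
        self.value = local_copy + 1   # write: the second write overwrites the first

counter = Counter()
threads = [threading.Thread(target=counter.unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 1, not 2: one update was lost
```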
You can solve race conditions with a Lock. A Lock is an object that acts like a hall pass: it allows only one thread at a time into the read-modify-write section of the code. If another thread wants to enter at the same time, it has to wait until the current owner of the Lock gives it up.
The basic methods are .acquire() and .release(). A thread calls my_lock.acquire() to get the Lock. If another thread already holds the Lock, the caller has to wait until that thread releases it by calling .release().
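When .acquire() and .release() are called explicitly, they are usually wrapped in try/finally so the lock is released even if the protected code raises an exception. Below is a minimal sketch with a hypothetical shared counter incremented by four threads; because each read-modify-write happens while holding the lock, no increments are lost.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(times):
    global counter
    for _ in range(times):
        counter_lock.acquire()       # blocks while another thread holds the lock
        try:
            counter += 1             # read-modify-write, one thread at a time
        finally:
            counter_lock.release()   # always give the lock back, even on error

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```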
A Python Lock also works as a context manager: you can use it in a with statement, and it is released automatically when the with block exits. Let us take the previous FalseDatabase class and add a Lock to it:
import logging
import threading
import time

class FalseDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update...", name)
        logging.debug("Thread %s about to lock...", name)
        with self._lock:
            # Only one thread at a time can run this read-modify-write block
            logging.debug("Thread %s has lock...", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock...", name)
        logging.debug("Thread %s after release...", name)
        logging.info("Thread %s: finishing update...", name)
._lock is a threading.Lock object that is created in the unlocked state in .__init__(). The with statement acquires it before the read-modify-write block and releases it automatically when the block ends.
The output of the code above, with logging set to the INFO level, looks as follows:
$ ./fixingracecondition.py
Testing locked update. Starting value is 0.
Thread 0: starting update...
Thread 1: starting update...
Thread 0: finishing update...
Thread 1: finishing update...
Testing locked update. Ending value is 2.
The output of the code with full logging by setting the level to DEBUG:
$ ./fixingracecondition.py
Testing locked update. Starting value is 0.
Thread 0: starting update...
Thread 0 about to lock...
Thread 0 has lock...
Thread 1: starting update...
Thread 1 about to lock...
Thread 0 about to release lock...
Thread 0 after release...
Thread 0: finishing update...
Thread 1 has lock...
Thread 1 about to release lock...
Thread 1 after release...
Thread 1: finishing update...
Testing locked update. Ending value is 2.
The Lock provides mutual exclusion between the threads: Thread 1 cannot get the lock until Thread 0 has released it, so the ending value is 2.
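Mutual exclusion can also be observed directly. This sketch uses the blocking=False argument of Lock.acquire(), which returns False immediately instead of waiting, together with Lock.locked() to inspect the state while the lock is held.

```python
import threading

lock = threading.Lock()

with lock:
    # While the lock is held, a non-blocking acquire fails immediately
    got_it = lock.acquire(blocking=False)
    print("second acquire while held:", got_it)   # False
    print("locked inside with block:", lock.locked())   # True

print("locked after with block:", lock.locked())  # False
```

Because a plain Lock does not track which thread owns it, even the holding thread gets False here; a normal blocking acquire() at that point would wait forever.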
In computer science, the producer-consumer problem is a classic example of a multi-process synchronization problem.
Consider a program that has to read messages from the network and write them to disk. The messages do not arrive at regular intervals but in bursts. The part of the program that listens for and accepts these messages is called the producer.
Once you have a message, you need to write it to a database. Database access is slow, so it cannot keep up when a burst of messages comes in. This part of the program is called the consumer.
Between the producer and the consumer you create a pipeline that passes messages from one to the other. The pipeline is the part that will change as you learn about different synchronization objects.
The basic design is a producer thread that reads from a fake network and puts each message into the pipeline:
import logging
import random

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        msg = random.randint(1, 100)
        logging.info("Producer got message: %s", msg)
        pipeline.set_msg(msg, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_msg(SENTINEL, "Producer")
The producer gets a random number between 1 and 100 and calls .set_msg() on the pipeline to send it to the consumer. After the loop, it sends the special SENTINEL object to tell the consumer that no more messages are coming. The consumer looks like this:
def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    msg = 0
    while msg is not SENTINEL:
        msg = pipeline.get_msg("Consumer")
        if msg is not SENTINEL:
            logging.info("Consumer storing message: %s", msg)
The consumer reads a message from the pipeline and "stores" it in the fake database, which here just means logging it. It stops when it receives the sentinel.
The main section of the program is as follows:
import concurrent.futures
import logging

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)
Now let us look at the code of Pipeline, which passes messages from the producer to the consumer:
import logging
import threading

class Pipeline:
    """Class to allow a single element pipeline between producer and consumer."""

    def __init__(self):
        self.msg = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Start with the consumer blocked: there is no message to read yet
        self.consumer_lock.acquire()

    def get_msg(self, name):
        logging.debug("%s:about to acquire getlock...", name)
        self.consumer_lock.acquire()    # wait until a message has been set
        logging.debug("%s:have getlock...", name)
        msg = self.msg
        logging.debug("%s:about to release setlock...", name)
        self.producer_lock.release()    # let the producer set the next message
        logging.debug("%s:setlock released...", name)
        return msg

    def set_msg(self, msg, name):
        logging.debug("%s:about to acquire setlock...", name)
        self.producer_lock.acquire()    # wait until the last message was read
        logging.debug("%s:have setlock...", name)
        self.msg = msg
        logging.debug("%s:about to release getlock...", name)
        self.consumer_lock.release()    # let the consumer read this message
        logging.debug("%s:getlock released...", name)
The members of Pipeline are .msg, which stores the message being passed; .producer_lock, a threading.Lock that restricts access to the message by the producer thread; and .consumer_lock, a threading.Lock that restricts access to the message by the consumer thread.
The .__init__() method initializes these three members and then calls .acquire() on .consumer_lock. This is the starting state: the producer is allowed to add a message, and the consumer has to wait until a message is present.
.get_msg() calls .acquire() on .consumer_lock. Once it has the lock, the consumer copies the value in .msg and then calls .release() on .producer_lock, which allows the producer to insert the next message into the pipeline. The producer calls .set_msg(), which acquires .producer_lock, sets .msg, and then releases .consumer_lock so that the consumer can read the value.
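To check that the two-lock handoff delivers every message exactly once and in order, here is a condensed, self-contained version of the Pipeline, producer, and consumer. As simplifications for testing, the logging is removed, and the producer sends a fixed, hypothetical list of messages instead of random numbers.

```python
import concurrent.futures
import threading

SENTINEL = object()

class Pipeline:
    """Single-element pipeline guarded by two locks (logging removed)."""

    def __init__(self):
        self.msg = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()   # nothing to consume yet

    def get_msg(self):
        self.consumer_lock.acquire()   # wait for a message to be set
        msg = self.msg
        self.producer_lock.release()   # allow the next set_msg()
        return msg

    def set_msg(self, msg):
        self.producer_lock.acquire()   # wait until the previous message was read
        self.msg = msg
        self.consumer_lock.release()   # allow the next get_msg()

def producer(pipeline, messages):
    for msg in messages:
        pipeline.set_msg(msg)
    pipeline.set_msg(SENTINEL)         # tell the consumer we're done

def consumer(pipeline, received):
    msg = None
    while msg is not SENTINEL:
        msg = pipeline.get_msg()
        if msg is not SENTINEL:
            received.append(msg)

sent = [43, 45, 86, 40, 62]
received = []
pipeline = Pipeline()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, pipeline, sent)
    executor.submit(consumer, pipeline, received)

print(received == sent)  # True
```

Each lock is released by a different thread from the one that acquired it. A plain threading.Lock allows this; a threading.RLock would not, because it must be released by the thread that owns it.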
The output of the code with logging set to the INFO level:
$ ./producerconsumer_lock.py
Producer got message: 43
Producer got message: 45
Consumer storing message: 43
Producer got message: 86
Consumer storing message: 45
Producer got message: 40
Consumer storing message: 86
Producer got message: 62
Consumer storing message: 40
Producer got message: 15
Consumer storing message: 62
Producer got message: 16
Consumer storing message: 15
Producer got message: 61
Consumer storing message: 16
Producer got message: 73
Consumer storing message: 61
Producer got message: 22
Consumer storing message: 73
Consumer storing message: 22
The threading module provides a few more synchronization objects that can be handy in different situations. Some of them are discussed below.
A Semaphore is a counter with a few special properties. The first is that its counting is atomic: there is no risk of the operating system swapping out the thread in the middle of incrementing or decrementing the counter. The internal counter is incremented when .release() is called and decremented when .acquire() is called.
The other property is that if a thread calls .acquire() while the counter is zero, then the thread will be blocked until another thread calls .release().
Semaphores are typically used to protect a resource with a limited capacity, for example when you have a pool of connections and want to limit the size of the pool to a particular number.
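As a sketch of the connection-pool use case, assuming a hypothetical limit of two connections and using time.sleep() in place of real work, a Semaphore can cap how many threads are inside a section at once. A Semaphore also works as a context manager, calling .acquire() on entry and .release() on exit.

```python
import threading
import time

MAX_CONNECTIONS = 2                  # hypothetical pool size
pool_slots = threading.Semaphore(MAX_CONNECTIONS)

active = 0
max_active = 0
stats_lock = threading.Lock()        # protects the two counters above

def use_connection():
    global active, max_active
    with pool_slots:                 # acquire(): blocks while 2 threads are inside
        with stats_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.1)              # pretend to use the connection
        with stats_lock:
            active -= 1
    # leaving the with block calls release(), letting a waiting thread in

threads = [threading.Thread(target=use_connection) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_active)  # never more than 2
```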
A threading.Timer schedules a function to be called after a certain amount of time has passed. To create a Timer, pass it the number of seconds to wait and the function to call:
t = threading.Timer(20.0, my_timer_function)
You start the timer by calling .start(), and you can stop it by calling .cancel() while it is still waiting. This makes a Timer useful, for example, when you want to prompt for action after a particular amount of time.
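A minimal sketch of both cases, with a short interval so it runs quickly. Here my_timer_function() sets a threading.Event so that the main thread can check whether it ran; the Event is an assumption added for the demonstration.

```python
import threading

fired = threading.Event()

def my_timer_function():
    # Runs in the Timer's own thread once the interval has elapsed
    fired.set()

t = threading.Timer(0.1, my_timer_function)
t.start()
t.join()                          # Timer is a Thread subclass, so it can be joined
fired_after_start = fired.is_set()

fired.clear()
t2 = threading.Timer(10.0, my_timer_function)
t2.start()
t2.cancel()                       # stop the timer while it is still waiting
t2.join()                         # returns promptly; the function never runs
fired_after_cancel = fired.is_set()

print(fired_after_start, fired_after_cancel)  # True False
```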
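In this article we have covered many of the core topics associated with threading in Python. We have discussed starting a thread, daemon threads, running multiple threads, ThreadPoolExecutor, race conditions and how a Lock prevents them, the producer-consumer problem, and the Semaphore and Timer objects.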
We hope you now understand what Python threads are, how to build threaded programs, and which problems they are suited to solve. You have also learned about the problems that can arise when writing and debugging threaded programs.
For more information about threading and its uses in real-world applications, you may refer to the official documentation of the Python threading module.