SE205: Lab on Design Patterns for Concurrent Systems in POSIX

Laurent Pautet (pautet@telecom-paristech.fr)

1 Lab on Design Patterns for Concurrent Systems in POSIX

Objectives

This lab aims to discover the asynchronous execution model proposed by Java and to implement it in C. The model consists in defining objects of the Runnable or Callable type and having them executed by a constrained set of threads under the management of an ExecutorService scheduler. This offers different execution semantics and different resource configurations. You can find its specification under ExecutorService.

This lab does not cover all the services offered by ExecutorService; it illustrates how some of them work. Course material on tasks can be found here. You can view the full documentation of POSIX functions related to threads by following this link.

Sources

You will find all the sources in this compressed archive. Several scenario files are provided to verify your solutions. In addition, an implementation in Java using ExecutorService lets you compare the expected results with those of your C / POSIX implementation.

You must reuse the protected circular buffer implemented in a previous lab. You must have completed at least the first 4 questions of that lab, that is to say the implementation with condition variables for the blocking, non-blocking and timed semantics.

To decompress, use GNU tar:

tar zxf src.tar.gz

How To Debug

To find your errors, we strongly recommend that you use gdb. It is critical to debug your C programs with gdb rather than by filling them with printf calls. If you have a memory problem (SIGSEGV, ...), do:

gdb ./main_executor
(gdb) run test-20.txt

In case of a problem, the program will stop on the incorrect memory access. To understand the issue, use gdb commands:

MacOS

MacOS users first need to make gdb operational, which MacOS does not make easy. You will find the procedure by following this link. If this link is not sufficient, there are many guides to solve this problem.


1.1 Overview of the system architecture

The main program is located in main_executor.c, the operation of which we will detail later.

After initializing its internal structures, the main program reads the scenario file passed on the command line and creates as many jobs (job_t) as the scenario indicates through the job_table_size variable.

Each job is described by a job_t structure, declared in scenario.h, containing its index in the jobs table (id) and its execution time, or computation time, in milliseconds (exec_time).

// scenario.h

typedef struct {
  int    id;
  long   exec_time;
} job_t;

Executing a job means running the procedure main_job, defined at the beginning of main_executor.c. It signals its start ("initiate"), simulates a computation whose duration is given by the exec_time attribute of the job_t structure passed as a parameter, and finally signals its termination ("complete").
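The three steps of a job can be sketched as follows. This is only an illustration under assumptions: main_job_sketch and sleep_ms are hypothetical names, the message format is simplified, the computation is simulated with nanosleep, and returning the job itself as the result is our choice, not necessarily what main_executor.c does.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <time.h>

/* Same layout as job_t in scenario.h. */
typedef struct {
  int    id;
  long   exec_time;   /* milliseconds */
} job_t;

/* Hypothetical helper: sleep for ms milliseconds. */
static void sleep_ms(long ms) {
  struct timespec ts = { .tv_sec = ms / 1000,
                         .tv_nsec = (ms % 1000) * 1000000L };
  nanosleep(&ts, NULL);
}

/* Sketch of a job body: signal start, simulate work, signal end. */
void *main_job_sketch(void *arg) {
  job_t *job = arg;
  assert(job != NULL);
  printf("[main_job] initiate execution=%ld\n", job->exec_time);
  sleep_ms(job->exec_time);
  printf("[main_job] complete execution=%ld\n", job->exec_time);
  return job;   /* assumption: the job itself serves as the result */
}
```

The signature matches main_func_t, so such a function can be stored in the main attribute of a callable.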

If you progress through main_executor.c, an executor is created by specifying configuration parameters.

// executor.h

typedef struct _executor_t {
  thread_pool_t      * thread_pool;
  long                 keep_alive_time;
  protected_buffer_t * futures;
} executor_t;

When creating the executor, we specify the configuration parameters of the thread_pool thread manager, core_pool_size and max_pool_size, as well as blocking_queue_size, the maximum number of callables that can remain pending in the futures queue, and keep_alive_time, the time during which threads can remain idle before being destroyed. Each future is a structure referring to a callable and its result (see slides).

The executor is responsible for processing the work submitted to it. After creating the executor, the program creates as many callables as there are jobs and submits them to the executor using the submit_callable procedure. For each callable submitted, submit_callable returns a future. Subsequently, the program collects the results from the futures by calling get_callable_result, which blocks if a result is not yet available.

After creating the executor, the program populates the table of callables and futures using the job table. The callable_t structure, described in executor.h, has a pointer to the code of the main procedure to be executed (main attribute) and job characteristics (params attribute). The other attributes of callable_t will be explained later.

// executor.h

typedef void * (*main_func_t)(void *);

typedef struct {
  void       * params;
  main_func_t  main;
  long         period;
} callable_t;

Each callable is associated with a future. Thus, the future_t structure refers to the callable with which it is associated (callable attribute) as well as to the result (result attribute) that the callable produces once it is in the completed state (completed attribute).

// executor.h

typedef struct {
  int             completed;
  callable_t    * callable;
  void          * result;
} future_t;

1.2 Implementation of a basic threads manager

To get back to the system overview, note that in the executor.c file, the submit_callable function asks the thread_pool thread manager to create a thread.

We are going to implement a first version of the thread manager in the thread_pool.c and thread_pool.h files. We briefly recall the principle of the thread manager. A thread manager (thread pool) keeps multiple threads waiting for requests to execute. As shown in the structure below, we do not maintain the list of allocated threads, but only their number as well as the main parameters core_pool_size and max_pool_size.

// thread_pool.h

typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
} thread_pool_t;

As the thread_pool structure is accessed concurrently, you must protect it by adding a synchronization object as an attribute. This synchronization object must also be initialized in thread_pool_init.

For now, we are only interested in the case where the number of threads created is less than core_pool_size. You will need to complete the pool_thread_create function so that it creates a thread using the parameters passed to it and then updates the attributes of the thread_pool_t structure.
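A minimal sketch of this first case is given below. It is not the lab's reference solution: the names pool_sketch_t, pool_sketch_init, pool_sketch_create and idle_body_sketch are hypothetical, the mutex field is our assumption for the synchronization object, and the force parameter is left out since only the core_pool_size limit is considered here.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef void *(*main_func_t)(void *);

/* thread_pool_t fields plus an assumed mutex. */
typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
  pthread_mutex_t mutex;   /* assumption: protects size and shutdown */
} pool_sketch_t;

void pool_sketch_init(pool_sketch_t *p, int core, int max) {
  assert(p != NULL);
  p->core_pool_size = core;
  p->max_pool_size  = max;
  p->size           = 0;
  p->shutdown       = 0;
  pthread_mutex_init(&p->mutex, NULL);
}

/* Creates a thread only while size < core_pool_size.
   Returns 1 if a thread was started, 0 otherwise. */
int pool_sketch_create(pool_sketch_t *p, main_func_t body, void *arg) {
  pthread_t tid;
  int created = 0;

  pthread_mutex_lock(&p->mutex);
  if (p->size < p->core_pool_size &&
      pthread_create(&tid, NULL, body, arg) == 0) {
    pthread_detach(tid);   /* the pool keeps a count, not thread ids */
    p->size++;
    created = 1;
  }
  pthread_mutex_unlock(&p->mutex);
  return created;
}

/* Hypothetical thread body that returns immediately. */
void *idle_body_sketch(void *arg) {
  return arg;
}
```

The test and the update of size happen under the same lock, so two concurrent callers cannot both pass the limit check.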

Having partially completed pool_thread_create, we can use the test-20.txt scenario to verify the implementation. An example of expected output is given below. The messages concerning the callables do not matter since, for the moment, their results are not correctly handled. On the other hand, jobs must complete in increasing order of execution time (1000, 3000, 4000, 7000) although they are started in a different order (1000, 7000, 3000, 4000). Indeed, since core_pool_size is equal to 4, 4 threads must be created and each one executes a job in parallel with the others. So all jobs start at instant 0 and end at their expected response time. Note that 4 threads are created and that 4 threads terminate.

core_pool_size = 4
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000000 [pool_thread] created
000000 [main_job] initiate execution=1000 period=0
000000 [submit_callable] id 0
000000 [pool_thread] created
000000 [submit_callable] id 1
000000 [main_job] initiate execution=7000 period=0
000000 [pool_thread] created
000000 [main_job] initiate execution=3000 period=0
000000 [submit_callable] id 2
000000 [pool_thread] created
000000 [submit_callable] id 3
000000 [get_callable_result] id 0
000000 [main_job] initiate execution=4000 period=0
000000 [get_callable_result] id 1
000000 [get_callable_result] id 2
000000 [get_callable_result] id 3
001004 [main_job] complete execution=1000 period=0
001004 [pool_thread] terminated
003000 [main_job] complete execution=3000 period=0
003000 [pool_thread] terminated
004005 [main_job] complete execution=4000 period=0
004005 [pool_thread] terminated
007003 [main_job] complete execution=7000 period=0
007003 [pool_thread] terminated
010003 [executor_shutdown]

1.3 Storing execution results

We will now make sure to block while waiting for the result of the execution of a callable to be available. To do this, you have to modify the executor.c file. The result of executing a callable is stored in the future_t structure returned by submit_callable. The get_callable_result function should block until the callable completes, i.e. until the completed attribute of the future_t structure is true.

To do this, we use a mutex and a condition variable: the first to protect the structure against concurrent access, the second to wait until the result of the computation associated with the callable is available. It is therefore necessary to complete the get_callable_result function so that it blocks while waiting for the completed attribute to change to true.

Symmetrically, each callable is executed through the main procedure main_pool_thread of a pool thread. It will be necessary to complete the main_pool_thread function so that, when the callable is executed, the pool thread signals that the result is available by updating the completed attribute of future as well as the synchronization objects.
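The two sides of this handshake can be sketched as follows. The structure and function names (future_sketch_t, future_sketch_complete, future_sketch_get, producer_sketch) and the mutex and cond fields are assumptions made for illustration; the callable attribute is omitted to keep the example short.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* future_t fields plus assumed synchronization objects. */
typedef struct {
  int             completed;
  void          * result;
  pthread_mutex_t mutex;   /* assumption */
  pthread_cond_t  cond;    /* assumption */
} future_sketch_t;

void future_sketch_init(future_sketch_t *f) {
  assert(f != NULL);
  f->completed = 0;
  f->result    = NULL;
  pthread_mutex_init(&f->mutex, NULL);
  pthread_cond_init(&f->cond, NULL);
}

/* Pool thread side: publish the result and wake up waiters. */
void future_sketch_complete(future_sketch_t *f, void *result) {
  pthread_mutex_lock(&f->mutex);
  f->result    = result;
  f->completed = 1;
  pthread_cond_broadcast(&f->cond);
  pthread_mutex_unlock(&f->mutex);
}

/* Client side: block until the result is available. */
void *future_sketch_get(future_sketch_t *f) {
  void *result;
  pthread_mutex_lock(&f->mutex);
  while (!f->completed)   /* loop guards against spurious wakeups */
    pthread_cond_wait(&f->cond, &f->mutex);
  result = f->result;
  pthread_mutex_unlock(&f->mutex);
  return result;
}

/* Hypothetical producer thread used to exercise the functions. */
static int answer_sketch = 42;

void *producer_sketch(void *arg) {
  future_sketch_complete(arg, &answer_sketch);
  return NULL;
}
```

Testing completed in a while loop rather than an if matters: a waiter that wakes up without the flag being set simply goes back to waiting.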

You can use the test-20.txt scenario to verify your implementation. In this scenario, the results must appear in the correct order and at the expected times. Note that callable results are retrieved in the same order as the callables are submitted to the executor. So, in the output shown below, the result of callable 0, submitted first, is retrieved at instant 1000 since its execution time is 1000 milliseconds. Then, the result of callable 1, submitted second, is retrieved at instant 7000. Consequently, it is only at instant 7000 that we can retrieve the result of callable 2. This callable terminated at instant 3000, but the main program retrieves the results in submission order, so it has to wait for the result of callable 1 at 7000 before it can retrieve that of callable 2. The same situation occurs for callable 3, which terminated at instant 4000.

core_pool_size = 4
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000000 [pool_thread] created
000000 [submit_callable] id 0
000000 [main_job] initiate execution=1000 period=0
000000 [pool_thread] created
000000 [main_job] initiate execution=7000 period=0
000000 [submit_callable] id 1
000000 [pool_thread] created
000000 [submit_callable] id 2
000000 [pool_thread] created
000000 [submit_callable] id 3
000000 [main_job] initiate execution=4000 period=0
000000 [main_job] initiate execution=3000 period=0
001005 [main_job] complete execution=1000 period=0
001005 [pool_thread] terminated
001005 [get_callable_result] id 0
003005 [main_job] complete execution=3000 period=0
003005 [pool_thread] terminated
004000 [main_job] complete execution=4000 period=0
004000 [pool_thread] terminated
007003 [main_job] complete execution=7000 period=0
007003 [pool_thread] terminated
007003 [get_callable_result] id 1
007003 [get_callable_result] id 2
007003 [get_callable_result] id 3
017007 [executor_shutdown]

1.4 Storing callables in a queue

When a number of threads equal to core_pool_size has been created, the executor suspends the creation of threads. In submit_callable, a call is made to pool_thread_create whose last parameter is 0 (false), which indicates that we do not want to exceed the limit of core_pool_size. Thus, pool_thread_create returns 0 (false) to indicate that the limit has been reached.

// If the current thread pool size is less than core_pool_size,
// create a new thread. If it is not, but force is true and the size
// is less than max_pool_size, create a new thread as well. If a thread
// is created, increment the current thread pool size. Use main as the
// thread main procedure.

int pool_thread_create(thread_pool_t * thread_pool,
                       main_func_t     main,
                       void          * executor,
                       int             force);

submit_callable must be completed so that new callables are stored in the executor’s future queue. No other thread is created until the queue is full.

In addition, the threads already created must, after executing their current work, check the queue to extract a future from it and execute the callable it contains. You must therefore complete main_pool_thread so that, once a callable has been executed, the pool thread extracts another future from the executor’s queue. In the absence of keep_alive_time (FOREVER), the pool thread blocks as long as no callable is present in the queue.

You can use the test-21.txt scenario to verify your implementation. In this example, there are only 2 pool threads and a queue of size 4 for 4 jobs with computation times of 1000 ms, 7000 ms, 3000 ms and 4000 ms. The first thread performs the 1000 ms job and the second the 7000 ms job. Unlike the previous cases, at instant 1000, the first thread extracts from the queue the third job, which lasts 3000 ms and therefore ends at instant 4000. Since the 7000 ms job is still not completed, the same thread then pulls the last job (4000 ms) from the queue and completes it at instant 8000.
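The extraction loop can be sketched as below. Since the interface of the protected circular buffer is not reproduced in this text, the example uses a small stand-in queue (queue_sketch_t) with a non-blocking put and a blocking get; all names are hypothetical. So that the example terminates, the loop stops when it extracts a NULL entry, which is an assumption of this sketch and not required at this stage of the lab.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define QCAP 4

typedef void *(*main_func_t)(void *);
typedef struct { void *params; main_func_t main; } qcallable_t;
typedef struct { int completed; qcallable_t *callable; void *result; } qfuture_t;

/* Minimal stand-in for the protected circular buffer. */
typedef struct {
  qfuture_t      *slots[QCAP];
  int             head, count;
  pthread_mutex_t mutex;
  pthread_cond_t  not_empty;
} queue_sketch_t;

void queue_sketch_init(queue_sketch_t *q) {
  assert(q != NULL);
  q->head = 0;
  q->count = 0;
  pthread_mutex_init(&q->mutex, NULL);
  pthread_cond_init(&q->not_empty, NULL);
}

/* Non-blocking put: returns 0 when the queue is full. */
int queue_sketch_put(queue_sketch_t *q, qfuture_t *f) {
  int ok = 0;
  pthread_mutex_lock(&q->mutex);
  if (q->count < QCAP) {
    q->slots[(q->head + q->count) % QCAP] = f;
    q->count++;
    ok = 1;
    pthread_cond_signal(&q->not_empty);
  }
  pthread_mutex_unlock(&q->mutex);
  return ok;
}

/* Blocking get: waits until an entry is available. */
qfuture_t *queue_sketch_get(queue_sketch_t *q) {
  qfuture_t *f;
  pthread_mutex_lock(&q->mutex);
  while (q->count == 0)
    pthread_cond_wait(&q->not_empty, &q->mutex);
  f = q->slots[q->head];
  q->head = (q->head + 1) % QCAP;
  q->count--;
  pthread_mutex_unlock(&q->mutex);
  return f;
}

/* Pool thread loop: run queued callables until a NULL entry arrives.
   A complete version would update the future under its own lock. */
void *worker_sketch(void *arg) {
  queue_sketch_t *q = arg;
  qfuture_t *f;
  while ((f = queue_sketch_get(q)) != NULL) {
    f->result = f->callable->main(f->callable->params);
    f->completed = 1;
  }
  return NULL;
}

/* Hypothetical callable body: doubles the integer it points to. */
void *twice_sketch(void *p) {
  int *v = p;
  *v *= 2;
  return v;
}
```

Callables are executed in the order in which they were queued, which is what the expected outputs of the scenarios rely on.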

core_pool_size = 2
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000000 [pool_thread] created
000000 [main_job] initiate execution=1000 period=0
000000 [submit_callable] id 0
000000 [pool_thread] created
000000 [submit_callable] id 1
000000 [submit_callable] id 2
000000 [main_job] initiate execution=7000 period=0
000000 [submit_callable] id 3
001005 [main_job] complete execution=1000 period=0
001005 [main_job] initiate execution=3000 period=0
001005 [get_callable_result] id 0
004010 [main_job] complete execution=3000 period=0
004010 [main_job] initiate execution=4000 period=0
007003 [main_job] complete execution=7000 period=0
007003 [get_callable_result] id 1
007003 [get_callable_result] id 2
008013 [main_job] complete execution=4000 period=0
008013 [get_callable_result] id 3
018013 [executor_shutdown]

Note that the 2 threads are indeed created but no longer terminate, since they wait indefinitely for jobs in the queue. The program intentionally terminates incorrectly since the executor_shutdown operation does not stop these threads. This problem will be handled later on.


1.5 Implementation of an advanced threads manager

We are now looking to complete the thread creation mechanism of the thread manager. The goal is to create new threads when the callable queue is full, without ever exceeding max_pool_size threads.

As indicated previously, the pool_thread_create function has a force parameter that allows you to force the creation of a thread if the number of threads created is greater than or equal to core_pool_size.

You must first complete pool_thread_create so that a thread is also created when all three of the following conditions hold: the number of threads created is greater than or equal to core_pool_size, this number is less than max_pool_size, and the force parameter is true.

To implement this functionality, you must also complete submit_callable so that, once the queue is full, a pool thread is created by a call to pool_thread_create under the circumstances described above. However, care must be taken to preserve the order of the callables. If the current callable cannot be queued because the queue is full, extract the first callable from the queue, insert the current callable, and assign the extracted callable to the newly created thread.
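The decision sequence described above can be checked with a small model that only manipulates counters and creates no thread. The names exec_model_t, model_thread_create and model_submit are hypothetical. The REJECTED outcome, returned when the queue is full and max_pool_size is reached, is an assumption of this sketch: the text does not say what submit_callable does in that case.

```c
#include <assert.h>

typedef struct {
  int core_pool_size, max_pool_size, size;   /* thread pool counters */
  int queue_capacity, queue_count;           /* pending callables    */
} exec_model_t;

typedef enum {
  RUN_ON_CORE_THREAD,          /* new thread runs the callable        */
  QUEUED,                      /* callable waits in the queue         */
  RUN_OLDEST_ON_EXTRA_THREAD,  /* forced thread runs the queue head   */
  REJECTED                     /* assumption: nothing else is possible */
} submit_outcome_t;

/* Same rule as pool_thread_create, without the actual thread. */
static int model_thread_create(exec_model_t *e, int force) {
  if (e->size < e->core_pool_size ||
      (force && e->size < e->max_pool_size)) {
    e->size++;
    return 1;
  }
  return 0;
}

submit_outcome_t model_submit(exec_model_t *e) {
  /* The swap below needs a queue holding at least one callable. */
  assert(e->queue_capacity >= 1);

  if (model_thread_create(e, 0))
    return RUN_ON_CORE_THREAD;
  if (e->queue_count < e->queue_capacity) {
    e->queue_count++;
    return QUEUED;
  }
  if (model_thread_create(e, 1))
    /* The oldest queued callable goes to the new thread and the
       current one takes its place, so queue_count is unchanged. */
    return RUN_OLDEST_ON_EXTRA_THREAD;
  return REJECTED;
}
```

With core_pool_size = 2, max_pool_size = 4 and a queue of size 1, four submissions yield two core threads, one queued callable and one forced thread, that is 3 threads in total.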

You can use the test-22.txt scenario to verify your implementation. In this example, 4 jobs with computation times of 1000 ms, 7000 ms, 3000 ms and 4000 ms are submitted to the executor. The manager allows the immediate creation of 2 pool threads since core_pool_size is equal to 2. So 2 callables (1000 ms and 7000 ms) are processed at instant 0. The third callable is stored in the queue. However, since the queue is of size 1 and is now full with the 3000 ms callable, the fourth callable (4000 ms) causes the creation of a third thread, which is allowed because max_pool_size is equal to 4. It is therefore necessary to check that the 4000 ms job is processed last and that only 3 threads are created.

core_pool_size = 2
max_pool_size = 4
blocking_queue_size = 1
keep_alive_time = -1
000000 [pool_thread] created
000000 [submit_callable] id 0
000000 [pool_thread] created
000000 [submit_callable] id 1
000000 [main_job] initiate execution=1000 period=0
000000 [submit_callable] id 2
000000 [main_job] initiate execution=7000 period=0
000000 [pool_thread] created
000000 [submit_callable] id 3
000000 [main_job] initiate execution=3000 period=0
001000 [main_job] complete execution=1000 period=0
001000 [main_job] initiate execution=4000 period=0
001000 [get_callable_result] id 0
003004 [main_job] complete execution=3000 period=0
005005 [main_job] complete execution=4000 period=0
007005 [main_job] complete execution=7000 period=0
007005 [get_callable_result] id 1
007005 [get_callable_result] id 2
007005 [get_callable_result] id 3
017006 [executor_shutdown]

1.6 Expiration of a thread after a period of inactivity

We want to let a thread expire if it remains idle for a time specified by keep_alive_time when the number of threads created is greater than core_pool_size and the callable queue is empty.

You have to complete main_pool_thread, so that once the thread has finished its current work, it waits during a keep_alive_time period for a callable to be extracted from the queue. If no future (so no callable) is returned, the thread should call pool_thread_remove, to see if it should stop because the number of threads created is greater than core_pool_size.
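The bounded wait itself relies on the timed semantics of the protected buffer. As a reminder of the underlying mechanism, here is a hedged sketch using pthread_cond_timedwait with an absolute deadline; inbox_sketch_t and its functions are hypothetical stand-ins for the buffer, reduced to a counter of pending items.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <time.h>

typedef struct {
  int             pending;     /* number of items waiting */
  pthread_mutex_t mutex;
  pthread_cond_t  not_empty;
} inbox_sketch_t;

void inbox_sketch_init(inbox_sketch_t *b) {
  assert(b != NULL);
  b->pending = 0;
  pthread_mutex_init(&b->mutex, NULL);
  pthread_cond_init(&b->not_empty, NULL);
}

void inbox_sketch_post(inbox_sketch_t *b) {
  pthread_mutex_lock(&b->mutex);
  b->pending++;
  pthread_cond_signal(&b->not_empty);
  pthread_mutex_unlock(&b->mutex);
}

/* Waits at most timeout_ms for an item.
   Returns 1 if an item was taken, 0 if the wait expired. */
int inbox_sketch_take(inbox_sketch_t *b, long timeout_ms) {
  struct timespec deadline;
  int rc = 0, taken = 0;

  /* Default condition variables measure time on CLOCK_REALTIME. */
  clock_gettime(CLOCK_REALTIME, &deadline);
  deadline.tv_sec  += timeout_ms / 1000;
  deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
  if (deadline.tv_nsec >= 1000000000L) {
    deadline.tv_sec++;
    deadline.tv_nsec -= 1000000000L;
  }

  pthread_mutex_lock(&b->mutex);
  while (b->pending == 0 && rc == 0)   /* stop on timeout or error */
    rc = pthread_cond_timedwait(&b->not_empty, &b->mutex, &deadline);
  if (b->pending > 0) {
    b->pending--;
    taken = 1;
  }
  pthread_mutex_unlock(&b->mutex);
  return taken;
}
```

In a pool thread, a return value of 0 would correspond to the case where no future is returned within keep_alive_time, at which point the thread asks pool_thread_remove whether it may stop.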

It is therefore necessary to complete pool_thread_remove so that the function tells the calling thread whether it can terminate, which is the case when the number of threads created is greater than core_pool_size. As the number of threads created, and therefore the thread_pool structure, is going to be modified, care must be taken to protect thread_pool against concurrent access.

You can use the test-23.txt scenario to verify your implementation. In this example, 4 jobs with computation times of 1000 ms, 7000 ms, 3000 ms and 4000 ms are submitted to the executor. The manager does not create any pool thread at first since core_pool_size is 0. Since the queue is of size 1, the first callable is stored in this queue. Then 3 pool threads are created to execute the callables of 1000 ms, 7000 ms and 3000 ms. At instant 1000, the thread in charge of the 1000 ms callable takes over the 4000 ms callable and completes it at instant 5000. As keep_alive_time is 500 ms and core_pool_size is 0, the 3 pool threads should terminate at 3500 ms (3000 ms + 500 ms), 5500 ms (1000 ms + 4000 ms + 500 ms) and 7500 ms (7000 ms + 500 ms).

core_pool_size = 0
max_pool_size = 4
blocking_queue_size = 1
keep_alive_time = 500
000000 [submit_callable] id 0
000000 [pool_thread] created
000000 [submit_callable] id 1
000000 [pool_thread] created
000000 [submit_callable] id 2
000000 [pool_thread] created
000000 [submit_callable] id 3
000000 [main_job] initiate execution=1000 period=0
000000 [main_job] initiate execution=7000 period=0
000000 [main_job] initiate execution=3000 period=0
001002 [main_job] complete execution=1000 period=0
001002 [main_job] initiate execution=4000 period=0
001002 [get_callable_result] id 0
001002 [get_callable_result] id 1
003002 [main_job] complete execution=3000 period=0
003502 [pool_thread] terminated
005002 [main_job] complete execution=4000 period=0
005502 [pool_thread] terminated
007000 [main_job] complete execution=7000 period=0
007000 [get_callable_result] id 2
007000 [get_callable_result] id 3
007502 [pool_thread] terminated
017002 [executor_shutdown]

1.7 Consistent termination of the threads manager

Up to now, the executor_shutdown function in the executor.c file has done nothing, so the program ends without taking care to shut down active threads. This has not caused any problem so far because the scenarios were designed accordingly. We now have to deal with this problem.

In executor.c, executor_shutdown calls thread_pool_shutdown to signal the threads in the thread pool to terminate as soon as possible. This is not sufficient because some threads can be blocked indefinitely waiting for callables or futures (infinite keep_alive_time). To unblock them, we must complete executor_shutdown so that it fills the queue with empty callables (NULL). In the code of main_pool_thread, the threads are then unblocked (with a NULL future) and can terminate if pool_thread_remove allows it.

if ((future == NULL) && pool_thread_remove (executor->thread_pool))
  break;

Thus, executor_shutdown can finish by blocking on wait_thread_pool_empty. However, pool_thread_remove and wait_thread_pool_empty still have to be completed. We must add to the thread_pool_t structure of thread_pool.h a synchronization object that blocks any thread waiting for the termination of all active pool threads. We then have to use this object in wait_thread_pool_empty and pool_thread_remove.

To complete pool_thread_remove, it is necessary to allow a thread to terminate when the thread pool is in the shutdown phase, whether or not the number of threads is greater than core_pool_size. When all the threads have been removed through pool_thread_remove, the synchronization object must signal that the thread pool is truly empty.
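A possible shape for these functions is sketched below, assuming a mutex and a condition variable named empty are added to the structure. The names sd_pool_t, sd_pool_remove, sd_pool_shutdown and sd_pool_wait_empty are hypothetical and only mirror the roles of pool_thread_remove, thread_pool_shutdown and wait_thread_pool_empty.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
  pthread_mutex_t mutex;   /* assumption */
  pthread_cond_t  empty;   /* assumption: signaled when size is 0 */
} sd_pool_t;

void sd_pool_init(sd_pool_t *p, int core, int max) {
  assert(p != NULL);
  p->core_pool_size = core;
  p->max_pool_size  = max;
  p->size           = 0;
  p->shutdown       = 0;
  pthread_mutex_init(&p->mutex, NULL);
  pthread_cond_init(&p->empty, NULL);
}

/* Returns 1 if the calling pool thread may terminate, 0 otherwise. */
int sd_pool_remove(sd_pool_t *p) {
  int done = 0;
  pthread_mutex_lock(&p->mutex);
  if (p->shutdown || p->size > p->core_pool_size) {
    p->size--;
    done = 1;
    if (p->size == 0)
      pthread_cond_broadcast(&p->empty);   /* pool is now empty */
  }
  pthread_mutex_unlock(&p->mutex);
  return done;
}

/* Marks the pool as shutting down. */
void sd_pool_shutdown(sd_pool_t *p) {
  pthread_mutex_lock(&p->mutex);
  p->shutdown = 1;
  pthread_mutex_unlock(&p->mutex);
}

/* Blocks until every pool thread has been removed. */
void sd_pool_wait_empty(sd_pool_t *p) {
  pthread_mutex_lock(&p->mutex);
  while (p->size > 0)
    pthread_cond_wait(&p->empty, &p->mutex);
  pthread_mutex_unlock(&p->mutex);
}
```

Decrementing size and signaling under the same lock ensures that the waiter cannot miss the moment the pool becomes empty.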

You can use the test-24.txt scenario to verify your implementation. In this example, 4 jobs of computation time 1000ms, 7000ms, 3000ms, and 4000ms are submitted to the executor. A single thread processes these jobs and finishes their execution on instant 15000. executor_shutdown starts on the instant 25000. The last thread must therefore end on this instant, as it is the case below, with the message [pool_thread] terminated.

core_pool_size = 1
max_pool_size = 1
blocking_queue_size = 4
keep_alive_time = 20000
000000 [pool_thread] created
000000 [main_job] initiate execution=1000 period=0
000000 [submit_callable] id 0
000000 [submit_callable] id 1
000000 [submit_callable] id 2
000000 [submit_callable] id 3
001000 [main_job] complete execution=1000 period=0
001001 [main_job] initiate execution=7000 period=0
001001 [get_callable_result] id 0
008004 [main_job] complete execution=7000 period=0
008004 [main_job] initiate execution=3000 period=0
008004 [get_callable_result] id 1
011006 [main_job] complete execution=3000 period=0
011006 [main_job] initiate execution=4000 period=0
011006 [get_callable_result] id 2
015009 [main_job] complete execution=4000 period=0
015009 [get_callable_result] id 3
025011 [pool_thread] terminated
025011 [executor_shutdown]

1.8 Implementation of periodic threads

We want to implement periodic threads according to the so-called fixed-rate policy (withFixedRate). When the period configuration variable is set to a non-zero value in milliseconds, the callables created are considered periodic, with that value as their period. As periodic tasks, they no longer provide results since their execution is now infinite.

It is therefore necessary to complete main_pool_thread so that the thread has periodic behavior if the period of the callable is not zero.
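With a fixed-rate policy, each release time is computed from the previous release time and not from the completion time, so the execution time of the callable does not shift the following activations. The sketch below illustrates this under assumptions: the names are hypothetical, the loop is bounded by a rounds parameter where a pool thread would instead test the shutdown state, and time is read from CLOCK_MONOTONIC.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <time.h>

typedef void *(*main_func_t)(void *);

/* Current monotonic time in milliseconds. */
static long long now_ms(void) {
  struct timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);
  return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000L;
}

/* Sleeps until the given monotonic instant, if it is in the future. */
static void sleep_until_ms(long long target) {
  long long delay = target - now_ms();
  if (delay > 0) {
    struct timespec ts = { .tv_sec = delay / 1000,
                           .tv_nsec = (delay % 1000) * 1000000L };
    nanosleep(&ts, NULL);
  }
}

/* Runs body `rounds` times at a fixed rate; returns elapsed ms. */
long long run_periodic_sketch(main_func_t body, void *params,
                              long period_ms, int rounds) {
  long long start = now_ms();
  long long release = start;

  assert(body != NULL && period_ms > 0);
  for (int i = 0; i < rounds; i++) {
    body(params);
    release += period_ms;   /* relative to the previous release */
    if (i + 1 < rounds)
      sleep_until_ms(release);
  }
  return now_ms() - start;
}

/* Hypothetical callable body: counts its activations. */
void *count_tick_sketch(void *p) {
  (*(int *)p)++;
  return NULL;
}
```

If an execution overruns its period, sleep_until_ms returns immediately and the next activation starts late, without accumulating any further drift afterwards.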

You can use the test-25.txt scenario to verify your implementation. We create 4 callables whose execution is repeated every 8000 ms. As shutdown occurs after 10,000 ms, callables will only end after their second execution, at 16,000 ms.

core_pool_size = 4
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = 5000
000000 [pool_thread] created
000000 [main_job] initiate execution=1000 period=8000
000000 [submit_callable] id 0
000000 [pool_thread] created
000000 [submit_callable] id 1
000000 [pool_thread] created
000000 [submit_callable] id 2
000000 [pool_thread] created
000000 [submit_callable] id 3
000000 [main_job] initiate execution=7000 period=8000
000000 [main_job] initiate execution=3000 period=8000
000000 [main_job] initiate execution=4000 period=8000
001004 [main_job] complete execution=1000 period=8000
003002 [main_job] complete execution=3000 period=8000
004001 [main_job] complete execution=4000 period=8000
007001 [main_job] complete execution=7000 period=8000
008000 [main_job] initiate execution=1000 period=8000
008000 [main_job] initiate execution=3000 period=8000
008000 [main_job] initiate execution=7000 period=8000
008000 [main_job] initiate execution=4000 period=8000
009005 [main_job] complete execution=1000 period=8000
011004 [main_job] complete execution=3000 period=8000
012004 [main_job] complete execution=4000 period=8000
015004 [main_job] complete execution=7000 period=8000
016003 [pool_thread] terminated
016003 [pool_thread] terminated
016003 [pool_thread] terminated
016003 [pool_thread] terminated
016003 [executor_shutdown]