13.7 Guarding Against Spawning Too Many Threads

13.7.1 Problem

You need to prevent too many threads from being spawned, which could result in a denial of service owing to exhausted system resources.

13.7.2 Solution

A common mistake in writing multithreaded programs is to create a new thread every time a new task is initiated; this is often overkill. Instead, a "pool" of threads can often be used to perform simple tasks. A fixed number of threads is created when the program initializes, and these threads exist for the lifetime of the process. Whenever a task needs to be performed on another thread, the task is queued. When a thread becomes available, it performs the task and then goes back to waiting for another one.

On Windows 2000 and greater, there is a new API function called QueueUserWorkItem( ) that essentially implements the same functionality as that presented in this recipe. Unfortunately, that function does not exist on older versions of Windows. Our solution has the advantage of being portable to such older systems. However, if you are writing code that is guaranteed always to be running on a system that supports the API, you may wish to use it instead. Regardless of whether you use the API or the code we present in this recipe, the concepts are the same, and the bulk of our discussion still applies.

13.7.3 Discussion

Suppose that the program is a network server that spawns a new thread for each connection it receives. An attacker can quickly flood the server with bogus or incomplete connections. The result is either that the server runs out of available threads and cannot create any more, or that it cannot create them fast enough to service the incoming requests. Either way, legitimate connections can no longer get through, and system resources are exhausted.

The proper way to handle a program that would otherwise use an arbitrary number of threads is to create a "pool" of threads in advance, which solves two problems. First, it removes thread creation, which can be expensive, from the cost of handling each new connection. Second, it prevents system resources from being exhausted by too many spawned threads, because the number of threads stays fixed no matter how many connections arrive.

We can effectively map threads that would otherwise be spawned to tasks. Normally, when a thread is spawned, a function to serve as its entry point is specified, along with a void pointer argument that can carry any application-specific data to the thread's entry point. We'll mirror these semantics in our tasks and create a function, spc_threadpool_schedule( ), to schedule a new task. The task is stored at the end of a list so that tasks run in the order they are scheduled. When a new task is scheduled, spc_threadpool_schedule( ) signals a condition variable, which pooled threads wait on when they have no tasks to run. Figure 13-1 illustrates the sequence of events that occurs in each pooled thread.

Figure 13-1. Actions carried out by pooled threads

Notice that the number of tasks that can be scheduled is not restricted. As long as there is sufficient memory to create a new task structure, tasks will be scheduled. Depending on how the thread pool is to be used, it may be desirable to limit the number of tasks that can be pending at any one time. For example, in a network server that schedules each connection as a task, you may want to stop accepting new connections once such a limit is reached, until enough of the already scheduled connections have been run.
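One way to impose such a limit is to count pending tasks and refuse new work past a threshold. The sketch below is not part of the recipe: try_admit_task( ), release_admitted_task( ), and MAX_PENDING_TASKS are hypothetical names, and the limit of 64 is an arbitrary placeholder that would need tuning.

```c
#include <pthread.h>

/* Hypothetical cap on tasks that are scheduled but not yet finished.
 * The value is a placeholder, not a recommendation. */
#define MAX_PENDING_TASKS 64

static pthread_mutex_t pending_mutex = PTHREAD_MUTEX_INITIALIZER;
static int             pending_count = 0;

/* Returns 1 if the caller may schedule one more task, or 0 if the limit
 * has been reached and the work should be refused or deferred. */
int try_admit_task(void) {
  int admitted = 0;

  pthread_mutex_lock(&pending_mutex);
  if (pending_count < MAX_PENDING_TASKS) {
    pending_count++;
    admitted = 1;
  }
  pthread_mutex_unlock(&pending_mutex);
  return admitted;
}

/* Call exactly once for every successful try_admit_task( ), when the
 * admitted task has finished (or failed to be scheduled). */
void release_admitted_task(void) {
  pthread_mutex_lock(&pending_mutex);
  if (pending_count > 0) pending_count--;
  pthread_mutex_unlock(&pending_mutex);
}
```

A server could call try_admit_task( ) before spc_threadpool_schedule( ), stop accepting or drop the connection when it returns 0, and have each task call release_admitted_task( ) as its last action. If scheduling itself fails, the caller should release the slot immediately.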

#include <stdlib.h>
#ifndef WIN32
#include <pthread.h>
#else
#include <windows.h>
#endif
   
typedef void (*spc_threadpool_fnptr)(void *);
   
typedef struct _spc_threadpool_task {
  spc_threadpool_fnptr        fnptr;
  void                        *arg;
  struct _spc_threadpool_task *next;
} spc_threadpool_task;
   
typedef struct {
  int                 size;
  int                 destroy;
#ifndef WIN32
  pthread_t           *tids;
  pthread_cond_t      cond;
#else
  HANDLE              *tids;
  HANDLE              cond;
#endif
  spc_threadpool_task *tasks;
  spc_threadpool_task *tail;
} spc_threadpool_t;
   
#ifndef WIN32
#define SPC_ACQUIRE_MUTEX(mtx)       pthread_mutex_lock(&(mtx))
#define SPC_RELEASE_MUTEX(mtx)       pthread_mutex_unlock(&(mtx))
#define SPC_CREATE_COND(cond)        pthread_cond_init(&(cond), 0)
#define SPC_DESTROY_COND(cond)       pthread_cond_destroy(&(cond))
#define SPC_SIGNAL_COND(cond)        pthread_cond_signal(&(cond))
#define SPC_BROADCAST_COND(cond)     pthread_cond_broadcast(&(cond))
#define SPC_WAIT_COND(cond, mtx)     pthread_cond_wait(&(cond), &(mtx))
#define SPC_CLEANUP_PUSH(func, arg)  pthread_cleanup_push(func, arg)
#define SPC_CLEANUP_POP(exec)        pthread_cleanup_pop(exec)
#define SPC_CREATE_THREAD(t, f, arg) (!pthread_create(&(t), 0, (f), (arg)))
   
static pthread_mutex_t threadpool_mutex = PTHREAD_MUTEX_INITIALIZER;
#else
#define SPC_ACQUIRE_MUTEX(mtx)       WaitForSingleObjectEx((mtx), INFINITE, FALSE)
#define SPC_RELEASE_MUTEX(mtx)       ReleaseMutex((mtx))
#define SPC_CREATE_COND(cond)        (cond) = CreateEvent(0, TRUE, FALSE, 0)
#define SPC_DESTROY_COND(cond)       CloseHandle((cond))
#define SPC_SIGNAL_COND(cond)        SetEvent((cond))
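/* A manual-reset event left set wakes every waiter. PulseEvent( ) is documented
 * as unreliable, and it cannot wake waiters that also need the mutex the
 * broadcasting thread is holding. */
#define SPC_BROADCAST_COND(cond)     SetEvent((cond))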
#define SPC_WAIT_COND(cond, mtx)     spc_win32_wait_cond((cond), (mtx))
#define SPC_CLEANUP_PUSH(func, arg)  { void (*spc_cleanup_fn_)(void *) = (func); \
                                     void *spc_cleanup_arg_ = (arg)
#define SPC_CLEANUP_POP(exec)        if ((exec)) spc_cleanup_fn_(spc_cleanup_arg_); } \
                                     do {  } while (0)
#define SPC_CREATE_THREAD(t, f, arg) ((t) = CreateThread(0, 0, (f), (arg), 0, 0))
   
static HANDLE threadpool_mutex = 0;
#endif
   
#ifdef WIN32
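/* Emulates pthread_cond_wait( ): reset the manual-reset event, release the
 * mutex, then block until the event is set AND the mutex is reacquired. */
static void spc_win32_wait_cond(HANDLE cond, HANDLE mutex) {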
  HANDLE handles[2];
   
  handles[0] = cond;
  handles[1] = mutex;
  ResetEvent(cond);
  ReleaseMutex(mutex);
  WaitForMultipleObjectsEx(2, handles, TRUE, INFINITE, FALSE);
}
#endif
   
int spc_threadpool_schedule(spc_threadpool_t *pool, spc_threadpool_fnptr fnptr,
                            void *arg) {
  spc_threadpool_task *task;
   
  SPC_ACQUIRE_MUTEX(threadpool_mutex);
  if (!pool->tids) {
    SPC_RELEASE_MUTEX(threadpool_mutex);
    return 0;
  }
  if (!(task = (spc_threadpool_task *)malloc(sizeof(spc_threadpool_task)))) {
    SPC_RELEASE_MUTEX(threadpool_mutex);
    return 0;
  }
  task->fnptr = fnptr;
  task->arg   = arg;
  task->next  = 0;
  if (pool->tail) pool->tail->next = task;
  else pool->tasks = task;
  pool->tail = task;
  SPC_SIGNAL_COND(pool->cond);
  SPC_RELEASE_MUTEX(threadpool_mutex);
  return 1;
}
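The list handling above, appending at pool->tail and (in the worker) removing from pool->tasks, is what gives first-in, first-out ordering in constant time per operation. The following stand-alone sketch isolates that logic in a single-threaded setting; demo_queue, demo_enqueue( ), and demo_dequeue( ) are illustrative names, and the integer id merely stands in for the function pointer and argument.

```c
#include <stdlib.h>

/* Illustrative copy of the task list: append at the tail, remove at the head. */
typedef struct demo_task {
  int              id;
  struct demo_task *next;
} demo_task;

typedef struct {
  demo_task *head;   /* plays the role of pool->tasks */
  demo_task *tail;   /* plays the role of pool->tail  */
} demo_queue;

/* Same steps as spc_threadpool_schedule( ): link after the tail, or make
 * the node the head if the list is empty. Returns 0 if malloc( ) fails. */
int demo_enqueue(demo_queue *q, int id) {
  demo_task *t = (demo_task *)malloc(sizeof(demo_task));

  if (!t) return 0;
  t->id   = id;
  t->next = 0;
  if (q->tail) q->tail->next = t;
  else q->head = t;
  q->tail = t;
  return 1;
}

/* Same steps as the worker: unlink the head and clear the tail when the
 * list becomes empty. Returns -1 if there is nothing to dequeue. */
int demo_dequeue(demo_queue *q) {
  demo_task *t = q->head;
  int       id;

  if (!t) return -1;
  if (!(q->head = t->next)) q->tail = 0;
  id = t->id;
  free(t);
  return id;
}
```

Forgetting to reset the tail when the last node is removed would leave it pointing at freed memory, and the next append would write through it.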

Each pooled thread normally runs in a loop that waits for new tasks to be scheduled. When a task is available, the thread removes it from the list of scheduled tasks and runs it. When there are no scheduled tasks, the thread goes to sleep, waiting on the condition that spc_threadpool_schedule( ) signals when a new task is scheduled. Note that pthread_cond_wait( ) is a cancellation point. If the thread is cancelled while it is waiting for the condition to be signaled, the guard mutex is reacquired before any cleanup handlers run, so it would be left locked. We therefore push a cleanup handler that releases it; otherwise, every other thread that later tried to acquire the mutex, including pool threads that are shutting down, would block forever.

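/* Called with threadpool_mutex held, either directly or as a cancellation
 * cleanup handler. If the pool is being destroyed, this thread checks out,
 * and the last thread to do so frees the pool. Always releases the mutex. */
static void cleanup_worker(void *arg) {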
  spc_threadpool_t *pool = (spc_threadpool_t *)arg;
   
  if (pool->destroy && !--pool->destroy) {
    SPC_DESTROY_COND(pool->cond);
    free(pool);
  }
  SPC_RELEASE_MUTEX(threadpool_mutex);
}
   
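#ifndef WIN32
static void *worker_thread(void *arg) {
#else
static DWORD WINAPI worker_thread(LPVOID arg) {
#endif
  spc_threadpool_t    *pool = (spc_threadpool_t *)arg;
  spc_threadpool_task *task;

  for (;;) {
    SPC_ACQUIRE_MUTEX(threadpool_mutex);
    /* Pool is shutting down: check out exactly once, then exit. */
    if (!pool->tids || pool->destroy) {
      cleanup_worker(arg);   /* releases the mutex; may free the pool */
      return 0;
    }
    /* The handler runs only if the thread is cancelled inside the wait. */
    SPC_CLEANUP_PUSH(cleanup_worker, arg);
    if (!pool->tasks) SPC_WAIT_COND(pool->cond, threadpool_mutex);
    /* Take the head task, if any; the list may still be empty after a
     * spurious wakeup or a shutdown broadcast. */
    if ((task = pool->tasks) != 0)
      if (!(pool->tasks = task->next)) pool->tail = 0;
    /* Pop without running the handler: running it here would decrement
     * pool->destroy a second time for this thread on the next pass. */
    SPC_CLEANUP_POP(0);
    SPC_RELEASE_MUTEX(threadpool_mutex);

    /* Run the task without holding the mutex. */
    if (task) {
      task->fnptr(task->arg);
      free(task);
    }
  }
}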
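The claim that a cancelled waiter holds the mutex when its cleanup handlers run can be checked in isolation with plain POSIX threads. This sketch assumes a POSIX system (it does not exercise the Windows emulation), and every demo_ name is illustrative. demo_cancel_releases_mutex( ) cancels a thread blocked in pthread_cond_wait( ) and reports whether the mutex is free afterward.

```c
#include <pthread.h>

static pthread_mutex_t demo_mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cond    = PTHREAD_COND_INITIALIZER;
static int             demo_waiting = 0;

/* Cleanup handler: the cancelled thread owns the mutex again at this point. */
static void demo_unlock(void *arg) {
  pthread_mutex_unlock((pthread_mutex_t *)arg);
}

static void *demo_waiter(void *arg) {
  (void)arg;
  pthread_mutex_lock(&demo_mutex);
  pthread_cleanup_push(demo_unlock, &demo_mutex);
  demo_waiting = 1;
  /* Nobody signals demo_cond, so the thread stays here until cancelled. */
  for (;;) pthread_cond_wait(&demo_cond, &demo_mutex);
  pthread_cleanup_pop(0);   /* never reached, but closes the push/pop pair */
  return 0;
}

/* Returns 1 if the mutex is unlocked after the waiting thread is cancelled. */
int demo_cancel_releases_mutex(void) {
  pthread_t t;
  int       ready = 0, is_free;

  if (pthread_create(&t, 0, demo_waiter, 0)) return 0;
  /* Seeing demo_waiting set while we hold the mutex proves the waiter has
   * released it, which it only does inside pthread_cond_wait( ). */
  while (!ready) {
    pthread_mutex_lock(&demo_mutex);
    ready = demo_waiting;
    pthread_mutex_unlock(&demo_mutex);
  }
  pthread_cancel(t);
  pthread_join(t, 0);
  is_free = (pthread_mutex_trylock(&demo_mutex) == 0);
  if (is_free) pthread_mutex_unlock(&demo_mutex);
  return is_free;
}
```

Without the pthread_cleanup_push( )/pthread_cleanup_pop( ) pair, the mutex would remain owned by a terminated thread and pthread_mutex_trylock( ) would fail.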

Before any tasks can be scheduled, the pool of threads to run them needs to be created. This is done by making a call to spc_threadpool_init( ) and specifying the number of threads that will be in the pool. Be careful not to make the size of the pool too small. It is better for it to be too big than not big enough. Ideally, you would like to have scheduled tasks remain scheduled for as short a time as possible. Finding the right size for the thread pool will likely take some tuning, and it is probably a good idea to make it a configurable option in your program.

If there is a problem creating any of the threads in the pool, the threads that were already created are told to exit, and the initialization function returns failure (a null pointer). Successive attempts can be made to initialize the pool without any leakage of resources.

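spc_threadpool_t *spc_threadpool_init(int pool_size) {
  int              i;
  spc_threadpool_t *pool;

  if (pool_size <= 0) return 0;
#ifdef WIN32
  /* Lazily create the global guard mutex (this check is not thread-safe). */
  if (!threadpool_mutex) threadpool_mutex = CreateMutex(NULL, FALSE, 0);
#endif

  if (!(pool = (spc_threadpool_t *)malloc(sizeof(spc_threadpool_t))))
    return 0;
#ifndef WIN32
  pool->tids = (pthread_t *)malloc(sizeof(pthread_t) * pool_size);
#else
  pool->tids = (HANDLE *)malloc(sizeof(HANDLE) * pool_size);
#endif
  if (!pool->tids) {
    free(pool);
    return 0;
  }
  SPC_CREATE_COND(pool->cond);

  pool->size    = pool_size;
  pool->destroy = 0;
  pool->tasks   = 0;
  pool->tail    = 0;

  /* Hold the mutex so that no worker runs until the whole pool exists. */
  SPC_ACQUIRE_MUTEX(threadpool_mutex);
  for (i = 0;  i < pool->size;  i++) {
    if (!SPC_CREATE_THREAD(pool->tids[i], worker_thread, pool)) {
      free(pool->tids);
      pool->tids = 0;
      if (!i) {
        /* No worker exists to free the pool, so free it here. */
        SPC_RELEASE_MUTEX(threadpool_mutex);
        SPC_DESTROY_COND(pool->cond);
        free(pool);
        return 0;
      }
      /* The i running workers see tids == 0 and exit; the last frees the pool. */
      pool->destroy = i;
      SPC_RELEASE_MUTEX(threadpool_mutex);
      return 0;
    }
    /* Workers are never joined, so let the system reclaim them on exit. */
#ifndef WIN32
    pthread_detach(pool->tids[i]);
#else
    CloseHandle(pool->tids[i]);
#endif
  }
  SPC_RELEASE_MUTEX(threadpool_mutex);
  return pool;
}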
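Because the pool size is worth making a configurable option, the program needs to validate whatever value it reads before passing it to spc_threadpool_init( ). The helper below is a hypothetical sketch: parse_pool_size( ), the bounds POOL_SIZE_MIN and POOL_SIZE_MAX, and the fall-back-to-default policy are assumptions to adapt, not part of the recipe.

```c
#include <errno.h>
#include <stdlib.h>

/* Illustrative bounds; choose values that suit the workload and host. */
#define POOL_SIZE_MIN 1
#define POOL_SIZE_MAX 1024

/* Parses a decimal pool size (for example, from a configuration file or an
 * environment variable). Returns fallback if text is missing, is not a
 * complete decimal number, overflows, or lies outside the bounds. */
int parse_pool_size(const char *text, int fallback) {
  char *end;
  long value;

  if (!text || !*text) return fallback;
  errno = 0;
  value = strtol(text, &end, 10);
  if (errno || *end != '\0') return fallback;   /* overflow or trailing junk */
  if (value < POOL_SIZE_MIN || value > POOL_SIZE_MAX) return fallback;
  return (int)value;
}
```

A program might then call spc_threadpool_init(parse_pool_size(getenv("APP_POOL_SIZE"), 16)), where APP_POOL_SIZE is a made-up variable name. Falling back to a default rather than clamping to the nearest bound is a design choice; clamping would be equally reasonable.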

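Finally, when the thread pool is no longer needed, it can be cleaned up by calling spc_threadpool_cleanup( ). All of the threads in the pool are woken and told to exit; a thread that is in the middle of running a task finishes that task first. Any scheduled tasks that have not yet started are destroyed without being run. The last thread to exit frees the pool, so the pool pointer must not be used after this call.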

void spc_threadpool_cleanup(spc_threadpool_t *pool) {
  spc_threadpool_task *next;
   
  SPC_ACQUIRE_MUTEX(threadpool_mutex);
  if (pool->tids) {
    while (pool->tasks) {
      next = pool->tasks->next;
      free(pool->tasks);
      pool->tasks = next;
    }
    free(pool->tids);
    pool->tids = 0;
  }
  pool->destroy = pool->size;
  SPC_BROADCAST_COND(pool->cond);
  SPC_RELEASE_MUTEX(threadpool_mutex);
}
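Both the failed-initialization path and spc_threadpool_cleanup( ) rely on the same countdown: pool->destroy records how many workers still reference the pool, each exiting worker decrements it while holding the mutex, and the one that reaches zero frees the pool. The following stand-alone POSIX sketch shows that pattern in isolation; shared_state, exiting_worker( ), and run_countdown( ) are illustrative names, and the cap of 16 threads only keeps the example's array fixed-size.

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct {
  int remaining;   /* threads that still reference this object */
} shared_state;

static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
static int             frees_done  = 0;   /* lets callers observe the free */

/* Each thread checks out once; the last one frees the shared object. */
static void *exiting_worker(void *arg) {
  shared_state *s = (shared_state *)arg;

  pthread_mutex_lock(&state_mutex);
  if (!--s->remaining) {
    free(s);
    frees_done++;
  }
  pthread_mutex_unlock(&state_mutex);
  return 0;
}

/* Starts up to n threads sharing one heap object and returns how many
 * times it was freed (1 when the countdown is correct), or -1 on bad input. */
int run_countdown(int n) {
  pthread_t    tids[16];
  shared_state *s;
  int          i, started;

  if (n < 1 || n > 16) return -1;
  if (!(s = (shared_state *)malloc(sizeof(*s)))) return -1;
  frees_done = 0;

  /* Hold the mutex while starting threads so none can check out early. */
  pthread_mutex_lock(&state_mutex);
  s->remaining = n;
  for (started = 0;  started < n;  started++)
    if (pthread_create(&tids[started], 0, exiting_worker, s)) break;
  if (!started) {
    free(s);                  /* no thread holds a reference */
    frees_done++;
  } else {
    s->remaining = started;   /* only threads that exist will check out */
  }
  pthread_mutex_unlock(&state_mutex);

  for (i = 0;  i < started;  i++) pthread_join(tids[i], 0);
  return frees_done;
}
```

Setting the count only to the number of threads that were actually started, and freeing the object directly when none were, mirrors how spc_threadpool_init( ) must treat a failure on the very first thread.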