Thread synchronization questions for experts

Discussion of chess software programming and technical issues.

Moderators: bob, hgm, Harvey Williamson

Forum rules
This textbox is used to restore diagrams posted with the [d] tag before the upgrade.
User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Thread synchronization questions for experts

Post by sje » Tue Apr 21, 2015 9:06 am

Thread synchronization questions for experts

I am in the process of re-working Symbolic's multithreading operations. I've already completed the work for multithreaded memory clearing, multithreaded random came generation, and multithreaded text output operations. I'm now tackling the revision of multithreaded perft() as a precursor to reworking multithreaded search.

The current perft() arrangement uses a separate event input queue for each worker thread, guarded by a mutex and shared between the worker thread and the master distribution thread. When the worker thread is idle, it checks the pending event count via the mutex and does this using polling. The polling runs a loop with a 100 millisecond sleep and checks the event count once per cycle. The master distribution thread also uses polling to collect results from worker threads.

This is somewhat wasteful of CPU resources and it also can induce an unnecessary delay when waking the thread. It is what I call an "inefficient wait". What is preferred is an "efficient wait" which is much less wasteful and also provides a snappier response.

Unix-like platforms offer two means of efficient waiting from the pthread library: condition variables and semaphores. At present, I'm working with condition variables.

The basic problem I'm having is the case where a master thread calls pthread_cond_signal() when the worker thread is doing something other than waiting with pthread_cond_wait().

The master executes:

Code: Select all

  pthread_mutex_lock();
  EnqueueJob();
  pthread_cond_signal();
  pthread_mutex_unlock();
The worker executes (looping, always has the mutex lock except when inside the wait call):

Code: Select all

  pthread_cond_wait();
  DequeueJob();
While it doesn't happen often, there are times when the master can enqueue more than one job while the worker remains inside the wait routine. To fix this, I've added an inner loop in the worker which, after each wait, repeatedly hits the job queue until it's empty.

But this has problems, too. First, it prevents any master from enqueuing more requests as long as the worker is looping because the worker holds the queue mutex. This makes the master block which should be avoided as much as possible.

Second, if the worker carefully gives up the mutex lock while processing a job, this allows even more signals to be lost. Maybe some more tricky looping arrangement might fix this.

----

An alternative to a condition variable is a semaphore. Indeed, in the above the combination of an event queue including an element count and the condition variable is much like a semaphore. Should I just drop the condition variable scheme and use semaphores instead?

mar
Posts: 2122
Joined: Fri Nov 26, 2010 1:00 pm
Location: Czech Republic
Full name: Martin Sedlak

Re: Thread synchronization questions for experts

Post by mar » Tue Apr 21, 2015 10:02 am

From what I understood (and what I do), the typical use of pthread_cond_wait goes like this:

Code: Select all

pthread_mutex_lock();
while(!condition)
    pthread_cond_wait();
pthread_mutex_unlock();
similar code goes for signalling:

Code: Select all

pthread_mutex_lock();
condition = 1;
pthread_cond_signal();
pthread_mutex_unlock();
On Windows, when using Events, simply waiting on/signalling an event will do.

Aleks Peshkov
Posts: 882
Joined: Sun Nov 19, 2006 8:16 pm
Location: Russia

Re: Thread synchronization questions for experts

Post by Aleks Peshkov » Tue Apr 21, 2015 10:45 am

IMHO for better performance it should be:

Code: Select all

if (!condition) {
    pthread_mutex_lock();
    while(!condition)
        pthread_cond_wait();
    pthread_mutex_unlock();
}

Code: Select all

pthread_mutex_lock();
condition = 1;
pthread_mutex_unlock();
pthread_cond_signal();

User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Code sample

Post by sje » Tue Apr 21, 2015 11:13 am

Code sample

Code: Select all

WriterTask::WriterTask(void)
{
  DIPtr->Log("WriterTask created");
  joblistmutex.Lock();
  jobthreadptr = new Thread(Starter, this);
}

WriterTask::~WriterTask(void)
{
  delete jobthreadptr;
  joblistmutex.Unlock();
  DIPtr->Log("WriterTask destroyed");
}

void WriterTask::Enqueue(WriterCmd command, const std::string& text)
{
  WriterJobNodePtr wjnptr = new WriterJobNode();
  
  wjnptr->command = command;
  wjnptr->text = text;
  
  joblistmutex.Lock();
  joblist.Append(wjnptr);
  joblistcondvar.Signal();
  joblistmutex.Unlock();
}

void *WriterTask::Starter(void *objptr)
{
  ((WriterTask *) objptr)->JobLoop();
  return 0;
}

void WriterTask::JobLoop(void)
{
  bool isrunning = true;
  ui64 writecount = 0;
  
  DIPtr->Log("WriterTask::JobLoop begun");

  while (isrunning)
  {
    joblistcondvar.Wait(joblistmutex);

    while (joblist.GetCount() > 0)
    {
      WriterJobNodePtr wjnptr = joblist.DetachHead();
     
      switch (wjnptr->command)
      {
        case WriterCmdExit:
          isrunning = false;
          break;
          
        case WriterCmdNoop:
          break;
          
        case WriterCmdWriteLn:
          DIPtr->WriteLn(wjnptr->text);
          writecount++;
          break;
          
        default:
          SwitchFault("WriterTask::JobLoop");
          break;
      };
      delete wjnptr;
    };
  };

  DIPtr->Log("WriterTask::JobLoop writecount: " + EncodeUi64(writecount));
  DIPtr->Log("WriterTask::JobLoop ended");
}

User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Condition variables vs semaphores

Post by sje » Tue Apr 21, 2015 11:44 am

From Advanced Linux Programming (2001):
A condition variable may also be used without a condition, simply as a mechanism for blocking a thread until another thread “wakes it up.” A semaphore may also be used for that purpose.The principal difference is that a semaphore “remembers” the wake-up call even if no thread was blocked on it at the time, while a condition variable discards the wake-up call unless some thread is actually blocked on it at the time. Also, a semaphore delivers only a single wake-up per post; with pthread_cond_broadcast, an arbitrary and unknown number of blocked threads may be awoken at the same time.

User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Sample code using a semaphore

Post by sje » Tue Apr 21, 2015 12:29 pm

Sample code using a semaphore instead of a condition variable:

Code: Select all

WriterTask::WriterTask(void)
{
  DIPtr->Log("WriterTask created");
  jobthreadptr = new Thread(Starter, this);
}

WriterTask::~WriterTask(void)
{
  delete jobthreadptr;
  DIPtr->Log("WriterTask destroyed");
}

void WriterTask::Enqueue(WriterCmd command, const std::string& text)
{
  WriterJobNodePtr wjnptr = new WriterJobNode();

  wjnptr->command = command;
  wjnptr->text = text;

  joblistmutex.Lock();
  joblist.Append(wjnptr);
  joblistmutex.Unlock();
  joblistsemaphore.Post();
}

void *WriterTask::Starter(void *objptr)
{
  ((WriterTask *) objptr)->JobLoop();
  return 0;
}

void WriterTask::JobLoop(void)
{
  bool isrunning = true;
  ui64 writecount = 0;

  DIPtr->Log("WriterTask::JobLoop begun");

  while (isrunning)
  {
    WriterJobNodePtr wjnptr;
    
    joblistsemaphore.Wait();
    joblistmutex.Lock();
    wjnptr = joblist.DetachHead();
    joblistmutex.Unlock();

    switch (wjnptr->command)
    {
      case WriterCmdExit:
        isrunning = false;
        break;

      case WriterCmdNoop:
        break;

      case WriterCmdWriteLn:
        DIPtr->WriteLn(wjnptr->text);
        writecount++;
        break;

      default:
        SwitchFault("WriterTask::JobLoop");
        break;
    };
    delete wjnptr;
  };

  DIPtr->Log("WriterTask::JobLoop writecount: " + EncodeUi64(writecount));
  DIPtr->Log("WriterTask::JobLoop ended");
}

User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Code sample: Thread.h

Post by sje » Tue Apr 21, 2015 12:43 pm

Code sample: Thread.h

Code: Select all

//// Symbolic: A portable multithreaded bitboard chessplaying program
//
// Copyright (C) 2015 by chessnotation@me.com  All rights reserved

#ifndef IncludedThread
#define IncludedThread

class SpinLock
{
public:
  SpinLock(void);
  ~SpinLock(void);

  void Lock(void);
  void Unlock(void);

private:
  volatile void *vsptr;  // Pointer to the allocated spinlock in use
};

class Mutex
{
public:
  Mutex(void);
  ~Mutex(void);

  void Lock(void);
  void Unlock(void);

  void *FetchMutexPtr(void) const {return vmptr;}

private:
  void *vmptr;  // Pointer to the allocated mutex in use
};

class CondVar
{
public:
  CondVar(void);
  ~CondVar(void);

  void Signal(void);
  void Wait(Mutex& mutex);

private:
  void *vcptr;  // Pointer to the allocated condition variable in use
};

class Semaphore
{
public:
  Semaphore(void);
  ~Semaphore(void);

  void Post(void);
  void Wait(void);

private:
  void        *vsptr;  // Pointer to the allocated named semaphore in use
  std::string  name;   // Unique semaphore name
};

class Thread
{
public:
  Thread(void *(*routineptr)(void *), void *argptr);
  ~Thread(void);

private:
  volatile void *idptr;  // Allocated native thread ID
};

#endif

User avatar
sje
Posts: 4675
Joined: Mon Mar 13, 2006 6:43 pm

Code sample: Thread.cpp

Post by sje » Tue Apr 21, 2015 12:45 pm

Code sample: Thread.cpp

Code: Select all

//// Symbolic: A portable multithreaded bitboard chessplaying program
//
// Copyright (C) 2015 by chessnotation@me.com  All rights reserved

#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>

#include <cstdlib>
#include <sstream>

#include "Definitions.h"
#include "Forward.h"

#include "Utilities.h"

#include "Thread.h"

#if &#40;HostOsApple&#41;
#include <libkern/OSAtomic.h>
#endif

SpinLock&#58;&#58;SpinLock&#40;void&#41;
&#123;
#if &#40;HostOsApple&#41;
  vsptr = &#40;void *) new OSSpinLock;
  *&#40;OSSpinLock *) vsptr = OS_SPINLOCK_INIT;
#endif

#if &#40;HostOsCygwin || HostOsLinux&#41;
  vsptr = new pthread_spinlock_t;
  if &#40;pthread_spin_init&#40;&#40;pthread_spinlock_t *) vsptr, PTHREAD_PROCESS_PRIVATE&#41;)
    Die&#40;"SpinLock&#58;&#58;SpinLock", "pthread_spin_init");
#endif

#if &#40;HostOsUnknown&#41;
  vsptr = new pthread_mutex_t;
  pthread_mutexattr_t *mutexattrptr = new pthread_mutexattr_t;

  pthread_mutexattr_init&#40;mutexattrptr&#41;;
  pthread_mutexattr_settype&#40;mutexattrptr, PTHREAD_MUTEX_ERRORCHECK&#41;;
  if &#40;pthread_mutex_init&#40;&#40;pthread_mutex_t *) vsptr, mutexattrptr&#41;)
    Die&#40;"SpinLock&#58;&#58;SpinLock", "pthread_mutex_init");
  pthread_mutexattr_destroy&#40;mutexattrptr&#41;;
  delete mutexattrptr;
#endif
&#125;

SpinLock&#58;&#58;~SpinLock&#40;void&#41;
&#123;
#if &#40;HostOsApple&#41;
  delete &#40;OSSpinLock *) vsptr;
#endif

#if &#40;HostOsCygwin || HostOsLinux&#41;
  if &#40;pthread_spin_destroy&#40;&#40;pthread_spinlock_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;~SpinLock", "pthread_spin_destroy");
  delete &#40;pthread_spinlock_t *) vsptr;
#endif

#if &#40;HostOsUnknown&#41;
  if &#40;pthread_mutex_destroy&#40;&#40;pthread_mutex_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;~SpinLock", "pthread_mutex_destroy");
  delete &#40;pthread_mutex_t *) vsptr;
#endif
&#125;

void SpinLock&#58;&#58;Lock&#40;void&#41;
&#123;
#if &#40;HostOsApple&#41;
  OSSpinLockLock&#40;&#40;OSSpinLock *) vsptr&#41;;
#endif

#if &#40;HostOsCygwin || HostOsLinux&#41;
  if &#40;pthread_spin_lock&#40;&#40;pthread_spinlock_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;Lock", "pthread_spin_lock");
#endif

#if &#40;HostOsUnknown&#41;
  if &#40;pthread_mutex_lock&#40;&#40;pthread_mutex_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;Lock", "pthread_mutex_lock");
#endif
&#125;

void SpinLock&#58;&#58;Unlock&#40;void&#41;
&#123;
#if &#40;HostOsApple&#41;
  OSSpinLockUnlock&#40;&#40;OSSpinLock *) vsptr&#41;;
#endif

#if &#40;HostOsCygwin || HostOsLinux&#41;
  if &#40;pthread_spin_unlock&#40;&#40;pthread_spinlock_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;Unlock", "pthread_spin_unlock");
#endif

#if &#40;HostOsUnknown&#41;
  if &#40;pthread_mutex_unlock&#40;&#40;pthread_mutex_t *) vsptr&#41;)
    Die&#40;"SpinLock&#58;&#58;Unlock", "pthread_mutex_unlock");
#endif
&#125;

Mutex&#58;&#58;Mutex&#40;void&#41;
&#123;
  vmptr = new pthread_mutex_t;
  pthread_mutexattr_t * const mutexattrptr = new pthread_mutexattr_t;

  pthread_mutexattr_init&#40;mutexattrptr&#41;;
  pthread_mutexattr_settype&#40;mutexattrptr, PTHREAD_MUTEX_NORMAL&#41;;
  if &#40;pthread_mutex_init&#40;&#40;pthread_mutex_t *) vmptr, mutexattrptr&#41;)
    Die&#40;"Mutex&#58;&#58;Mutex", "pthread_mutex_init");
  pthread_mutexattr_destroy&#40;mutexattrptr&#41;;
  delete mutexattrptr;
&#125;

Mutex&#58;&#58;~Mutex&#40;void&#41;
&#123;
  if &#40;pthread_mutex_destroy&#40;&#40;pthread_mutex_t *) vmptr&#41;)
    Die&#40;"Mutex&#58;&#58;~Mutex", "pthread_mutex_destroy");
  delete &#40;pthread_mutex_t *) vmptr;
&#125;

void Mutex&#58;&#58;Lock&#40;void&#41;
&#123;
  if &#40;pthread_mutex_lock&#40;&#40;pthread_mutex_t *) vmptr&#41;)
    Die&#40;"Mutex&#58;&#58;Lock", "pthread_mutex_lock");
&#125;

void Mutex&#58;&#58;Unlock&#40;void&#41;
&#123;
  if &#40;pthread_mutex_unlock&#40;&#40;pthread_mutex_t *) vmptr&#41;)
    Die&#40;"Mutex&#58;&#58;Unlock", "pthread_mutex_unlock");
&#125;

CondVar&#58;&#58;CondVar&#40;void&#41;
&#123;
  vcptr = new pthread_cond_t;
  pthread_condattr_t * const condattrptr = new pthread_condattr_t;

  pthread_condattr_init&#40;condattrptr&#41;;
  if &#40;pthread_cond_init&#40;&#40;pthread_cond_t *) vcptr, condattrptr&#41;)
    Die&#40;"CondVar&#58;&#58;CondVar", "pthread_cond_init");
  pthread_condattr_destroy&#40;condattrptr&#41;;
  delete condattrptr;
&#125;

CondVar&#58;&#58;~CondVar&#40;void&#41;
&#123;
  if &#40;pthread_cond_destroy&#40;&#40;pthread_cond_t *) vcptr&#41;)
    Die&#40;"CondVar&#58;&#58;~CondVar", "pthread_cond_destroy");
  delete &#40;pthread_cond_t *) vcptr;
&#125;

void CondVar&#58;&#58;Signal&#40;void&#41;
&#123;
  if &#40;pthread_cond_signal&#40;&#40;pthread_cond_t *) vcptr&#41;)
    Die&#40;"CondVar&#58;&#58;Signal", "pthread_cond_signal");
&#125;

void CondVar&#58;&#58;Wait&#40;Mutex& mutex&#41;
&#123;
  pthread_mutex_t * const pmptr = &#40;pthread_mutex_t *) mutex.FetchMutexPtr&#40;);

  if &#40;pthread_cond_wait&#40;(&#40;pthread_cond_t *) vcptr&#41;, pmptr&#41;)
    Die&#40;"CondVar&#58;&#58;Wait", "pthread_cond_wait");
&#125;

Semaphore&#58;&#58;Semaphore&#40;void&#41;
&#123;
  char tmpname&#91;17&#93; = "semaphore.XXXXXX";

  name = mktemp&#40;tmpname&#41;;
  vsptr = &#40;void *) sem_open&#40;tmpname, O_CREAT | O_EXCL, S_IRWXU, 0&#41;;
  if (!vsptr&#41;
    Die&#40;"Semaphore&#58;&#58;Semaphore", "sem_open");
&#125;

Semaphore&#58;&#58;~Semaphore&#40;void&#41;
&#123;
  if &#40;sem_close&#40;&#40;sem_t *) vsptr&#41;)
    Die&#40;"Semaphore&#58;&#58;~Semaphore", "sem_close");
  if &#40;sem_unlink&#40;name.c_str&#40;)))
    Die&#40;"Semaphore&#58;&#58;~Semaphore", "sem_unlink");
&#125;

void Semaphore&#58;&#58;Post&#40;void&#41;
&#123;
  if &#40;sem_post&#40;&#40;sem_t *) vsptr&#41;)
    Die&#40;"Semaphore&#58;&#58;Post", "sem_post");
&#125;

void Semaphore&#58;&#58;Wait&#40;void&#41;
&#123;
  if &#40;sem_wait&#40;&#40;sem_t *) vsptr&#41;)
    Die&#40;"Semaphore&#58;&#58;Wait", "sem_wait");
&#125;

Thread&#58;&#58;Thread&#40;void *(*routineptr&#41;&#40;void *), void *argptr&#41;
&#123;
  pthread_attr_t * const attrptr = new pthread_attr_t;

  pthread_attr_init&#40;attrptr&#41;;
  pthread_attr_setdetachstate&#40;attrptr, PTHREAD_CREATE_JOINABLE&#41;;
  idptr = &#40;void *) new pthread_t;
  if &#40;pthread_create&#40;&#40;pthread_t *) idptr, attrptr, routineptr, argptr&#41;)
    Die&#40;"Thread&#58;&#58;Thread", "pthread_create");
  pthread_attr_destroy&#40;attrptr&#41;;
  delete attrptr;
&#125;

Thread&#58;&#58;~Thread&#40;void&#41;
&#123;
  if &#40;pthread_join&#40;*&#40;pthread_t *) idptr, 0&#41;)
    Die&#40;"Thread&#58;&#58;~Thread", "pthread_join");
  delete &#40;pthread_t *) idptr;
&#125;

mar
Posts: 2122
Joined: Fri Nov 26, 2010 1:00 pm
Location: Czech Republic
Full name: Martin Sedlak

Re: Sample code using a semaphore

Post by mar » Tue Apr 21, 2015 6:26 pm

My ParallelFor does something else.
The intended use is to paralellize for loops like (i=0; i<n; i++).
I prefer to have the workers actively poll via atomic counter, so the only expensive thing to do is to wake up workers (ncores-1) at the beginning.
ParallelFor blocks the executing thread until the loop is done but since it's also one of the workers, it just signals the workers and starts crunching immediately.

At the moment, I use condition variable/mutex/flag combination (i.e. Event) for this.
The neat thing is I don't have to wake all threads if there's not enough work.

On the other hand, if ncores would be large, I have to signal many events.
I could easily work around this by using pthread_cond_broadcast (manual reset event on Win), but that would always wake all threads. I'm not conviced yet which path to take.

I also pass step (=batch size) to ParallelFor which allows me to parallelize loops where one iteration takes very little time, so the callback then looks like this:

Code: Select all

void ( Int idx, Int /*thrIdx*/ )
&#123;
    Int idxMax = Min&#40; count, idx + step );
    for (; idx < idxMax; idx++ ) &#123;
        // do stuff
    &#125;
&#125;
With this simple approach I was able to beat OpenMP parallel for by a small margin [EDIT: actually not small at all in this particular case] (at least the MSC implementation), which does naively split count by n (threads). This can be a problem for large n because a worker may run out of work before other threads finish.
Of course the drawback is that it requires me to specify the step manually but it's not a big deal since I know what I'm doing.
[note: this was also for the first time when I fully appreciated C++11 lambdas because capturing the scope manually can be painful]

This is only feasible for paralellizing simple loops and cannot be used to paralellize search, but it's very handy in many cases where trivial paralellization is required. It might be used to split at root though.

It each loop iteration takes varying amount of time, it's obviously best to sort it so that expensive iterations come first.

The solution you presented is nice but it's only feasible for heavy duty jobs (which was intended I guess). Also thanks for posting this, I learned a couple of new things.

brtzsnr
Posts: 433
Joined: Fri Jan 16, 2015 3:02 pm
Contact:

Re: Thread synchronization questions for experts

Post by brtzsnr » Tue Apr 21, 2015 7:23 pm

mar wrote:From what I understood (and what I do), the typical use of pthread_cond_wait goes like this:

Code: Select all

pthread_mutex_lock&#40;);
while&#40;!condition&#41;
    pthread_cond_wait&#40;);
pthread_mutex_unlock&#40;);
similar code goes for signalling:

Code: Select all

pthread_mutex_lock&#40;);
condition = 1;
pthread_cond_signal&#40;);
pthread_mutex_unlock&#40;);
On Windows, when using Events, simply waiting on/signalling an event will do.
This is the correct way to do it. The worker needs to check the condition inside the mutex for two reasons:
1) the mutex protects the condition variable (and avoids race conditions)
2) signals are not delivered if nobody is listening to them. so if worker was busy when master signaled it will not pick the signal when it calls pthread_cond_wait.

Depending on how many tasks you enqueue you can optimize by:
1) enqueue more than one task at once
2) dequeue more than one task at once

AFAIK pthread_cond is normally starvation free so you don't have to worry about it.

Post Reply