
07-20-11 - Some condition var implementations

Okay, time for lots of ugly code posting.

Perhaps the simplest condvar implementation is one that's FIFO and SCHED_OTHER, using an explicit waiter list. The waiter list is protected by an internal mutex. It looks like this :

(BTW I'm putting the external mutex in the condvar for purposes of this code, but you may not actually want to do that ; see previous notes )


struct cond_var_mutex_list
{
    std::mutex  external_m;
    std::mutex  internal_m;
    // (*1)
    std::list<HANDLE>   waitset;

    cond_var_mutex_list() { }
    ~cond_var_mutex_list() { }
    
    void lock() { external_m.lock($); }
    void unlock() { external_m.unlock($); }
    
    // (*1) should be the event from TLS :
    HANDLE get_event()
    {
        return CreateEvent(NULL,0,0,NULL);
    }
    void free_event(HANDLE h)
    {
        CloseHandle(h);
    }
    
    void unlock_wait_lock() 
    {
        HANDLE h = get_event();
        
        // taking internal_m lock prevents us from racing with signal
        {
        internal_m.lock($);

        waitset.push_back(h);
        
        internal_m.unlock($);
        }
        
        // (*2)
        external_m.unlock($);
        
        WaitForSingleObject(h,INFINITE);
        
        free_event(h);

        // I will often wake from the signal and immediately go to sleep here :
        external_m.lock($);
    }
    
    // could return if one was signalled
    void signal()
    {
        HANDLE h = 0;
        
        // pop a waiter off the front, if any :
        {
        internal_m.lock($);
        
        if ( ! waitset.empty() )
        {
            h = waitset.front();
            waitset.pop_front();
        }
        
        internal_m.unlock($);
        }
        
        if ( h == 0 )
            return;
    
        SetEvent(h);        
    }
    
    // could return # signalled
    void broadcast()
    {
        std::list<HANDLE> local_waitset;
        
        // grab local copy of the waitset
        // this enforces wait generations correctly
        {
        internal_m.lock($);
        
        local_waitset.swap(waitset);

        internal_m.unlock($);
        }

        // (*3)     

        // set events one by one;
        // this is a bit ugly, SCHED_OTHER and thread thrashing
        while( ! local_waitset.empty() )
        {
            HANDLE h = local_waitset.front();
            local_waitset.pop_front();
            SetEvent(h);
        }   
    }

};

I think it's pretty trivial and self-explanatory. A few important notes :

*1 : We use std::list here for simplicity, but in practice a better way would be to have a per-thread struct which contains the per-thread event and forward & back pointers for linking. Then you don't have any dynamic allocations at all. One event per thread is all you need, because a thread can only be in one wait at a time. Also there's no event lifetime issue, because each thread only waits on its own event (we'll see issues with this in later implementations). (see for example Thomasson's sketch of such a struct, but it's pretty self-explanatory)
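For concreteness, here's a rough sketch of what that per-thread waiter node could look like (my own sketch of the idea, not Thomasson's actual code) :


struct waiter_node
{
    // one auto-reset event per thread, created once and kept in TLS :
    HANDLE          m_event;
    // intrusive links, so the condvar's waitset is just pointer fiddling :
    waiter_node *   m_next;
    waiter_node *   m_prev;

    waiter_node() : m_next(NULL), m_prev(NULL)
    {
        m_event = CreateEvent(NULL,0,0,NULL);
    }
    ~waiter_node()
    {
        CloseHandle(m_event);
    }
};

The condvar then keeps head & tail pointers to a doubly-linked list of these instead of a std::list<HANDLE>, get_event()/free_event() go away, and wait never allocates.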

*2 : This is the crucial line of code for cond-var correctness. The external mutex is unlocked *after* the current thread is put in the waitset. This means that even though we don't atomically go from the unlock into the wait, we won't miss a signal that happens between the unlock and the wait - the signaller will find us in the waitset and set our event, and the event stays set until our wait sees it.
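For contrast, here's the hypothetical broken ordering (not from any real implementation), where we unlock the external mutex before putting ourselves in the waitset ; a signal that lands in the race window is simply lost :


void unlock_wait_lock_BROKEN()
{
    HANDLE h = get_event();

    // WRONG : unlocking before we are in the waitset
    external_m.unlock($);

    // race window : another thread can take the external mutex, change the
    // condition, and call signal() right here ; it sees an empty waitset,
    // the wakeup is lost, and we sleep forever below

    {
    internal_m.lock($);

    waitset.push_back(h);

    internal_m.unlock($);
    }

    WaitForSingleObject(h,INFINITE);

    free_event(h);

    external_m.lock($);
}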

*3 : This is where the "wait generation" is incremented. We swap the waiter set into a local copy and will signal only the local copy. At this point new waiters can come in, and they will get added to the member variable waitset, but they don't affect our generation.

The nice thing about this style of implementation is that it only needs mutex and auto-reset events, which are probably the most portable of all synchronization primitives. So you can use this on absolutely any platform.

The disadvantage is that it's SCHED_OTHER (doesn't respect OS priorities) and it can have rather more thread switches than necessary.


The next version we'll look at is Thomasson's two-event cond_var. There are a lot of broken versions of this idea around the net, so it's instructive to compare to (what I believe is) a correct one.

The basic idea is that you use two events. One is auto-reset (an auto-reset event is just like a semaphore with a max count of 1); the other is manual reset. signal() sets the auto-reset event to release one thread. broadcast() sets the manual-reset event to release all the threads (opens the gate and leaves it open). Sounds simple enough. The problem is that manual reset events are fraught with peril. Any time you see anyone say "manual reset event" you should think "ruh roh, race likely". However, handling it in this case is not that hard.

The easy way is to use the same trick we used above to handle broadcast with generations - we just swap out the "waitset" (in this case, the broadcast event) when we broadcast(). That way it is associated only with previous waiters, and new waiters can immediately come in and wait on the next generation's manual reset event.

The only ugly bit is handling the lifetime of the broadcast event. We want it to be killed when the last member of its generation is woken, and to get this right we need a little ref-counting mechanism.

So, here it is, based on the latest version from Thomasson of an idea that he posted many times in slightly different forms :


class thomasson_win_condvar
{
    enum { event_broadcast=0, event_signal = 1 };

    struct waitset
    {
        HANDLE m_events[2];
        std::atomic<int> m_refs;

        waitset(HANDLE signalEvent) : m_refs(1)
        {
            // signalEvent is always the same :
            m_events[event_signal] = signalEvent;

            // broadcast is manual reset : (that's the TRUE)
            m_events[event_broadcast] = CreateEvent(NULL, TRUE, FALSE, NULL);
        }
        
        ~waitset()
        {
            RL_ASSERT( m_refs($) == 0 );
    
            //if ( m_events[event_broadcast] )
            CloseHandle(m_events[event_broadcast]);
        }
    };


private:
    VAR_T(waitset*) m_waitset;
    CRITICAL_SECTION m_internal_mutex;
    CRITICAL_SECTION m_external_mutex;
    HANDLE m_signal_event;


public:
    thomasson_win_condvar()
    :   m_waitset(NULL)
    {
        m_signal_event = CreateEvent(NULL,0,0,NULL);
        InitializeCriticalSection(&m_internal_mutex);
        InitializeCriticalSection(&m_external_mutex);
    }

    ~thomasson_win_condvar()
    {
        RL_ASSERT( VAR(m_waitset) == NULL );
        CloseHandle(m_signal_event);
        DeleteCriticalSection(&m_internal_mutex);
        DeleteCriticalSection(&m_external_mutex);
    }


    void dec_ref_count(waitset * w)
    {
        EnterCriticalSection(&m_internal_mutex);

        // if I took the waitset's refs to zero, free it :
        if (w->m_refs($).fetch_add(-1, std::mo_relaxed) == 1)
        {
            std::atomic_thread_fence(std::mo_acquire,$);

            // detach it first if it's still the current waitset,
            //  then delete ; don't touch w after the delete :
            if ( w == VAR(m_waitset) )
                VAR(m_waitset) = NULL;

            delete w;
        }

        LeaveCriticalSection(&m_internal_mutex);
    }

    void inc_ref_count(waitset * w)
    {
        // must be called with m_internal_mutex held (that's what keeps w alive)
        if ( ! w ) return;

        w->m_refs($).fetch_add(1,std::mo_relaxed);
    }
        
public:
    void lock ()
    {
        EnterCriticalSection(&m_external_mutex);
    }
    void unlock ()
    {
        LeaveCriticalSection(&m_external_mutex);
    }

    void unlock_wait_lock()
    {
        waitset* w;
        
        {
        EnterCriticalSection(&m_internal_mutex);

        // make waitset on demand :
        w = VAR(m_waitset);

        if (! w)
        {
            w = new waitset(m_signal_event);
            VAR(m_waitset) = w;
        }
        else
        {
            inc_ref_count(w);
        }
        
        LeaveCriticalSection(&m_internal_mutex);
        }

        // note unlock of external after waitset update :
        LeaveCriticalSection(&m_external_mutex);

        // wait for *either* event :
        WaitForMultipleObjects(2, w->m_events, false, INFINITE);

        EnterCriticalSection(&m_external_mutex);
        
        dec_ref_count(w);
    }


    void broadcast()
    {
        EnterCriticalSection(&m_internal_mutex);

        // swap waitset to local state :
        waitset* w = VAR(m_waitset);

        VAR(m_waitset) = NULL;

        inc_ref_count(w);
        
        LeaveCriticalSection(&m_internal_mutex);

        // at this point a new generation of waiters can come in,
        //  but they will be on a new waitset

        if (w)
        {
            SetEvent(w->m_events[event_broadcast]);

            // note : the broadcast event is never cleared (that would be a tricky race) ;
            // instead the waitset that owns it is deleted and never used again,
            // and a new waitset will be made with an un-set broadcast event

            dec_ref_count(w);
        }
    }


    void signal()
    {        
        EnterCriticalSection(&m_internal_mutex);

        waitset* w = VAR(m_waitset);

        inc_ref_count(w);
        
        LeaveCriticalSection(&m_internal_mutex);

        if (w)
        {
            SetEvent(w->m_events[event_signal]);

            dec_ref_count(w);
        }
    }

};

I don't think there's anything too interesting to say about this, the interesting bits are all commented in the code.

Basically the trick for avoiding the evilness of a manual reset event is just to make a new one after you set it to "open" and never try to set it to "closed" again. (of course you could set it to closed and recycle it through a pool instead of allocating a new one each time).
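For example, the recycling could be as simple as this (a sketch of my own, not part of Thomasson's code ; it assumes the pool is only touched while holding m_internal_mutex) :


struct broadcast_event_pool
{
    std::vector<HANDLE> m_free;

    HANDLE alloc()
    {
        if ( m_free.empty() )
            return CreateEvent(NULL,TRUE,FALSE,NULL); // manual-reset, un-set

        HANDLE h = m_free.back();
        m_free.pop_back();
        return h;
    }

    void release(HANDLE h)
    {
        // safe to reset here : the waitset that owned h is dead, so nobody
        // can wait on h again until it's handed out to a new generation
        ResetEvent(h);
        m_free.push_back(h);
    }

    ~broadcast_event_pool()
    {
        while ( ! m_free.empty() )
        {
            CloseHandle(m_free.back());
            m_free.pop_back();
        }
    }
};

waitset's constructor and destructor would then call alloc() / release() on a pool owned by the condvar, instead of CreateEvent / CloseHandle.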

This code can be simplified/optimized in various ways, for example when you signal() you don't actually need to make or delete a waitset at all.

I believe you could also get rid of m_internal_mutex completely with a bit of care. Actually it doesn't take any care at all : if you require that signal() and broadcast() are always called with the external mutex held, then the internal mutex isn't needed, because the external mutex now protects everything the internal one did, namely the waitset.
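For instance, broadcast() under that convention could shrink to something like this (a sketch under that assumption only - it's not what the class above enforces, and dec_ref_count would similarly drop its internal locking) :


void broadcast_with_external_lock_held()
{
    // caller holds m_external_mutex, which now protects m_waitset
    waitset * w = VAR(m_waitset);
    if ( ! w )
        return;

    // start a new generation ; later waiters will make a fresh waitset
    VAR(m_waitset) = NULL;

    // no inc/dec of the refcount needed : every dec_ref_count happens with
    // the external mutex held, and we're holding it, so w can't die here
    SetEvent(w->m_events[event_broadcast]);
}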


The Terekhov condvar in pthreads-win32 (reportedly) uses a barrier to block entry to "wait" for new waiters after you "broadcast" but before all the broadcasted waiters have woken up. It's a gate that's closed when you broadcast : the waiter count is remembered, and the gate is opened again after they have all woken up. This works, but it does cause thread thrashing; new waiters who were blocked at the gate go to sleep on the barrier, then wake up, rush in, and immediately go to sleep again in the wait on the condvar. (caveat : I haven't actually looked at the pthreads-win32 code, other than to see that it's huge and complex and I didn't want to read it)

Doug Schmidt wrote the nice page on Strategies for Implementing POSIX cond vars on Win32 (which describes a lot of bad or broken ways, such as using PulseEvent, or using SetEvent and trying to count down to reset it). The way he implemented it in his ACE package is sort of similar to Terekhov's blocking mechanism. This extraction for qemu is a lot easier to follow than the ACE code and uses the same technique. At first I didn't think it worked at all, but the secret is that it blocks wake-stealers using the external mutex. The key is that in this implementation, "broadcast" has to be called with the external mutex held. So what happens is : broadcast wakes a bunch of guys, then waits for their wakes to finish - while still holding the external mutex. The waiters wake up, dec the waiter count, and then try to lock the external mutex and block on that. Once they have all woken up, an event is set so the broadcaster is allowed to resume. Now he leaves broadcast and unlocks the mutex, and the guys who were woken up can finally run. Stolen wakeups are prevented because the external mutex is held the whole time, so nobody can even get into the cv to try to wait.
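To make that concrete, here's a stripped-down sketch of the scheme (broadcast only, my own names and simplifications - this is not the actual qemu or ACE code, and handling signal() properly is where the real versions get more involved) :


struct gate_style_condvar
{
    LONG    m_waiters;       // threads currently blocked in wait
    HANDLE  m_wake_sem;      // released once per waiter by broadcast
    HANDLE  m_all_woken;     // auto-reset ; set by the last waiter to wake

    gate_style_condvar() : m_waiters(0)
    {
        m_wake_sem  = CreateSemaphore(NULL,0,MAXLONG,NULL);
        m_all_woken = CreateEvent(NULL,0,0,NULL);
    }
    ~gate_style_condvar()
    {
        CloseHandle(m_wake_sem);
        CloseHandle(m_all_woken);
    }

    // caller holds external_m :
    void unlock_wait_lock(CRITICAL_SECTION * external_m)
    {
        m_waiters++;    // protected by external_m

        LeaveCriticalSection(external_m);

        WaitForSingleObject(m_wake_sem,INFINITE);

        // the last one to wake tells the broadcaster everyone is up :
        if ( InterlockedDecrement(&m_waiters) == 0 )
            SetEvent(m_all_woken);

        // the broadcaster still holds external_m, so we pile up here :
        EnterCriticalSection(external_m);
    }

    // must be called with external_m held :
    void broadcast()
    {
        if ( m_waiters == 0 )
            return;

        // wake every current waiter ...
        ReleaseSemaphore(m_wake_sem, m_waiters, NULL);

        // ... and block until they have all woken ; wake-stealing is
        // impossible because we are still holding the external mutex
        WaitForSingleObject(m_all_woken,INFINITE);
    }
};

Every woken waiter immediately blocks again on the external mutex, which is exactly the thrashing complained about in the next paragraph.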

I'm really not a fan of this style of condvar implementation. It causes lots of thread thrashing. It requires every single one of the threads broadcasted-to to wake up and go back to sleep before any one can run. Particularly in the Windows environment where individual threads can lose time for a very long time on multi-core machines, this is very bad.

Thomasson's earlier waitset didn't swap out the waitset in notify_all, so it didn't get generations right. (he did post corrected versions later, such as the one above)

Derevyago has posted a lot of condvars that are broken (a manual-reset event used with ResetEvent() is immediately smelly, and in this case is in fact broken). He also posted one that works, which is similar to my first one here (FIFO, SCHED_OTHER, explicit waiter list).

Anthony Williams posted a reasonably simple sketch of a condvar; it uses a manual-reset event per generation which is swapped out on broadcast. It's functionally identical to the Thomasson condvar, except that Anthony maintains a linked list of waiters instead of just inc/dec'ing a refcount. Anthony didn't provide signal(), but it's trivial to add one (e.g. with an auto-reset signal event, as in the Thomasson version above).

Dmitry's fine grained eventcount looks like a nice way to build condvar, but I'm scared of it. If somebody ever makes a C++0x/Relacy version of that, let me know.
