07-09-11 - LockFree - Thomasson's simple MPMC

Warming back up into this stuff, here's some very simple algos to study.

first fastsemaphore :


class fastsemaphore
{
    rl::atomic<long> m_count;
    rl::rl_sem_t m_waitset;

public:
    fastsemaphore(long count = 0)
    :   m_count(count)
    {
        RL_ASSERT(count > -1);
        sem_init(&m_waitset, 0, 0);
    }

    ~fastsemaphore()
    {
        sem_destroy(&m_waitset);
    }

public:
    void post()
    {
        // previous count < 0 means at least one thread is asleep in wait() ;
        // only then do we pay for the kernel sem_post
        if (m_count($).fetch_add(1) < 0)
        {
            sem_post(&m_waitset);
        }
    }

    void wait()
    {
        // previous count < 1 means no post was available ; go to sleep
        if (m_count($).fetch_add(-1) < 1)
        {
            // loop because sem_wait returns non-zero for spurious failure
            while (sem_wait(&m_waitset));
        }
    }
};

Most code I post will be in Relacy notation, which is just modified C++0x. Note that C++0x atomics without explicit memory ordering specifications (such as the fetch_adds here) default to memory_order_seq_cst (sequential consistency).
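
For example, the two fetch_adds above are shorthand for the fully specified form; this is just the C++0x default spelled out, using Relacy's memory order names :


m_count($).fetch_add(1);                           // implicit memory_order_seq_cst
m_count($).fetch_add(1, rl::memory_order_seq_cst); // same thing, written explicitly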

Basically your typical OS "semaphore" is a very heavy kernel-space object (on Win32, for example, semaphores are cross-process). Just doing P or V on it is very expensive, even when you don't change any wait states. fastsemaphore is just a user-space wrapper which only calls down to the kernel semaphore at an edge transition, that is, when the operation will actually put a thread to sleep or wake one up.

So this is a simple thing that's nice to have. Note that m_count can be positive or negative: when it's positive it's the number of posts that haven't been consumed yet, and when it's negative it's (minus) the number of threads that are sleeping on the semaphore. (Between post and actual wakeup a thread can still be asleep but no longer counted, so more precisely: threads which are not counted in minus m_count are either running or pending-running.)
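
To make the count arithmetic concrete, here's a hypothetical interleaving of two threads on a fastsemaphore that starts at zero (recall fetch_add returns the previous value) :


// T1: wait() : fetch_add(-1) returns  0 (< 1)     -> m_count = -1 ; T1 blocks in sem_wait
// T2: post() : fetch_add(+1) returns -1 (< 0)     -> m_count =  0 ; sem_post wakes T1
// T2: post() : fetch_add(+1) returns  0 (not < 0) -> m_count = +1 ; no kernel call
// T1: wait() : fetch_add(-1) returns +1 (not < 1) -> m_count =  0 ; no kernel call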

( see here )

Now we can look at Thomasson's very simple MPMC bounded blocking queue :


template<typename T, std::size_t T_depth>
class mpmcq
{
    // note : T_depth must be a power of two, since indices are wrapped with & (T_depth - 1)
    rl::atomic<T*> m_slots[T_depth];
    rl::atomic<std::size_t> m_push_idx;
    rl::atomic<std::size_t> m_pop_idx;
    fastsemaphore m_push_sem;   // counts empty slots
    fastsemaphore m_pop_sem;    // counts filled slots

public:
    mpmcq() 
    :   m_push_idx(T_depth),    // masked on use, so equivalent to starting at 0
        m_pop_idx(0),
        m_push_sem(T_depth),    // all slots start empty
        m_pop_sem(0)            // no slots start filled
    {
        for (std::size_t i = 0; i < T_depth; ++i)
        {
            m_slots[i]($).store(NULL);
        }
    }

public:
    void push(T* ptr)
    {
        // wait for an empty slot to exist
        m_push_sem.wait();

        std::size_t idx = m_push_idx($).fetch_add(1) & (T_depth - 1);

        rl::backoff backoff;

        // our slot may still be occupied by a popper that hasn't cleared it yet
        while (m_slots[idx]($).load())
        {
            backoff.yield($);
        }

        RL_ASSERT(! m_slots[idx]($).load());

        m_slots[idx]($).store(ptr);

        // publish one filled slot
        m_pop_sem.post();
    }


    T* pop()
    {
        // wait for a filled slot to exist
        m_pop_sem.wait();

        std::size_t idx = m_pop_idx($).fetch_add(1) & (T_depth - 1);

        T* ptr;
        rl::backoff backoff;

        // our slot may not be filled yet by the pusher that claimed it
        while ( (ptr = m_slots[idx]($).load()) == NULL )
        {
            backoff.yield($);
        }
        
        m_slots[idx]($).store(NULL);

        // publish one empty slot
        m_push_sem.post();

        return ptr;
    }
};

First let's understand what's going on here. It's just an array of slots with a reader index and a writer index that wrap around. "pop_sem" counts the number of filled slots, so a popper waits on it to be non-zero before it can empty a slot. "push_sem" counts the number of available slots, so a pusher waits on it to be non-zero before it can fill a slot.
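
As a quick sanity check on those invariants, here's a worked example (the numbers are just assumed for illustration: T_depth = 8 with 3 items currently queued) :


// push_sem count = 5 (empty slots) , pop_sem count = 3 (filled slots)
// a push does : push_sem.wait() (5 -> 4) , fill a slot , pop_sem.post() (3 -> 4)
// a pop does  : pop_sem.wait()  (3 -> 2) , empty a slot , push_sem.post() (5 -> 6)
// so (push_sem count) + (pop_sem count) == T_depth whenever no thread is mid-operation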

So the producer and consumer both nicely go to sleep and wake each other when they should. Also because we use "fastsemaphore" they have reasonably low overhead when they are in the non-sleeping case.
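
If you want to poke at this yourself, here's a minimal sketch of a Relacy test harness for the queue (the test_suite / simulate structure is Relacy's standard interface; the struct and variable names here are made up for illustration) :


struct mpmcq_test : rl::test_suite<mpmcq_test, 2>
{
    mpmcq<int, 4> q;
    int payload;

    void thread(unsigned index)
    {
        if (index == 0)
        {
            q.push(&payload);   // thread 0 acts as the producer
        }
        else
        {
            int * p = q.pop();  // thread 1 acts as the consumer
            RL_ASSERT(p == &payload);
        }
    }
};

// then in main : rl::simulate<mpmcq_test>();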

Now, why is the weird backoff logic there? It's because of the "M" (for multiple) in MPMC. If this were an SPSC queue it could be much simpler :


    void push(T* ptr)
    {
        m_push_sem.wait();

        std::size_t idx = m_push_idx($).fetch_add(1) & (T_depth - 1);

        // single producer : the slot we claimed is guaranteed to be empty already
        RL_ASSERT(! m_slots[idx]($).load());

        m_slots[idx]($).store(ptr);

        m_pop_sem.post();
    }


    T* pop()
    {
        m_pop_sem.wait();

        std::size_t idx = m_pop_idx($).fetch_add(1) & (T_depth - 1);

        /* (*1) */

        // single consumer : the slot we claimed is guaranteed to be filled already
        T* ptr = m_slots[idx]($).exchange(NULL);
        RL_ASSERT( ptr != NULL );

        m_push_sem.post();

        return ptr;
    }

which should be pretty obviously correct for SPSC.

But now consider that you have multiple consumers and the queue is completely full.

Consumer 1 gets pop_idx = 2. But then at (*1) it gets swapped out and doesn't run for a while.

Consumer 2 gets pop_idx = 3, runs all the way through, and posts the push semaphore.

Now a producer runs and gets push_idx = 2. It believes there's an empty slot it can write to, but when it looks in slot 2 there's still something there (because consumer 1 hasn't cleared its slot yet). So it has to sit in the backoff-yield loop to give consumer 1 some CPU time to finish its pop.

So the MPMC with backoff-yield works, but it's not great. As long as the queue is near empty it works reasonably well. But when the queue is full it acts like a mutex-based queue: one consumer being swapped out can block all your pushers from running. And because the stall here is a plain busy wait, the normal OS hacks that rescue you from this situation (like Windows priority boosts) won't help. (This kind of thing is exactly why the Windows scheduler has so many hacks, and why, despite your whining, you really do want it to be like that.)
