7/30/2011

07-30-11 - A look at some bounded queues - part 2

Okay, let's look into making an MPMC bounded FIFO queue.

We can use basically the same two ideas that we worked up last time.

First let's try to do one based on the read and write indexes being atomic. Consider the consumer: the check for empty is now much more race prone, because there may be another consumer simultaneously reading, which could make the queue go empty while you are in the middle of your read. Thus we need a single atomic moment to detect "empty" and reserve our read slot.

The most brute-force way to do this kind of thing is always to munge the two variables together. In this case we stick the read & write indexes together into one int. Now we can atomically check "empty" in one go. We're going to put rdwr in a 32-bit int and use the top and bottom 16 bits for the read index and write index.
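
(Just to make the packing explicit - here's a tiny sketch of the shifts and masks involved; these helper names are my own and aren't part of the queue code below, which just does the same thing inline :)


    // hypothetical helpers illustrating the 16:16 packing of rdwr :
    inline unsigned int rdwr_get_rd( unsigned int rdwr ) { return (rdwr>>16) & 0xFFFF; }
    inline unsigned int rdwr_get_wr( unsigned int rdwr ) { return rdwr & 0xFFFF; }
    inline unsigned int rdwr_pack( unsigned int rd, unsigned int wr ) { return ((rd&0xFFFF)<<16) | (wr&0xFFFF); }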

So you can reserve a read slot something like this :


    nonatomic<t_element> * read_fetch()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            int wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }
                
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );

        return p;
    }

but this doesn't work by itself. We have succeeded in atomically checking "empty" and reserving our read slot, but now the read index no longer indicates that the read has completed, it only indicates that a reader reserved that slot. For the writer to be able to write to that slot it needs to know the read has completed, so we need to publish the read through a separate read counter.

The end result is this :


template <typename t_element, size_t t_size>
struct mpmc_boundq_1_alt
{
//private:

    // elements should generally be cache-line-size padded :
    nonatomic<t_element>  m_array[t_size];
    
    // rdwr counts the reads & writes that have started
    atomic<unsigned int>    m_rdwr;
    // "read" and "written" count the number completed
    atomic<unsigned int>    m_read;
    atomic<unsigned int>    m_written;

public:

    mpmc_boundq_1_alt() : m_rdwr(0), m_read(0), m_written(0)
    {
    }

    //-----------------------------------------------------

    nonatomic<t_element> * read_fetch()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }
        
        // (*1)
        rl::backoff bo;
        while ( (m_written($).load(mo_acquire) & 0xFFFF) != wr )
        {
            bo.yield($);
        }
        
        nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );

        return p;
    }
    
    void read_consume() 
    {
        m_read($).fetch_add(1,mo_release);
    }

    //-----------------------------------------------------

    nonatomic<t_element> * write_prepare()
    {
        unsigned int rdwr = m_rdwr($).load(mo_acquire);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return NULL;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
                break;
        }
        
        // (*1)
        rl::backoff bo;
        while ( (m_read($).load(mo_acquire) & 0xFFFF) != rd )
        {
            bo.yield($);
        }
        
        nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );
        
        return p;
    }
    
    void write_publish()
    {
        m_written($).fetch_add(1,mo_release);
    }

    //-----------------------------------------------------
    
    
};

We now have basically two read counters - one is the number of read_fetches and the other is the number of read_consumes (the difference is the number of reads that are currently in progress). Now we have a complication at the spot marked (*1) :

(*1) : after we reserve a read slot - we will be able to read it eventually, but the writer may not be done yet, so we have to wait for it to do its write_publish and tell us the write is finished. Furthermore, we don't keep track of which thread is writing this particular slot, so we actually have to wait for all pending writes to be done. (if we just waited for the write count to increment, a later slot might get written first and we would read the wrong thing)

Now, the careful reader might think that the check at (*1) doesn't work. What they think is :

You can't wait for m_written to be == wr , because wr is just the write reservation count that we saw when we grabbed our read slot. After we grab our read slot, several writes might actually complete, which would make m_written > wr ! And we would infinite loop!

But no. In fact, that would be an issue if this were a real functioning asynchronous queue, but it actually isn't. This queue actually runs in lockstep. The reason is that at (*1) in read_fetch, I have already grabbed a read slot, but not done the read. That means no writer can make progress, because it will see a read in progress that has not completed. So the case where m_written runs past wr can't happen. If a write is in progress, all readers wait until all the writes are done; then once the reads are in progress, any writers trying to get in wait for the reads to get done.

So, this queue sucks. It has an obvious "wait" spin loop, which is always bad. It is also an example of "apparently lockfree" code that actually acts just like a mutex. (you may have noticed that the code here is almost identical to a ticket lock - that's in fact what it is!)
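
(For reference, here's a minimal ticket lock sketch in plain C++11 atomics rather than the Relacy-style code above - not anyone's production code, just to show that the reserve-a-number-then-wait-for-your-turn structure is the same thing mpmc_boundq_1_alt is doing :)


    #include <atomic>

    // minimal ticket lock sketch :
    struct ticket_lock
    {
        std::atomic<unsigned int> m_next{0};    // tickets handed out (like the rdwr reservation counts)
        std::atomic<unsigned int> m_serving{0}; // ticket currently being served (like m_read/m_written)

        void lock()
        {
            // reserve a ticket :
            unsigned int me = m_next.fetch_add(1,std::memory_order_relaxed);
            // spin-wait until it's our turn :
            while ( m_serving.load(std::memory_order_acquire) != me ) { }
        }

        void unlock()
        {
            // publish that the next ticket may go :
            m_serving.fetch_add(1,std::memory_order_release);
        }
    };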

How do we fix it? Well, one obvious problem is that when we wait at (*1) we really only need to wait on that particular item, instead of on all pending ops. So rather than a global read count and written count that we publish to notify that we're done, we should have a flag or a count in the slot, more like our second spsc bounded queue.

So we'll leave the "rdwr" single variable for where the indexes are, and we'll just wait on publication per slot :


template <typename t_element, size_t t_size>
struct mpmc_boundq_2
{
    enum { SEQ_EMPTY = 0x80000 };

    struct slot
    {
        atomic<unsigned int>    seq;
        nonatomic<t_element>    item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };

    slot m_array[t_size];

    atomic<unsigned int>    m_rdwr;

public:

    mpmc_boundq_2() : m_rdwr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i& 0xFFFF;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }

    //-----------------------------------------------------

    bool push( const t_element & T )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return false;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_relaxed) )
                break;
        }
        
        slot * p = & ( m_array[ wr % t_size ] );
        
        // wait if reader has not actually finished consuming it yet :
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != (SEQ_EMPTY|wr) )
        {
            bo.yield($);
        }
        
        p->item($) = T; 
        
        // this publishes that the write is done :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

    //-----------------------------------------------------
    
    bool pop( t_element * pT )
    {
        unsigned int rdwr = m_rdwr($).load(mo_relaxed);
        unsigned int rd,wr;
        for(;;)
        {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;
            
            if ( wr == rd ) // empty
                return false;
                
            if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_relaxed) )
                break;
        }
        
        slot * p = & ( m_array[ rd % t_size ] );
        
        rl::backoff bo;
        while ( p->seq($).load(mo_acquire) != rd )
        {
            bo.yield($);
        }
        
        *pT = p->item($);
        
        // publish that the read is done :
        int next_wr = (rd+t_size)& 0xFFFF;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
    
        return true;
    }
    
};

Just to confuse things I've changed the API to a more normal push/pop, but this is identical to the first queue except that we now wait on publication per slot.

So, this is a big improvement. In particular we actually get parallelism now: while one write is waiting (because the read on its slot is still pending), other writes on other slots can go ahead and proceed.

"mpmc_boundq_1_alt" suffered from the bad problem that if one reader swapped out during its read, then all writers would be blocked from proceeding (and that would then block all other readers). Now, we no longer have that. If a reader is swapped out, it only blocks the write of that particular slot (and of course blocks if you wrap around the circular buffer).

This is still bad, because you basically have a "wait" on a particular thread and you are just spinning.

Now, if you look at "mpmc_boundq_2" you may notice that the operations on the "rdwr" indexes are actually relaxed memory order - they need to be atomic RMW's, but they actually are not the gate for access and publication - the "seq" variable is now the gate.

This suggests that we could make the read and write indexes separate variables that are only owned by their particular side - like "spsc_boundq2" from the last post , we want to detect the full and empty conditions by using the "seq" variable in the slots, rather than looking across at the reader/writer indexes.

So it's obvious we can do this a lot like spsc_boundq2 ; the reader index is owned only by reader threads; we have to use an atomic RMW because there are now many readers instead of one. Publication and access checking is done only through the slots.

Each slot contains the index of the last access to that slot + a flag for whether the last access was a read or a write :


template <typename t_element, size_t t_size>
struct mpmc_boundq_3
{
    enum { SEQ_EMPTY = 0x80000 };
    enum { COUNTER_MASK = 0xFFFF };

    struct slot
    {
        atomic<unsigned int>    seq;
        nonatomic<t_element>    item;
        char pad[ LF_CACHE_LINE_SIZE - sizeof(t_element) - sizeof(unsigned int) ];
    };

    // elements should generally be cache-line-size padded :
    slot m_array[t_size];
    
    atomic<unsigned int>    m_rd;
    char m_pad[ LF_CACHE_LINE_SIZE ];
    atomic<unsigned int>    m_wr;

public:

    mpmc_boundq_3() : m_rd(0), m_wr(0)
    {
        for(int i=0;i<t_size;i++)
        {
            int next_wr = i & COUNTER_MASK;
            m_array[i].seq($).store( next_wr | SEQ_EMPTY , mo_seq_cst );
        }
    }

    //-----------------------------------------------------

    bool push( const t_element & T )
    {
        unsigned int wr = m_wr($).load(mo_acquire);
        slot * p = & ( m_array[ wr % t_size ] );
        rl::backoff bo;
        for(;;)
        {       
            unsigned int seq = p->seq($).load(mo_acquire);
            
            // if it's flagged empty and the index is right, take this slot :
            if ( seq == (SEQ_EMPTY|wr) )
            {
                // try acquire the slot and advance the write index :
                if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // contention, retry
            }
            else
            {
                // (*2)
                return false;
            }
            
            p = & ( m_array[ wr % t_size ] );

            // (*1)
            bo.yield($);
        }

        RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
        
        // do the write :
        p->item($) = T; 
        
        // this publishes it :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

    //-----------------------------------------------------
    
    bool pop( t_element * pT )
    {
        unsigned int rd = m_rd($).load(mo_acquire);
        slot * p = & ( m_array[ rd % t_size ] );
        rl::backoff bo;
        for(;;)
        {       
            unsigned int seq = p->seq($).load(mo_acquire);
            
            if ( seq == rd )
            {
                if ( m_rd($).compare_exchange_weak(rd,(rd+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // retry
            }
            else
            {
                return false;
            }
            
            p = & ( m_array[ rd % t_size ] );

            bo.yield($);
        }
                            
        RL_ASSERT( p->seq($).load(mo_acquire) == rd );
        
        // do the read :
        *pT = p->item($);
        
        int next_wr = (rd+t_size) & COUNTER_MASK;
        p->seq($).store( next_wr | SEQ_EMPTY , mo_release );
    
        return true;
    }
    
};

So our cache line contention is pretty good. Only readers pass around the read index; only writers pass around the write index. The gate is on the slot that you have to share anyway. It becomes blocking only when near full or near empty. But all is not roses.

Some notes :

(*1) : the yield loop here might look analogous to before, but in fact you only loop here for RMW contention - that is, this is not a "spin wait" , it's a lockfree-contention-spin.

(*2) : when do we return false here? When the queue is full. But that's not all. We also return false when there is a pending read on this slot. In fact our spin-wait loop still exists and it's just been pushed out to the higher level.

To use this queue you have to do :


while ( ! push(item) ) {
    // wait-spin here
}

which is the spin-wait. The wait on a reader being done with your slot is inherent to these methods and it's still there.

What if we only want to return false when the queue is full, and spin for a busy wait ? We then have to look across at the reader index to check for full vs. read-in-progress. It looks like this :


    bool push( const t_element & T )
    {
        unsigned int wr = m_wr($).load(mo_acquire);
        rl::backoff bo;
        for(;;)
        {
            slot * p = & ( m_array[ wr % t_size ] );
        
            unsigned int seq = p->seq($).load(mo_acquire);
            
            if ( seq == (SEQ_EMPTY|wr) )
            {
                if ( m_wr($).compare_exchange_weak(wr,(wr+1)& COUNTER_MASK,mo_acq_rel) )
                    break;
            
                // retry spin due to RMW contention
            }
            else
            {
                // (*3)
                if ( seq <= wr ) // (todo : doesn't handle wrapping)
                {
                    // full?
                    // (*4)
                    unsigned int rd = m_rd($).load(mo_acquire);
                    if ( wr == ((rd+t_size)&COUNTER_MASK) )
                        return false;
                }
                            
                wr = m_wr($).load(mo_acquire);

                // retry spin due to read in progress
            }
            
            bo.yield($);
        }
        
        slot * p = & ( m_array[ wr % t_size ] );
        
        // at this point the reader must already be done with this slot :
        RL_ASSERT( p->seq($).load(mo_acquire) == (SEQ_EMPTY|wr) );
        
        p->item($) = T; 
        
        // this publishes it :
        p->seq($).store( wr , mo_release );
        
        return true;
    }

which has the two different types of spins. (notes on *3 and *4 in a moment)

What if you try to use this boundedq to make a queue that blocks when it is empty or full?

Obviously you use two semaphores :


template <typename t_element, size_t t_size>
class mpmc_boundq_blocking
{
    mpmc_boundq_3<t_element,t_size> m_queue;

    fastsemaphore   pushsem;
    fastsemaphore   popsem;

public:
    
    mpmc_boundq_blocking() : pushsem(t_size), popsem(0)
    {
    }
    
    void push( const t_element & T )
    {
        pushsem.wait();
    
        bool pushed = m_queue.push(T);
        RL_ASSERT( pushed );
        
        popsem.post();
    }
    
    void pop( t_element * pT )
    {
        popsem.wait();
        
        bool popped = m_queue.pop(pT);
        RL_ASSERT( popped );
                
        pushsem.post(); 
    }
    
};

Now push() blocks when it's full and pop() blocks when it's empty. The asserts are only correct when we use the modified push/pop that correctly checks for full/empty and spins during contention.
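
(fastsemaphore isn't shown in this post; any counting semaphore with wait/post works. A minimal stand-in sketch - just a mutex + condition variable, with none of the atomic fast path a real "fast" semaphore would have - could look like :)


    #include <mutex>
    #include <condition_variable>

    // stand-in for fastsemaphore : a plain counting semaphore with wait/post.
    class fastsemaphore
    {
        std::mutex m_mutex;
        std::condition_variable m_cond;
        int m_count;
    public:
        explicit fastsemaphore(int count) : m_count(count) { }

        void wait()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cond.wait(lock, [this]{ return m_count > 0; });
            m_count--;
        }

        void post()
        {
            { std::lock_guard<std::mutex> lock(m_mutex); m_count++; }
            m_cond.notify_one();
        }
    };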

But what's up with the full check? At (*3) we see that the slot we are trying to write still holds the index of a previous write that has not been read yet. So we must be full already, right? We have semaphores telling us whether slots are available, so why are they not reliable? Why do we need (*4) also?

Because reads can go out of order.


    say the queue was full
    then two reads come in and grab slots, but don't finish their read yet
    then only the second one finishes its read
    it posts the semaphore
    so I think I can write
    I grab a write slot
    but it's not empty yet
    it's actually a later slot that's empty
    and I need to wait for this reader to finish
    
Readers with good memory might remember this issue from our analysis of Thomasson's simple MPMC which was built on two semaphores - and had this exact same problem. If a reader is swapped out, then other readers can post the pushsem, so writers will wake up and try to write, but the first writer can't make progress because its slot is still in use by a swapped out reader.

Note that this queue that we've wound up with is identical to Dmitry's MPMC Bounded Queue.

There are a lot of hidden issues with it that are not at all apparent from Dmitry's page. If you look at his code you might not notice that : 1) enqueue doesn't only return false when full, 2) there is a cas-spin and a wait-spin together in the same loop, 3) while performance is good in the best case, it's arbitrarily bad if a thread can get swapped out.

Despite their popularity, I think all the MPMC bounded queues like this are a bad idea in non-kernel environments ("kernel-like" to me means you have manual control over which threads are running; eg. you can be sure that the thread you are waiting on is running; some game consoles are "kernel-like" BTW).

(in contrast, I think the spsc bounded queue we did last time is quite good; in fact this was a "rule of thumb" that I posted back in the original lockfree series ages ago - whenever possible avoid MPMC structures, prefer SPSC, even multi-plexed SPSC , hell even two-mutex-protected SPSC can be better than MPMC).
