Sunday, January 16, 2011

Implementing Haskell-style MVars in C++ using futexes

Last time we talked about what Linux futexes were. Now, I want to use them to implement a concept from Haskell called an MVar. An MVar can be thought of as concurrent queue with a size of 1, that is it is a box which you can put things in, and take things out safely from multiple threads. The basic operations are two:

  • put(T value)

  • T take()


When the MVar is empty, take'ing blocks, when it is full, put'ing blocks. Real Haskell MVars have guarantees with regards to fairness, we're not going to address those here. In fact, we're going to do a "cute" implementation, and it has several flaws. First, it's only going to work on x86-32, this will not be a portable implemnetation. But this lets us be "cute", our MVar is going to be only the size of an int. Below is `ptrmvar32`, it is a 32bit void* MVar implementation that we we'll use to build our MVar<T>. NULL will represent empty, and we'll use the LSB to note whether we need to wake anyone up (using only one bit for this is one of the flaws).


#ifndef PTRMVAR32_H
#define PTRMVAR32_H
#include <stdint.h>
#include <assert.h>
class ptrmvar32
{
uintptr_t m_val;
static char s_pointers_are_32_bits[sizeof(void*) == 4 ? 1 : -1];
static void futex_wait(void *, uintptr_t);
static void futex_wake(void *);
public:
ptrmvar32(void *initial_value) : m_val(reinterpret_cast<uintptr_t>(initial_value))
{
assert(m_val);
}

ptrmvar32() : m_val(0)
{}

void* take()
{
uintptr_t curr;
while(1) {
curr = m_val;
if( curr == 0 )
{
if( __sync_bool_compare_and_swap(&m_val, 0, 1) )
futex_wait(&m_val, 1);
} else if( curr == 1 ) {
futex_wait(&m_val, 1);
} else {
if( __sync_bool_compare_and_swap(&m_val, curr, 0) )
{
if( curr & 1 )
{
futex_wake(&m_val);
}
curr &= ~1;
return reinterpret_cast(curr);
}
}
}
}

void put(void *val)
{
const uintptr_t new_val = reinterpret_cast<uintptr_t>(val);
uintptr_t curr;
while(1) {
curr = m_val;
if( curr == 0 )
{
if( __sync_bool_compare_and_swap(&m_val, 0, new_val) )
return;
} else if( curr == 1 ) {
if( __sync_bool_compare_and_swap(&m_val, 1, new_val) ) {
futex_wake(&m_val);
return;
}
} else {
if( __sync_bool_compare_and_swap(&m_val, curr, curr | 1) ) {
futex_wait(&m_val, curr | 1);
}
}
}
}

void* unsafe_value()
{
return reinterpret_cast<void*>( m_val & ~1);
}
};
#endif

ptrmvar32.cpp contains the implementation of futex_wake and futex_wait which are wrappers around the futex syscall


#include "ptrmvar32.h"
#include &t;linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <limits>

void ptrmvar32::futex_wait(void *on, uintptr_t when)
{
syscall(__NR_futex, on, FUTEX_WAIT, when, NULL, 0, 0);
}

void ptrmvar32::futex_wake(void *on)
{
syscall(__NR_futex, on, FUTEX_WAKE, std::numeric_limits<int32_t>::max(), NULL, 0, 0);
}



As you can see futex_wake wakes up everyone, not just one waiter. This is because someone could be waiting to take and someone could be waiting to put and if we wake the wrong one of the two we'll never wake up. If this situation does actually occur we end up in thudering herd territory.

Next we build our typesafe, templated version on top of this:


#include "ptrmvar32.h"

template<typename T>
class MVar
{
ptrmvar32 m_impl;

public:
MVar() : m_impl()
{}

MVar(const T& val) : m_impl(static_cast(new T(val)))
{}

void put(const T& val)
{
T* p = new T(val);
m_impl.put(static_cast(p));
}

T take()
{
T *p = static_cast(m_impl.take());
T r(*p);
delete p;
return r;
}

~MVar()
{
delete static_cast(m_impl.unsafe_value());
}
};


Now put and take use new and delete, really we should be using a thread local allocator here, but I got too lazy to implement that part. Maybe next time. Ignoring that, you can see that this is a very light weight primitive, only taking 4 bytes. The pthread based implementation would take at least a mutex and condition variable, probably two condition variables to avoid the problem of waking up the wrong threads that this implementation is suspectible to. However, this is implemented the way it is to set up for next time, where we are going to talk about the implications of laziness and purity, and the operation that Haskell doesn't provide but could.

Here's a sample program making use of the MVars:


#include "mvar_template.h"
#include <string>
#include <iostream>
#include <pthread.h>

using namespace std;

namespace
{
MVar<string> in;
MVar<string> out;

void *worker(void *)
{
cout << "waiting for value" << endl;
string v = in.take();
cout << "got value: " << v << endl;
v += " gotten";
out.put(v);
return NULL;
}

}

int main()
{
cout << sizeof(in) << endl;
string s = "in";
pthread_t t;
pthread_create(&t, NULL, &worker, NULL);
pthread_detach(t);
in.put(s);
string r = out.take();
cout << "got " << r << endl;
}

No comments: