Re: [rfc][patch] queued spinlocks (i386)

From: Nick Piggin
Date: Thu Mar 29 2007 - 03:16:37 EST


On Thu, Mar 29, 2007 at 03:36:52AM +0200, Nick Piggin wrote:
> In most cases, no. For the uncontended case they should be about the
> same. They have the same spinning behaviour. However there is a little
> window where they might be a bit slower I think... actually perhaps I'm
> wrong!
>
> Currently if you have 4 CPUs spinning and the lock is released, all 4
> CPU cachelines will be invalidated, then they will be loaded again, and
> found to be 0, so they all try to atomic_dec_return the counter, each
> one invalidating others' cachelines. 1 gets through.
>
> With my queued locks, all 4 cachelines are invalidated and loaded, but
> only one will be allowed to proceed, and there are 0 atomic operations
> or stores of any kind.
>
> So I take that back: our current spinlocks have a worse thundering herd
> behaviour under contention than my queued ones. So I'll definitely
> push the patch through.

OK, it isn't a big difference, but a user-space test is showing a slight
(~2%) improvement in the contended case on a 16-core Opteron.

There is a case where the present spinlocks are almost twice as fast on
this machine (in terms of aggregate throughput), and that is when a lock
is retaken right after it is released. This is because the same CPU will
often be able to retake the lock without the lock cacheline having to move
between caches. This is going to be a rare case for us, and it would suggest
suboptimal code anyway (i.e. the lock should just be kept rather than
dropped and retaken).

Actually, one situation where it comes up is when we drop and retake a
lock that needs_lockbreak. Of course, the queued lock behaviour is
desired in that case anyway.
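
For illustration, the drop-and-retake pattern in question looks roughly like
the sketch below. It borrows spin_lock()/spin_unlock()/cpu_relax() from the
test program further down, and work_pending()/do_one_item()/contended() are
made-up stand-ins for whatever the real loop body does, so treat it as a
sketch rather than actual kernel code:

/* Sketch only: periodically drop and retake the lock so waiters can get in. */
static int work_pending(void) { static int n = 16; return n-- > 0; } /* stand-in */
static void do_one_item(void) { }                                    /* stand-in */
static int contended(void) { return 1; }                             /* stand-in */

static void process_with_lockbreak(spinlock_t *lock)
{
        spin_lock(lock);
        while (work_pending()) {
                do_one_item();
                if (contended()) {
                        /*
                         * Drop the lock so a spinner can take it, then queue
                         * up again.  With the queued lock the retaker goes to
                         * the back of the FIFO, which is the behaviour we
                         * want here.
                         */
                        spin_unlock(lock);
                        cpu_relax();
                        spin_lock(lock);
                }
        }
        spin_unlock(lock);
}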

However, single-thread performance is presently a bit down. OTOH, the
assembly gcc generates looks like it could be improved upon (even by
me :P).

This is what I've got so far. Should work for i386 and x86_64. Any
enhancements or results from other CPUs would be interesting.
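
(To try it: something like "gcc -O2 -pthread" should be all that's needed to
build it; it has to run as root or with realtime privileges because of the
SCHED_FIFO setup, and the QUEUE_LOCKS / MULTI_THREAD defines at the top
switch between the two lock implementations and between the single-threaded
and 16-thread runs.)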

--
#define _GNU_SOURCE
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <sched.h>
#include <sys/time.h>

#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
/* "rep ; nop" is the PAUSE hint for spin-wait loops. */
static inline void cpu_relax(void)
{
        asm volatile("rep ; nop" : : : "memory");
}

static unsigned long delta(struct timeval start, struct timeval end)
{
        return (end.tv_sec - start.tv_sec) * 1000000
                + end.tv_usec - start.tv_usec;
}

static unsigned long lps;

/*
 * Work out how many gettimeofday() loops per second this machine manages,
 * so delay() below can busy-wait for roughly the requested time.
 */
static void calibrate_delay(void)
{
        struct timeval start, now;
        unsigned long i;

        i = 0;
        gettimeofday(&start, NULL);
        do {
                gettimeofday(&now, NULL);
                i++;
        } while (delta(start, now) < 1000000 * 2);
        lps = i / 2;
        printf("%lu lps\n", lps);
}

/* Busy-wait for approximately usec microseconds, using the calibration above. */
static void delay(unsigned long usec)
{
        struct timeval now;
        unsigned long loops = (lps*usec + 999999) / 1000000;
        unsigned long i;

        for (i = 0; i < loops; i++)
                gettimeofday(&now, NULL);
}

#define QUEUE_LOCKS
#ifdef QUEUE_LOCKS

/*
 * Queued ("ticket") lock: lockers take a ticket from qhead with xadd and
 * spin until qtail catches up to their ticket; unlock just advances qtail.
 * Waiters are therefore granted the lock in FIFO order, and on release only
 * the next ticket holder proceeds; the other spinners perform no atomic
 * operations or stores at all.
 */
typedef struct {
        unsigned short qhead;
        unsigned short qtail;
} spinlock_t;

#define SPIN_LOCK_UNLOCKED (spinlock_t){ 0, 0 }

static inline void spin_lock(spinlock_t *lock)
{
        unsigned short pos = 1;

        /* Atomically fetch our ticket (the old qhead) and advance qhead. */
        asm volatile("lock ; xaddw %0, %1\n\t"
                : "+r" (pos), "+m" (lock->qhead) : : "memory");
        /* Spin until our ticket comes up. */
        while (unlikely(pos != lock->qtail))
                cpu_relax();
}

static inline void spin_unlock(spinlock_t *lock)
{
        /* Only the lock holder writes qtail, so a plain (non-locked) add is enough. */
        asm volatile("addw $1, %0\n\t"
                : "+m" (lock->qtail) : : "memory");
}
#else

/*
 * Current-style lock: "lock ; decl" to try to acquire, then spin reading the
 * word and retry the atomic decrement once it goes positive again.  On
 * release every spinner races to redo the decrement, which is the
 * thundering-herd case discussed above.
 */
typedef struct {
        unsigned int slock;
} spinlock_t;

#define SPIN_LOCK_UNLOCKED (spinlock_t){ 1 }

static inline void spin_lock(spinlock_t *lock)
{
        asm volatile(
                "1:\n\t"
                "lock ; decl %0\n\t"
                "jns 2f\n\t"
                "3:\n\t"
                "rep;nop\n\t"
                "cmpl $0,%0\n\t"
                "jle 3b\n\t"
                "jmp 1b\n\t"
                "2:\n\t" : "+m" (lock->slock) : : "memory");
}

static inline void spin_unlock(spinlock_t *lock)
{
        asm volatile("movl $1,%0" : "=m" (lock->slock) : : "memory");
}
#endif

/* The shared lock and the counter it protects. */
static struct {
        spinlock_t lock;
        unsigned long count;
} __attribute__((__aligned__(128))) data;


#undef MULTI_THREAD
#ifdef MULTI_THREAD
#define NR_THREADS 16
#define NR_ITERS 200000
#else
#define NR_THREADS 1
#define NR_ITERS 1000000000
#endif

static volatile long go = 0;	/* start flag, set by main() */

static void *thread(void *arg)
{
        cpu_set_t mask;
        struct sched_param param;
        long nr = (long)arg;
        long i;

        assert(nr < NR_THREADS);

        /* Pin this thread to its own CPU. */
        CPU_ZERO(&mask);
        CPU_SET(nr, &mask);

        if (sched_setaffinity(0, sizeof(mask), &mask) == -1)
                perror("sched_setaffinity"), exit(1);

        param.sched_priority = 90;
        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1)
                perror("sched_setscheduler"), exit(1);

        /* Wait for main() to release all threads at once. */
        while (!go)
                sched_yield();

        for (i = 0; i < NR_ITERS; i++) {
                /*
                 * The delay outside the lock should be non-zero to simulate
                 * reasonable code.  The larger the delay inside the lock is
                 * relative to the delay outside, the more contention.  For
                 * single-thread mode, the delays aren't so critical.
                 */
                spin_lock(&data.lock);
                data.count++;
                spin_unlock(&data.lock);
                // delay(1);
        }

        return NULL;
}

int main(void)
{
        pthread_t threads[NR_THREADS];
        struct sched_param param;
        long i;

        param.sched_priority = 90;
        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1)
                perror("sched_setscheduler"), exit(1);

        data.lock = SPIN_LOCK_UNLOCKED;
        data.count = 0;

        calibrate_delay();

        for (i = 0; i < NR_THREADS; i++) {
                /* pthread_create() returns an error number, not -1. */
                if (pthread_create(&threads[i], NULL, thread, (void *)i) != 0)
                        fprintf(stderr, "pthread_create failed\n"), exit(1);
        }

        go = 1;
        sched_yield();

        for (i = 0; i < NR_THREADS; i++) {
                if (pthread_join(threads[i], NULL) != 0)
                        fprintf(stderr, "pthread_join failed\n"), exit(1);
        }

        /* Sanity check: no increments were lost. */
        assert(data.count == (unsigned long)NR_ITERS * NR_THREADS);

        exit(0);
}