Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups

From: Manfred Spraul
Date: Sun Nov 01 2015 - 00:32:13 EST


Hi Sebastian,

On 10/30/2015 12:26 PM, Sebastian Andrzej Siewior wrote:
> This patch moves the wake_up_process() invocation so it is not done under
> the perm->lock by making use of a lockless wake_q. With this change, the
> waiter is woken up once the message has been assigned, and it does not
> need to loop on SMP if the message points to NULL. In the signal case we
> still need to check the pointer under the lock to verify the state.
>
> This change should also avoid the introduction of preempt_disable() in
> -RT, which would be needed to avoid a busy loop that polls for the
> NULL -> !NULL change if the waiter has a higher priority than the waker.

With regard to functional tests:
fakeroot is a heavy System V IPC user (at least if it is configured for sysv).
"make -j" under fakeroot was a useful stress test.

With regard to benchmarks:
I've attached one of my benchmark programs (pmsg.cpp, below).

Otherwise: Nice!

--
Manfred

/*
* pmsg.cpp, parallel sysv msg pingpong
*
* Copyright (C) 1999, 2001, 2005, 2008 by Manfred Spraul.
* All rights reserved except the rights granted by the GPL.
*
* Redistribution of this file is permitted under the terms of the GNU
* General Public License (GPL) version 2 or later.
* $Header$
*/

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <getopt.h>
#include <pthread.h>
#include <unistd.h>

#include <new>

//////////////////////////////////////////////////////////////////////////////

/*
 * Benchmark phase. main() moves it WAITING -> RUNNING -> STOPPED; the worker
 * threads only read it. NOTE(review): volatile forces a fresh load on every
 * poll but is not a synchronization primitive; an atomic would be the
 * portable choice.
 */
static volatile enum {
	WAITING,	/* workers spin before starting */
	RUNNING,	/* ping-pong loop is active */
	STOPPED,	/* workers stop looping */
} g_state = WAITING;

/* Per-thread round counters, indexed by taskinfo.threadid (2*queues entries). */
static unsigned long long *g_results;
/*
 * The single SysV message queue shared by all thread pairs. -1 means "not
 * created yet": 0 is a valid queue id, so a zero-initialized value could
 * name (and get IPC_RMID'ed as) an unrelated queue.
 */
static int g_svmsg_id = -1;
/* Thread handles, indexed like g_results. */
static pthread_t *g_threads;

/* Per-thread parameters handed to worker_thread() by init_threads(). */
struct taskinfo {
	/* Identity and placement. */
	int threadid;	/* index into g_threads[] and g_results[] */
	int cpuid;	/* CPU the thread pins itself to */
	/* Queue addressing. */
	int svmsg_id;	/* SysV message queue id */
	int offset;	/* per-pair base for mtype values and payload check */
	/* Role. */
	int sender;	/* nonzero: sends the first message of the pair */
};

/* Payload bytes per benchmark message, not counting the mtype field. */
enum { DATASIZE = 16 };

void* worker_thread(void *arg)
{
struct taskinfo *ti = (struct taskinfo*)arg;
unsigned long long rounds;
int ret;
struct {
long mtype;
unsigned char buffer[DATASIZE];
} mbuf;

{
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(ti->cpuid, &cpus);

ret = pthread_setaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_setaffinity_np failed for thread %d with errno %d.\n",
ti->threadid, errno);
}

ret = pthread_getaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_getaffinity_np() failed for thread %d with errno %d.\n",
ti->threadid, errno);
fflush(stdout);
} else {
printf("thread %d: sysvmsg %8d, offset 0x%08x type %d bound to %04lxh\n",ti->threadid,
ti->svmsg_id, ti->offset, ti->sender, cpus.__bits[0]);
}
fflush(stdout);
}

rounds = 0;
while(g_state == WAITING) {
#ifdef __i386__
__asm__ __volatile__("pause": : :"memory");
#endif
}

if (ti->sender) {
mbuf.mtype = ti->offset+ti->sender+1;
mbuf.buffer[0] = ti->offset & 0xff;
mbuf.buffer[1] = (ti->offset >> 8) & 0xff;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
printf("Initial send failed, errno %d.\n", errno);
exit(1);
}
}
while(g_state == RUNNING) {
int target = ti->offset+1+!ti->sender;

ret = msgrcv(ti->svmsg_id, &mbuf, DATASIZE, target, 0);
if (ret != DATASIZE) {
if (errno == EIDRM)
break;
printf("Error on msgrcv, got %d, errno %d.\n", ret, errno);
exit(1);
}
if ((mbuf.buffer[0] != (unsigned)(ti->offset & 0xff)) ||
(mbuf.buffer[1] != (unsigned)((ti->offset >> 8) & 0xff))) {
printf("Error - incorrect message received.\n");
printf("cpu %d ti->offset 0x%08x ti->sender %d.\n",
ti->cpuid, ti->offset, ti->sender);

printf("got %02x%02x.\n",
(unsigned char)mbuf.buffer[0],
(unsigned char)mbuf.buffer[1]);
exit(1);
}
mbuf.mtype = ti->offset+ti->sender+1;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
if (errno == EIDRM)
break;
printf("send failed, errno %d.\n", errno);
exit(1);
}
rounds++;
}
/* store result */
g_results[ti->threadid] = rounds;

pthread_exit(0);
return NULL;
}

/*
 * Create the sender/receiver thread pair for slot `cpu` (0 <= cpu < cpus).
 *
 * Both threads are pinned to CPU `cpu` and share the single message queue
 * created by the cpu == 0 call. That call must come first, because later
 * slots copy g_svmsg_id. The pair is distinguished from other pairs by
 * ti->offset (3*cpu+5), from which worker_thread() derives its mtypes. Thread
 * ids are `cpu` (sender) and `cpu + cpus` (receiver); they index g_threads[]
 * and g_results[].
 *
 * The taskinfo structures are handed to the threads and not freed here. Any
 * failure is fatal: a message is printed and the process exits with status 1.
 */
void init_threads(int cpu, int cpus)
{
	int ret;
	struct taskinfo *ti1, *ti2;

	/*
	 * Plain new reports failure by throwing std::bad_alloc, never by
	 * returning NULL, so the check below is only live with nothrow.
	 */
	ti1 = new (std::nothrow) taskinfo;
	ti2 = new (std::nothrow) taskinfo;
	if (!ti1 || !ti2) {
		printf("Could not allocate task info\n");
		exit(1);
	}

	if (cpu == 0) {
		/*
		 * Only this process's threads use the queue, so owner read/write
		 * is enough; 0777 would let any local user inject messages and
		 * corrupt the run.
		 */
		g_svmsg_id = msgget(IPC_PRIVATE, 0600 | IPC_CREAT);
		if (g_svmsg_id == -1) {
			printf(" message queue create failed, errno %d.\n", errno);
			exit(1);
		}
	}

	g_results[cpu] = 0;
	g_results[cpu + cpus] = 0;

	/* Sender half: thread id `cpu`, injects the first message. */
	ti1->svmsg_id = g_svmsg_id;
	ti1->offset = 3 * cpu + 5;
	ti1->threadid = cpu;
	ti1->cpuid = cpu;
	ti1->sender = 1;

	/* Receiver half: same CPU and offset, thread id `cpu + cpus`. */
	ti2->svmsg_id = g_svmsg_id;
	ti2->offset = ti1->offset;
	ti2->threadid = cpu + cpus;
	ti2->cpuid = cpu;
	ti2->sender = 0;

	ret = pthread_create(&g_threads[ti1->threadid], NULL, worker_thread, ti1);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
	ret = pthread_create(&g_threads[ti2->threadid], NULL, worker_thread, ti2);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
}

//////////////////////////////////////////////////////////////////////////////

int main(int argc, char **argv)
{
int queues, timeout;
unsigned long long totals;
int i;
int res;

printf("pmsg [nr queues] [timeout]\n");
if (argc != 3) {
printf(" Invalid parameters.\n");
return 0;
}
queues = atoi(argv[1]);
timeout = atoi(argv[2]);
printf("Using %d queues/cpus (%d threads) for %d seconds.\n",
queues, 2*queues, timeout);

g_results = new unsigned long long[2*queues];
g_threads = new pthread_t[2*queues];
for (i=0;i<queues;i++) {
init_threads(i, queues);
}

sleep(1);
g_state = RUNNING;
sleep(timeout);
g_state = STOPPED;
sleep(1);

res = msgctl(g_svmsg_id,IPC_RMID,NULL);
if (res < 0) {
printf("msgctl(IPC_RMID) failed for %d, errno%d.\n",
g_svmsg_id, errno);
}
for (i=0;i<2*queues;i++)
pthread_join(g_threads[i], NULL);

printf("Result matrix:\n");
totals = 0;
for (i=0;i<queues;i++) {
printf(" Thread %3d: %8lld %3d: %8lld\n",
i, g_results[i], i+queues, g_results[i+queues]);
totals += g_results[i] + g_results[i+queues];
}
printf("Total: %lld\n", totals);
}