[PATCH v2 0/2] Introducing trace buffer mapping by user-space

From: Vincent Donnefort
Date: Wed Mar 22 2023 - 06:27:02 EST


The tracing ring-buffers can be stored on disk or sent to network without any
copy via splice. However the later doesn't allow real time processing of the
traces. A solution is to give access to userspace to the ring-buffer pages
directly via a mapping. A piece of software can now become a reader of the
ring-buffer, and drive a consuming or non-consuming read in a similar fashion to
what trace and trace_pipe offer.

Attached to this cover letter an example of consuming read for a ring-buffer,
using libtracefs.

Vincent

v1 -> v2:
* Hide data_pages from the userspace struct
* Fix META_PAGE_MAX_PAGES
* Support for order > 0 meta-page
* Add missing page->mapping.

Vincent Donnefort (2):
ring-buffer: Introducing ring-buffer mapping functions
tracing: Allow user-space mapping of the ring-buffer

--

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>
#include <tracefs.h>
#include <kbuffer.h>
#include <event-parse.h>

#include <asm/types.h>
#include <sys/mman.h>
#include <sys/ioctl.h>

#define TRACE_MMAP_IOCTL_GET_READER_PAGE _IO('T', 0x1)

struct ring_buffer_meta_page_header {
__u64 entries;
__u64 overrun;
__u32 pages_touched;
__u32 meta_page_size;
__u32 reader_page; /* ID of the current reader page */
__u32 nr_data_pages; /* doesn't take into account the reader_page */
__u32 data_page_head; /* ring-buffer head as an offset from data_start */
__u32 data_start; /* offset within the meta page */
};

/* Need to access private struct to save counters */
struct kbuffer {
unsigned long long timestamp;
long long lost_events;
unsigned long flags;
void *subbuffer;
void *data;
unsigned int index;
unsigned int curr;
unsigned int next;
unsigned int size;
unsigned int start;
unsigned int first;

unsigned int (*read_4)(void *ptr);
unsigned long long (*read_8)(void *ptr);
unsigned long long (*read_long)(struct kbuffer *kbuf, void *ptr);
int (*next_event)(struct kbuffer *kbuf);
};

static char *argv0;
static bool need_exit;

static char *get_this_name(void)
{
static char *this_name;
char *arg;
char *p;

if (this_name)
return this_name;

arg = argv0;
p = arg+strlen(arg);

while (p >= arg && *p != '/')
p--;
p++;

this_name = p;
return p;
}

static void __vdie(const char *fmt, va_list ap, int err)
{
int ret = errno;
char *p = get_this_name();

if (err && errno)
perror(p);
else
ret = -1;

fprintf(stderr, " ");
vfprintf(stderr, fmt, ap);

fprintf(stderr, "\n");
exit(ret);
}

void pdie(const char *fmt, ...)
{
va_list ap;

va_start(ap, fmt);
__vdie(fmt, ap, 1);
va_end(ap);
}

static void read_page(struct tep_handle *tep, struct kbuffer *kbuf,
void *data, int page)
{
static struct trace_seq seq;
struct tep_record record;

if (seq.buffer)
trace_seq_reset(&seq);
else
trace_seq_init(&seq);

while ((record.data = kbuffer_read_event(kbuf, &record.ts))) {
kbuffer_next_event(kbuf, NULL);
tep_print_event(tep, &seq, &record,
"%s-%d %9d\t%s\n", TEP_PRINT_COMM,
TEP_PRINT_PID, TEP_PRINT_TIME, TEP_PRINT_NAME);
trace_seq_do_printf(&seq);
trace_seq_reset(&seq);
}
}

static int next_reader_page(int fd, struct ring_buffer_meta_page_header *meta,
struct kbuffer *kbuf)
{
int prev_reader_page = meta->reader_page;

if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER_PAGE) < 0)
pdie("ioctl");

return meta->reader_page;
}

static void signal_handler(int unused)
{
printf("Exit!\n");
need_exit = true;
}

int main(int argc, char **argv)
{
int page_size, meta_len, data_len, page, fd, start = -1;
struct ring_buffer_meta_page_header *map;
struct kbuffer *kbuf, prev_kbuf;
struct tep_handle *tep;
__u64 prev_entries;
void *meta, *data;
char *buf, path[32];
int cpu;

argv0 = argv[0];
cpu = atoi(argv[1]);
snprintf(path, 32, "per_cpu/cpu%d/trace_pipe_raw", cpu);

signal(SIGINT, signal_handler);
tep = tracefs_local_events(NULL);
kbuf = tep_kbuffer(tep);
page_size = getpagesize();

fd = tracefs_instance_file_open(NULL, path, O_RDONLY);
if (fd < 0)
pdie("raw");

meta = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
if (meta == MAP_FAILED)
pdie("mmap");
map = meta;
meta_len = map->meta_page_size;

if (meta_len > page_size) {
munmap(meta, page_size);
meta = mmap(NULL, meta_len, PROT_READ, MAP_SHARED, fd, 0);
if (meta == MAP_FAILED)
pdie("mmap");
map = meta;
}

printf("entries: %llu\n", map->entries);
printf("overrun: %llu\n", map->overrun);
printf("pages_touched: %u\n", map->pages_touched);
printf("reader_page: %u\n", map->reader_page);
printf("nr_data_pages: %u\n\n", map->nr_data_pages);

data_len = page_size * (map->nr_data_pages + 1);

data = mmap(NULL, data_len, PROT_READ, MAP_SHARED, fd, meta_len);
if (data == MAP_FAILED)
pdie("mmap data");

page = ((struct ring_buffer_meta_page_header *)meta)->reader_page;
again:
do {
kbuffer_load_subbuffer(kbuf, data + page_size * page);

if (page != start) {
printf("READER PAGE: %d\n", map->reader_page);
} else {
kbuf->curr = prev_kbuf.curr;
kbuf->index = prev_kbuf.index;
kbuf->next = prev_kbuf.next;
kbuf->timestamp = prev_kbuf.timestamp;
kbuffer_next_event(kbuf, NULL);
}

prev_entries = map->entries;
start = page;

read_page(tep, kbuf, data, page);
} while ((page = next_reader_page(fd, meta, kbuf)) != start);

prev_kbuf.curr = kbuf->curr;
prev_kbuf.index = kbuf->index;
prev_kbuf.next = kbuf->next;
prev_kbuf.timestamp = kbuf->timestamp;

while (prev_entries == *(volatile __u64 *)&map->entries && !need_exit)
usleep(100000);

if (!need_exit)
goto again;

munmap(data, data_len);
munmap(meta, page_size);
close(fd);

return 0;
}


include/linux/ring_buffer.h | 8 +
include/uapi/linux/trace_mmap.h | 28 +++
kernel/trace/ring_buffer.c | 384 +++++++++++++++++++++++++++++++-
kernel/trace/trace.c | 76 ++++++-
4 files changed, 490 insertions(+), 6 deletions(-)
create mode 100644 include/uapi/linux/trace_mmap.h

--
2.40.0.rc1.284.g88254d51c5-goog