[RFC PATCH 1/2] ring-buffer: Introducing ring-buffer mapping functions

From: Vincent Donnefort
Date: Sun Feb 12 2023 - 10:33:17 EST


In preparation for allowing user-space to map a ring-buffer, add
a set of mapping functions:

ring_buffer_{map,unmap}()
ring_buffer_map_fault()

And controls on the ring-buffer:

ring_buffer_get_reader_page() /* swap reader and head */
ring_buffer_update_meta_page()
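
To give an idea of the intended consumer (the actual hook-up belongs
to the rest of the series), here is a sketch of how a file could glue
its mmap and ioctl paths to this API. Everything apart from the
ring_buffer_*() calls (struct rb_mapping, the function and command
names) is made up for illustration:

/* Hypothetical glue, for illustration only */
static vm_fault_t rb_mmap_fault(struct vm_fault *vmf)
{
	struct rb_mapping *m = vmf->vma->vm_private_data; /* made-up type */
	struct page *page;

	page = ring_buffer_map_fault(m->buffer, m->cpu, vmf->pgoff);
	if (!page)
		return VM_FAULT_SIGBUS;

	get_page(page);
	vmf->page = page;

	return 0;
}

static long rb_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
{
	struct rb_mapping *m = filp->private_data;

	switch (cmd) {
	case RB_IOCTL_GET_READER:	/* made-up command */
		return ring_buffer_get_reader_page(m->buffer, m->cpu);
	case RB_IOCTL_UPDATE_META:	/* made-up command */
		return ring_buffer_update_meta_page(m->buffer, m->cpu);
	default:
		return -ENOTTY;
	}
}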

Mapping the ring-buffer also involves:

A unique ID for each page of the ring-buffer, as the pages are
currently only identified through their in-kernel virtual addresses.

A meta-page, where statistics about the ring-buffer and the ordered
list of page IDs are stored. One field indicates which page is the
reader page, and another gives where the ring-buffer starts in the
list of data pages.
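
For example, once the meta-page is mapped, a user-space reader could
locate both of those pages with something like the snippet below. The
fd is assumed to come from a (future) per-CPU trace file exposing the
mapping described here:

#include <stdio.h>
#include <sys/mman.h>
#include <linux/trace_mmap.h>

static void show_meta(int fd, long page_size)
{
	struct ring_buffer_meta_page *meta;

	meta = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
	if (meta == MAP_FAILED)
		return;

	/* the page currently dedicated to the reader... */
	printf("reader page ID: %u\n", meta->reader_page);

	/* ...and the oldest data page, where the ring-buffer starts */
	printf("head page ID: %u\n",
	       meta->data_pages[meta->data_page_head]);
}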

The linear mapping exposes the meta-page, followed by each page of the
ring-buffer, ordered by their unique IDs, which are assigned during the
first mapping.
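
Concretely, the page with ID 'id' then sits at page offset 'id + 1' in
the mapping, offset 0 being the meta-page. A reader holding a mapping
'base' that covers the meta-page and every data page can translate an
ID into an address with a trivial helper (sketch):

/* data page 'id' is mapped right after the meta-page */
static void *page_addr(void *base, long page_size, __u32 id)
{
	return (char *)base + (size_t)(id + 1) * page_size;
}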

Once mapped, no page can get in or out of the ring-buffer: the buffer
size will remain unmodified, and the splice-enabling functions will
simply memcpy the data instead of swapping pages.

Also, since the meta-page is itself a single page, it currently limits
the number of ring-buffer pages that can be mapped.
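
With 4KiB pages for instance, the fixed fields of
struct ring_buffer_meta_page occupy 32 bytes, leaving room for roughly
(4096 - 32) / 4 = 1016 page IDs in data_pages[], that is about 4MiB of
data pages per CPU; mapping a bigger ring-buffer fails with -E2BIG.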

Signed-off-by: Vincent Donnefort <vdonnefort@xxxxxxxxxx>

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 782e14f62201..4897e17ebdde 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@
#include <linux/seq_file.h>
#include <linux/poll.h>

+#include <uapi/linux/trace_mmap.h>
+
struct trace_buffer;
struct ring_buffer_iter;

@@ -211,4 +213,10 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare NULL
#endif

+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff);
+int ring_buffer_get_reader_page(struct trace_buffer *buffer, int cpu);
+int ring_buffer_update_meta_page(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..0f3282fa1a94
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,16 @@
+#ifndef _UAPI_TRACE_MMAP_H_
+#define _UAPI_TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+struct ring_buffer_meta_page {
+	__u64	entries;	/* entries written */
+	__u64	overrun;	/* entries lost */
+	__u32	pages_touched;	/* pages that have been written to */
+	__u32	reader_page;	/* ID of the current reader page */
+	__u32	nr_data_pages;	/* doesn't take the reader page into account */
+	__u32	data_page_head;	/* index of the head page in data_pages[] */
+	__u32	data_pages[];	/* ring-buffer page IDs, in order */
+};
+
+#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index c366a0a9ddba..ffbb216a18a9 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -20,6 +20,7 @@
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
+#include <linux/rwsem.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
@@ -332,6 +333,7 @@ struct buffer_page {
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
+	u32		 id;		/* ID for external mapping */
};

/*
@@ -529,6 +531,12 @@ struct ring_buffer_per_cpu {
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
+
+	atomic_t			mapped;
+	struct rw_semaphore		mapping_lock;
+	unsigned long			*page_ids;	/* ID to addr */
+	void				*meta_page;
+
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
@@ -1452,12 +1460,41 @@ static inline void rb_inc_page(struct buffer_page **bpage)
	*bpage = list_entry(p, struct buffer_page, list);
}

+static inline void
+rb_meta_page_head_move(struct ring_buffer_per_cpu *cpu_buffer, unsigned long num)
+{
+	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
+	unsigned long head_id = meta->data_page_head;
+
+	/* No bookkeeping necessary */
+	if (!atomic_read(&cpu_buffer->mapped))
+		return;
+
+	meta->data_page_head = (head_id + num) % cpu_buffer->nr_pages;
+}
+
+static inline void
+rb_meta_page_head_swap(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
+
+	/* No bookkeeping necessary */
+	if (!atomic_read(&cpu_buffer->mapped))
+		return;
+
+	meta->reader_page = cpu_buffer->reader_page->id;
+	meta->data_pages[meta->data_page_head] = cpu_buffer->head_page->id;
+
+	rb_meta_page_head_move(cpu_buffer, 1);
+}
+
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
+	unsigned long cnt = 0;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
@@ -1479,9 +1516,11 @@ rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
+				rb_meta_page_head_move(cpu_buffer, cnt);
				return page;
			}
			rb_inc_page(&page);
+			cnt++;
		} while (page != head);
	}

@@ -1757,6 +1796,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+	init_rwsem(&cpu_buffer->mapping_lock);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
@@ -2195,7 +2235,6 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

-
	if (cpu_id == RING_BUFFER_ALL_CPUS) {
		/*
		 * Don't succeed if resizing is disabled, as a reader might be
@@ -3504,7 +3543,7 @@ static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
		return;

	/*
-	 * If this interrupted another event, 
+	 * If this interrupted another event,
	 */
	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
		goto out;
@@ -4665,6 +4704,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
	 * Now make the new head point back to the reader page.
	 */
	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
+	rb_meta_page_head_swap(cpu_buffer);
	rb_inc_page(&cpu_buffer->head_page);

	local_inc(&cpu_buffer->pages_read);
@@ -5511,6 +5551,12 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

+	if (atomic_read(&cpu_buffer_a->mapped) ||
+	    atomic_read(&cpu_buffer_b->mapped)) {
+		ret = -EBUSY;
+		goto out;
+	}
+
	/* At least make sure the two buffers are somewhat the same */
	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
		goto out;
@@ -5804,14 +5850,19 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
	cpu_buffer->read += rb_page_entries(reader);
	cpu_buffer->read_bytes += BUF_PAGE_SIZE;

-	/* swap the pages */
-	rb_init_page(bpage);
-	bpage = reader->page;
-	reader->page = *data_page;
-	local_set(&reader->write, 0);
-	local_set(&reader->entries, 0);
-	reader->read = 0;
-	*data_page = bpage;
+	if (likely(!atomic_read(&cpu_buffer->mapped))) {
+		/* swap the pages */
+		rb_init_page(bpage);
+		bpage = reader->page;
+		reader->page = *data_page;
+		local_set(&reader->write, 0);
+		local_set(&reader->entries, 0);
+		reader->read = 0;
+		*data_page = bpage;
+	} else {
+		memcpy(bpage->data, cpu_buffer->reader_page->page->data,
+		       PAGE_SIZE);
+	}

	/*
	 * Use the real_end for the data size,
@@ -5856,6 +5907,290 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

+#define META_PAGE_MAX_PAGES \
+	((PAGE_SIZE - (offsetof(struct ring_buffer_meta_page, data_page_head))) >> 2)
+
+static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	kfree(cpu_buffer->page_ids);
+	cpu_buffer->page_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (cpu_buffer->meta_page)
+		return 0;
+
+	if ((cpu_buffer->nr_pages + 1) > META_PAGE_MAX_PAGES)
+		return -E2BIG;
+
+	cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER));
+	if (!cpu_buffer->meta_page)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	free_page((unsigned long)cpu_buffer->meta_page);
+	cpu_buffer->meta_page = NULL;
+}
+
+static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct ring_buffer_meta_page *meta;
+	unsigned long flags;
+
+	/* Update the head page if the writer moved it */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	rb_set_head_page(cpu_buffer);
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+	/*
+	 * Instead of letting the writer carry the meta page burden,
+	 * give the responsibility to the reader.
+	 */
+	meta = (struct ring_buffer_meta_page *)cpu_buffer->meta_page;
+	meta->entries = local_read(&cpu_buffer->entries);
+	meta->pages_touched = local_read(&cpu_buffer->pages_touched);
+	meta->overrun = local_read(&cpu_buffer->overrun);
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+				   unsigned long *page_ids)
+{
+	struct buffer_page *first_page, *bpage;
+	struct ring_buffer_meta_page *meta;
+	int id = 0, i = 0;
+
+	meta = (struct ring_buffer_meta_page *)cpu_buffer->meta_page;
+
+	meta->reader_page = cpu_buffer->reader_page->id;
+	meta->nr_data_pages = cpu_buffer->nr_pages;
+	meta->data_page_head = 0;
+
+	page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+	cpu_buffer->reader_page->id = id;
+
+	id++;
+
+	first_page = bpage = rb_set_head_page(cpu_buffer);
+	do {
+		if (i >= META_PAGE_MAX_PAGES) {
+			WARN_ON(1);
+			break;
+		}
+
+		page_ids[id] = (unsigned long)bpage->page;
+		bpage->id = id;
+		meta->data_pages[i] = id;
+
+		rb_inc_page(&bpage);
+		i++; id++;
+	} while (bpage != first_page);
+
+	/* install page ID to kern VA translation */
+	cpu_buffer->page_ids = page_ids;
+}
+
+static struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return ERR_PTR(-EINVAL);
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	down_read(&cpu_buffer->mapping_lock);
+
+	if (!atomic_read(&cpu_buffer->mapped)) {
+		up_read(&cpu_buffer->mapping_lock);
+		return ERR_PTR(-ENODEV);
+	}
+
+	return cpu_buffer;
+}
+
+void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	up_read(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags, *page_ids;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	down_write(&cpu_buffer->mapping_lock);
+
+	if (atomic_read(&cpu_buffer->mapped)) {
+		atomic_inc(&cpu_buffer->mapped);
+		goto unlock;
+	}
+
+	/* prevent another thread from changing buffer sizes */
+	mutex_lock(&buffer->mutex);
+	atomic_inc(&cpu_buffer->resize_disabled);
+	mutex_unlock(&buffer->mutex);
+
+	err = rb_alloc_meta_page(cpu_buffer);
+	if (err) {
+		atomic_dec(&cpu_buffer->resize_disabled);
+		goto unlock;
+	}
+
+	/* page_ids include the reader page while nr_pages does not */
+	page_ids = kzalloc(sizeof(*page_ids) * (cpu_buffer->nr_pages + 1),
+			   GFP_KERNEL);
+	if (!page_ids) {
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+		err = -ENOMEM;
+		goto unlock;
+	}
+
+	/*
+	 * Lock all other readers as we'll disable the splice and enable the
+	 * meta-page data_page_head bookkeeping.
+	 */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	rb_setup_ids_meta_page(cpu_buffer, page_ids);
+	atomic_inc(&cpu_buffer->mapped);
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+	up_write(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	down_write(&cpu_buffer->mapping_lock);
+
+	if (!atomic_read(&cpu_buffer->mapped)) {
+		err = -ENODEV;
+		goto unlock;
+	}
+
+	if (atomic_dec_and_test(&cpu_buffer->mapped)) {
+		unsigned long flags;
+
+		/*
+		 * Lock all readers to make sure none will attempt meta-page
+		 * bookkeeping while we free the resources
+		 */
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+		rb_free_page_ids(cpu_buffer);
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	}
+
+unlock:
+	up_write(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ *   +--------------+
+ *   |  meta page   |  pgoff=0
+ *   +--------------+
+ *   |  data page1  |  pgoff=1 page_ids=0
+ *   +--------------+
+ *   |  data page2  |  pgoff=2 page_ids=1
+ *          ...
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct page *page = NULL;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return NULL;
+
+	if (!cpu_buffer->page_ids) {
+		WARN_ON(1);
+		goto put;
+	}
+
+	if (pgoff == 0) {
+		page = virt_to_page(cpu_buffer->meta_page);
+	} else {
+		pgoff--;
+
+		if (pgoff > (cpu_buffer->nr_pages))
+			goto put;
+
+		page = virt_to_page(cpu_buffer->page_ids[pgoff]);
+	}
+put:
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return page;
+}
+
+int ring_buffer_get_reader_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct buffer_page *bpage, *reader;
+	unsigned long flags;
+	int err = 0;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return (int)PTR_ERR(cpu_buffer);
+
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	reader = cpu_buffer->reader_page;
+	reader->read = rb_page_size(reader);
+	bpage = rb_get_reader_page(cpu_buffer);
+	if (!bpage)
+		err = -ENODEV;
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return err;
+}
+
+int ring_buffer_update_meta_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return PTR_ERR(cpu_buffer);
+
+	rb_update_meta_page(cpu_buffer);
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
--
2.39.1.581.gbfd45094c4-goog