Re: [RFC][PATCH] ring-buffer: Have nested events still record running time stamp

From: Steven Rostedt
Date: Thu Jun 25 2020 - 22:36:23 EST


On Thu, 25 Jun 2020 15:35:02 -0400 (EDT)
Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxxxx> wrote:

> Those performance and reentrancy concerns are why I always stick to local_t
> (long), and never use a full 64-bit type for anything that has to
> do with concurrent store/load between execution contexts in lttng.

Although the adding of this patch slows the recording down by around 7
nanoseconds (average 55ns brought to 62ns), but I can wrap it so that
it stays local64_t for 64 bit and use this for 32 bit.

I added three primitives:

bool rb_time_read(rb_time_t *t, u64 *val)

It loads val with t if t wasn't being updated at the time. If it is, it
returns false and val is not touched, otherwise it returns true and val
has a real value (either before, or updated by an interrupt).

void rb_time_set(rb_time_t *t, u64 val)

This will update t with val, and wont return until it has done so. It
will overwrite any update that interrupted the process.

bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set);

This will attempt to do a cmpxchg on t, if it matches "expect" it will
write "set". If it gets interrupted, then it will detect that, and return
false. Otherwise it will return true if it fully changed the value.


This is all I need, and is rather trivial to implement. The trick is
the order of rb_time_t is always written from:

start_cnt
top
bottom
end_cnt

This means that end_cnt will never be greater than start_cnt. If we see
that it is, it means that the process was interrupted with another
update. That process that interrupted a write, would complete its write.

I don't plan on applying this, but if it turns out that local64_t is a
problem on 32 bit archs (and I hear complaints), I add this (and but
still use local64_t on 64 bit archs).

-- Steve

Index: linux-trace.git/kernel/trace/ring_buffer.c
===================================================================
--- linux-trace.git.orig/kernel/trace/ring_buffer.c
+++ linux-trace.git/kernel/trace/ring_buffer.c
@@ -446,6 +446,22 @@ enum {
RB_CTX_MAX
};

+struct rb_time_struct {
+ local_t start_cnt;
+ local_t top;
+ local_t bottom;
+ local_t end_cnt;
+};
+
+struct rb_time_read {
+ unsigned long start_cnt;
+ unsigned long top;
+ unsigned long bottom;
+ unsigned long end_cnt;
+};
+
+typedef struct rb_time_struct rb_time_t;
+
/*
* head_page == tail_page && head == tail then buffer is empty.
*/
@@ -482,8 +498,8 @@ struct ring_buffer_per_cpu {
unsigned long read;
unsigned long read_bytes;
unsigned long next_write;
- local64_t write_stamp;
- local64_t before_stamp;
+ rb_time_t write_stamp;
+ rb_time_t before_stamp;
u64 read_stamp;
/* ring buffer pages to update, > 0 to add, < 0 to remove */
long nr_pages_to_update;
@@ -526,6 +542,78 @@ struct ring_buffer_iter {
int missed_events;
};

+static bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+ struct rb_time_read r;
+ u64 val;
+
+ do {
+ r.start_cnt = local_read(&t->start_cnt);
+ r.top = local_read(&t->top);
+ r.bottom = local_read(&t->bottom);
+ r.end_cnt = local_read(&t->end_cnt);
+ } while (r.start_cnt < r.end_cnt);
+
+ if (r.start_cnt != r.end_cnt)
+ return false;
+
+ val = r.top;
+ val <<= 32;
+ val |= r.bottom;
+ *ret = val;
+ return true;
+}
+
+static inline void rb_time_read_set(struct rb_time_read *r, u64 val)
+{
+ r->top = (unsigned long)(val >> 32);
+ r->bottom = (unsigned long)(val & ((1ULL << 32) - 1));
+}
+
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+ struct rb_time_read r;
+
+ rb_time_read_set(&r, val);
+
+ do {
+ r.start_cnt = local_inc_return(&t->start_cnt);
+ local_set(&t->top, r.top);
+ local_set(&t->bottom, r.bottom);
+ local_set(&t->end_cnt, r.start_cnt);
+ } while (r.start_cnt != local_read(&t->start_cnt));
+}
+
+static bool rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
+{
+ unsigned long ret;
+
+ ret = local_cmpxchg(l, expect, set);
+ return ret == expect;
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+ struct rb_time_read e, s;
+
+ rb_time_read_set(&e, expect);
+ rb_time_read_set(&s, set);
+
+ e.start_cnt = local_read(&t->start_cnt);
+ e.end_cnt = local_read(&t->end_cnt);
+
+ s.start_cnt = e.start_cnt + 1;
+ s.end_cnt = e.start_cnt;
+
+ if (!rb_time_read_cmpxchg(&t->start_cnt, e.start_cnt, s.start_cnt))
+ return false;
+ if (!rb_time_read_cmpxchg(&t->top, e.top, s.top))
+ return false;
+ if (!rb_time_read_cmpxchg(&t->bottom, e.bottom, s.bottom))
+ return false;
+ return rb_time_read_cmpxchg(&t->end_cnt, e.end_cnt, s.end_cnt);
+}
+
/**
* ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
* @buffer: The ring_buffer to get the number of pages from
@@ -2543,7 +2631,8 @@ rb_try_to_discard(struct ring_buffer_per

delta = rb_time_delta(event);

- write_stamp = local64_read(&cpu_buffer->write_stamp);
+ if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
+ return 0;

/* Make sure the write stamp is read before testing the location */
barrier();
@@ -2552,11 +2641,10 @@ rb_try_to_discard(struct ring_buffer_per
unsigned long write_mask =
local_read(&bpage->write) & ~RB_WRITE_MASK;
unsigned long event_length = rb_event_length(event);
- u64 ret;

- ret = local64_cmpxchg(&cpu_buffer->write_stamp, write_stamp, write_stamp - delta);
/* Something came in, can't discard */
- if (ret != write_stamp)
+ if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
+ write_stamp, write_stamp - delta))
return 0;

/*
@@ -2887,11 +2975,13 @@ static noinline void
rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
struct rb_event_info *info)
{
+ u64 write_stamp;
+
WARN_ONCE(info->delta > (1ULL << 59),
KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
(unsigned long long)info->delta,
(unsigned long long)info->ts,
- (unsigned long long)local64_read(&cpu_buffer->write_stamp),
+ (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
sched_clock_stable() ? "" :
"If you just came from a suspend/resume,\n"
"please switch to the trace global clock:\n"
@@ -2909,14 +2999,16 @@ __rb_reserve_next(struct ring_buffer_per
unsigned long tail, write, w, next;
u64 delta, before, after;
bool abs = false;
+ bool a_ok;
+ bool b_ok;

/* Don't let the compiler play games with cpu_buffer->tail_page */
tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

/*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
barrier();
- before = local64_read(&cpu_buffer->before_stamp);
- after = local64_read(&cpu_buffer->write_stamp);
+ b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+ a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
barrier();
info->ts = rb_time_stamp(cpu_buffer->buffer);

@@ -2934,7 +3026,7 @@ __rb_reserve_next(struct ring_buffer_per
* If interrupting an event time update, we may need an absolute timestamp.
* Don't bother if this is the start of a new page (w == 0).
*/
- if (unlikely(before != after && w))
+ if (unlikely(!a_ok || !b_ok || (before != after && w)))
info->add_timestamp = RB_ADD_STAMP_FORCE;
/*
* If the time delta since the last event is too big to
@@ -2947,7 +3039,7 @@ __rb_reserve_next(struct ring_buffer_per
next = READ_ONCE(cpu_buffer->next_write);
WRITE_ONCE(cpu_buffer->next_write, w + info->length);

- /*B*/ local64_set(&cpu_buffer->before_stamp, info->ts);
+ /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);

/*C*/ write = local_add_return(info->length, &tail_page->write);

@@ -2960,21 +3052,23 @@ __rb_reserve_next(struct ring_buffer_per
if (unlikely(write > BUF_PAGE_SIZE)) {
if (tail != w) {
/* before and after may now different, fix it up*/
- before = local64_read(&cpu_buffer->before_stamp);
- after = local64_read(&cpu_buffer->write_stamp);
- if (before != after)
- (void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
+ b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+ a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+ if (a_ok && b_ok && before != after)
+ (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, before, after);
}
return rb_move_tail(cpu_buffer, tail, info);
}

if (likely(tail == w)) {
u64 save_before;
+ bool s_ok;

/* Nothing interrupted us between A and C */
- /*D*/ local64_set(&cpu_buffer->write_stamp, info->ts);
+ /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
barrier();
- /*E*/ save_before = local64_read(&cpu_buffer->before_stamp);
+ /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
+ RB_WARN_ON(cpu_buffer, !s_ok);
if (likely(info->add_timestamp != RB_ADD_STAMP_FORCE))
/* This did not interrupt any time update */
info->delta = info->ts - after;
@@ -2985,27 +3079,29 @@ __rb_reserve_next(struct ring_buffer_per
if (unlikely(info->ts != save_before)) {
/* SLOW PATH - Interrupted between C and E */

- after = local64_read(&cpu_buffer->write_stamp);
+ a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+ RB_WARN_ON(cpu_buffer, !a_ok);
+
/* Write stamp must only go forward */
if (save_before > after) {
/*
* We do not care about the result, only that
* it gets updated atomically.
*/
- (void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
+ (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
}
}
} else {
u64 ts;
/* SLOW PATH - Interrupted between A and C */
- after = local64_read(&cpu_buffer->write_stamp);
+ a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
ts = rb_time_stamp(cpu_buffer->buffer);
barrier();
/*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
after < ts) {
/* Nothing came after this event between C and E */
info->delta = ts - after;
- (void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
+ (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
info->ts = ts;
} else {
/*
@@ -4567,8 +4663,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu
cpu_buffer->read = 0;
cpu_buffer->read_bytes = 0;

- local64_set(&cpu_buffer->write_stamp, 0);
- local64_set(&cpu_buffer->before_stamp, 0);
+ rb_time_set(&cpu_buffer->write_stamp, 0);
+ rb_time_set(&cpu_buffer->before_stamp, 0);
cpu_buffer->next_write = 0;

cpu_buffer->lost_events = 0;