Low latency I/O scheduling

From: Corrado Zoccolo
Date: Mon Jun 08 2009 - 13:40:29 EST


Hi,
some time ago, the issue of the high latencies introduced by I/O
scheduling was raised on this mailing list.
I've done some study, and identified a way to improve latency on
workloads that have more than one random reader.

The problem:
CFQ partitions the requests into queues, so that requests coming from
different processes are in different queues.
Each queue gets its own timeslice, so the number of queues grows
linearly with the number of processes, and so does the worst-case I/O
latency: a newly submitted request may have to wait for the timeslice
of every other queue before it is served.
This works well when the processes do sequential I/O, because within a
timeslice the subsequent requests complete much faster. But for random
I/O we pay the extra latency without getting any bandwidth improvement
in return.

The solution:
Divide the requests into two classes. Sequential requests are kept in
per-process queues, while random requests are put in a shared,
sector-sorted data structure (e.g. an RB-tree), and the scheduler
alternates between serving the sequential queues and sweeping the
shared structure.
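
To make the classification side concrete, here is a toy user-space
sketch (not the attached patch; the real heuristic in fas also looks
at think times and mean seek distance, and everything here, including
the 1024-sector window, is illustrative only):

/* toy illustration of the sequential/random split (not kernel code) */
#include <stdbool.h>
#include <stdio.h>

struct toy_rq {
	unsigned long long sector;	/* start sector of the request */
	int pid;			/* submitting process */
};

/* last end-of-request position seen per pid (toy: just two pids) */
static unsigned long long last_end[2];

/* crude sequentiality test: continues close to where the pid left off */
static bool is_sequential(const struct toy_rq *rq)
{
	unsigned long long last = last_end[rq->pid];
	return rq->sector >= last && rq->sector - last < 1024;
}

int main(void)
{
	const struct toy_rq incoming[] = {
		{ 0, 0 }, { 8, 0 }, { 16, 0 },			/* pid 0: sequential */
		{ 90000, 1 }, { 1234, 1 }, { 777777, 1 },	/* pid 1: random */
	};
	unsigned int i;

	for (i = 0; i < sizeof(incoming) / sizeof(incoming[0]); i++) {
		const struct toy_rq *rq = &incoming[i];

		printf("sector %8llu pid %d -> %s\n", rq->sector, rq->pid,
		       is_sequential(rq) ? "per-process FIFO"
					 : "shared sorted pool");

		/* remember where this pid's request ends (toy: 8 sectors) */
		last_end[rq->pid] = rq->sector + 8;
	}
	return 0;
}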

A proof-of-concept implementation of the idea, based on AS (called
'fas' = Fair Anticipatory Scheduling in the following), is provided in
the attached file. I ran some tests (the fio configuration to
reproduce them is attached as well), and got the following
improvements:

2 parallel readers (both random) test:
* fas has lowest max and avg latencies, even lower than noop.

4 random readers/1 seq writer test:
* max latency is lowest for fas (218ms, compared with CFQ's 617ms).
CFQ has slightly lower avg latency, probably due to a different
bandwidth allocation between the readers and the writer.

32 random readers/1 seq writer test:
* fas has the lowest max and avg latency: 1.314s and 304ms
respectively, compared with CFQ's 7.63s and 391ms (while still
providing higher bandwidth than CFQ to both the readers and the
writer).

When fewer than 2 random readers are present, the heuristic doesn't
matter, but the tests still show slight performance differences from
CFQ, mainly due to different time-share allocation. One relevant
difference is that the time allocated to each process performing
sequential I/O is inversely proportional to the number of those
processes (with a guaranteed minimum), and the balance between
sequential and random I/O is struck by varying the total time allowed
to the sequential processes; see the formula below.
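
For reference, this is the per-process slice that the attached patch
computes (extracted from as_move_to_dispatch() below; values are in
jiffies, with time_quantum defaulting to ~100ms and antic_expire to
~6ms):

/* time slice handed to one process of the sequential sync workload */
static unsigned long seq_slice(unsigned long time_quantum,
			       unsigned long antic_expire,
			       unsigned int seq_sync_count)
{
	/* share shrinks as the number of sequential processes grows */
	unsigned long share = 4 * time_quantum / (1 + seq_sync_count / 2);
	unsigned long min_slice = 4 * antic_expire; /* guaranteed minimum */

	return share > min_slice ? share : min_slice;
}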


All the test result data follows (all tables are sorted by bandwidth):

Sequential read test:
BW (KiB/s), max latency (ms), avg latency (ms)
noop: 32457, 18, 0.124
deadline: 31746, 21, 0.127
as: 31014, 18, 0.13
fas: 30229, 18, 0.134
cfq: 30138, 18, 0.134

Sequential write test:
BW (KiB/s), max latency (ms), avg latency (ms)
as: 28082, 473, 0.132
noop: 27851, 283, 0.134
fas: 27417, 309, 0.136
deadline: 26971, 1924, 0.138
cfq: 26626, 414, 0.132

2 parallel readers (both sequential) test:
Aggr BW (KiB/s), max latency (ms), avg latency (ms)
fas: 31924 , 311, 0.255
cfq: 31916 , 150, 0.255
as: 31863 , 260, 0.255
noop: 20690, 64, 0.394
deadline: 20196, 64, 0.404

2 parallel readers (1 seq, 1 random):
Random BW (KiB/s), Seq BW (KiB/s), max latency (ms), avg latency (ms)
fas: 195, 16681, 186, 10
cfq: 191, 16129, 152, 10
as: 95, 24611, 269, 21
deadline: 75, 13581, 101, 27
noop: 72, 13240, 102, 28

2 parallel writers (both sequential) test:
Aggr BW (KiB/s), max latency (ms), avg latency (ms)
noop: 25979, 573, 0.286
fas: 25972, 1264, 0.266
as: 25131, 1025, 0.2745
deadline: 25086, 899, 0.276
cfq: 23469, 1066, 0.311

2 parallel readers (both random) test:
Aggr BW (KiB/s), max latency (ms), avg latency (ms)
as: 477, 173, 17
fas: 458, 39, 17
deadline: 456, 48, 17
cfq: 451, 136, 18
noop: 450, 44, 18

4 random readers/1 seq writer test:
READ: Aggr BW (KiB/s), max latency (ms), avg latency (ms)
cfq: 425, 617, 30
fas: 402, 218, 32
deadline: 389, 235, 33
as: 361, 340, 36
noop: 96, 400, 135
WRITE: BW (KiB/s), max latency (ms), avg latency (ms)
noop: 21920, 354, 0.037
as: 10014, 3493, 0.0814
deadline: 7665, 5383, 0.1064
fas: 5669, 10599, 0.144
cfq: 4009, 15076, 0.2038

32 random readers/1 seq writer test:
READ: Aggr BW (KiB/s), max latency (ms), avg latency (ms)
fas: 414, 1314, 304
deadline: 363, 1704, 345
cfq: 326, 7630, 391
as: 319, 1919, 395
noop: 66, 2132, 1526
WRITE: BW (KiB/s), max latency (ms), avg latency (ms)
noop: 19839, 784, 0.0053
as: 18302, 8534, 0.0059
fas: 4416, 23507, 0.025
cfq: 3792, 16797, 0.026
deadline: 2484, 5629, 0.042

--
__________________________________________________________________________

dott. Corrado Zoccolo mailto:czoccolo@xxxxxxxxx
PhD - Department of Computer Science - University of Pisa, Italy
--------------------------------------------------------------------------
/* -*- mode:C; tab-width:8; c-basic-offset:8 -*-
* Anticipatory & deadline i/o scheduler.
*
* Copyright (C) 2002 Jens Axboe <axboe@xxxxxxxxx>
* Nick Piggin <nickpiggin@xxxxxxxxxxxx>
* Copyright (C) 2009 Corrado Zoccolo <czoccolo@xxxxxxxxx>
*
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/elevator.h>
#include <linux/bio.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/compiler.h>
#include <linux/rbtree.h>
#include <linux/interrupt.h>

/*
* See Documentation/block/as-iosched.txt
*/

#define debug_printk(X...) ((void)0)
/*
* max time we may wait to anticipate (default around 6ms)
*/
static const unsigned long default_antic_expire = (HZ / 150) ? HZ / 150 : 1;
/*
* Base of all time allocation computations.
* Should roughly be 1/3 of the maximum acceptable latency.
* Default 100ms
*/
static const unsigned long default_time_quantum = (HZ / 10) ? HZ / 10 : 1;
/*
* Number of times that other workloads can be scheduled before async
*/
static const unsigned long default_max_async_starved = 2;

/*
* Keep track of up to 20ms thinktimes. We can go as big as we like here,
* however huge values tend to interfere and not decay fast enough. A program
* might be in a non-io phase of operation. Waiting on user input for example,
* or doing a lengthy computation. A small penalty can be justified there, and
* will still catch out those processes that constantly have large thinktimes.
*/
#define MAX_THINKTIME (HZ/50UL)

/* Bits in as_io_context.state */
enum as_io_states {
AS_TASK_RUNNING=0, /* Process has not exited */
AS_TASK_IOSTARTED, /* Process has started some IO */
AS_TASK_IORUNNING, /* Process has completed some IO */
};

enum anticipation_status {
ANTIC_OFF=0, /* Not anticipating (normal operation) */
ANTIC_WAIT_REQ, /* The last read has not yet completed */
ANTIC_WAIT_NEXT, /* Currently anticipating a request vs
last read (which has completed) */
ANTIC_FINISHED, /* Anticipating but have found a candidate
* or timed out */
};

struct as_data {
/*
* run time data
*/

struct request_queue *q; /* the "owner" queue */

/*
* per_workload state
*/

struct async_wl {
struct rb_root sort_list;
struct list_head fifo_list;
unsigned count;
} async_workload;

struct random_sync_wl {
struct rb_root phases[2];
struct list_head fifo_list;
unsigned current_phase;
unsigned count;
} random_sync_workload[2];

struct seq_sync_wl {
struct list_head fifo_list;
struct list_head fast_path;
unsigned count;
unsigned long expire_current; /* jiffies + f(time_quantum) */
} seq_sync_workload;

struct request *next_rq; /* for async & random sync wl */
sector_t active_sector; /* for close_req computation */

/* phase data */
unsigned long workload_expires;
unsigned int current_workload : 2;
unsigned int rand_sync_ddir : 1;
unsigned int async_starved : 4;

/*as state*/
unsigned int ioc_finished : 1; /* IO associated with io_context is finished */
unsigned int reads_delayed : 8;

/* same as Anticipatory I/O scheduler */
enum anticipation_status antic_status;
unsigned long antic_start; /* jiffies: when it started */
struct timer_list antic_timer; /* anticipatory scheduling timer */
struct work_struct antic_work; /* Deferred unplugging */
struct io_context *io_context; /* Identify the expected process */
int nr_dispatched;

/*
* settings that change how the i/o scheduler behaves
*/
unsigned long antic_expire;
unsigned long time_quantum;
unsigned long max_async_starved;

/*
* statistics collection
*/
unsigned stats[2][5];
};

/*
* per-request data.
*/
enum arq_state {
AS_RQ_NEW=0, /* New - not referenced and not on any lists */
AS_RQ_QUEUED, /* In the request queue. It belongs to the
scheduler */
AS_RQ_DISPATCHED, /* On the dispatch list. It belongs to the
driver now */
AS_RQ_PRESCHED, /* Debug poisoning for requests being used */
AS_RQ_REMOVED,
AS_RQ_MERGED,
AS_RQ_POSTSCHED, /* when they shouldn't be */
};

enum arq_workload {
AS_RQ_ASYNC=0,
AS_RQ_RANDSYNC,
AS_RQ_SEQSYNC
};
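
/*
 * Per-request bookkeeping is packed into the two elevator private
 * fields: elevator_private holds the io_context pointer, while
 * elevator_private2 packs the arq_state into bits 0-2 and a workload
 * tag into the remaining bits. The workload tag is finer grained than
 * enum arq_workload: 0 = async, 1-2 = random sync (one value per phase
 * tree), 3 = sequential sync, 4 = sequential sync fast path (see
 * as_count/as_fifo/as_rb_root and as_add_request()).
 */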

static inline struct io_context * RQ_IOC(struct request *rq) {
return (struct io_context *)rq->elevator_private;
}
static inline enum arq_state RQ_STATE(struct request *rq) {
return (enum arq_state)(((unsigned long)rq->elevator_private2) & 7);
}

static inline unsigned RQ_WORKLOAD(struct request *rq) {
return ((unsigned long)rq->elevator_private2) >> 3;
}

static inline void RQ_SET_STATE_WL(struct request *rq, enum arq_state state, unsigned workload) {
rq->elevator_private2 = (void *)(unsigned long)((state & 7) | (workload << 3));
}

#define RQ_SET_STATE(rq, state) RQ_SET_STATE_WL(rq, state, RQ_WORKLOAD(rq))
#define RQ_SET_WL(rq, workload) RQ_SET_STATE_WL(rq, RQ_STATE(rq), workload)

static DEFINE_PER_CPU(unsigned long, ioc_count);
static struct completion *ioc_gone;
static DEFINE_SPINLOCK(ioc_gone_lock);

static void as_move_to_dispatch(struct as_data *ad, struct request *rq);
static void as_antic_stop(struct as_data *ad);

/*
* IO Context helper functions
* We share the same ioc pointer slot that is used by AS,
* so we have to fill the information consistently
*/

/* Called to deallocate the as_io_context */
static void free_as_io_context(struct as_io_context *aic)
{
kfree(aic);
elv_ioc_count_dec(ioc_count);
if (ioc_gone) {
/*
* AS scheduler is exiting, grab exit lock and check
* the pending io context count. If it hits zero,
* complete ioc_gone and set it back to NULL.
*/
spin_lock(&ioc_gone_lock);
if (ioc_gone && !elv_ioc_count_read(ioc_count)) {
complete(ioc_gone);
ioc_gone = NULL;
}
spin_unlock(&ioc_gone_lock);
}
}

static void as_trim(struct io_context *ioc)
{
spin_lock_irq(&ioc->lock);
if (ioc->aic)
free_as_io_context(ioc->aic);
ioc->aic = NULL;
spin_unlock_irq(&ioc->lock);
}

/* Called when the task exits */
static void exit_as_io_context(struct as_io_context *aic)
{
WARN_ON(!test_bit(AS_TASK_RUNNING, &aic->state));
clear_bit(AS_TASK_RUNNING, &aic->state);
}

static struct as_io_context *alloc_as_io_context(void)
{
struct as_io_context *ret;

ret = kmalloc(sizeof(*ret), GFP_ATOMIC);
if (ret) {
ret->dtor = free_as_io_context;
ret->exit = exit_as_io_context;
ret->state = 1 << AS_TASK_RUNNING;
atomic_set(&ret->nr_queued, 0);
atomic_set(&ret->nr_dispatched, 0);
spin_lock_init(&ret->lock);
ret->ttime_total = 0;
ret->ttime_samples = 0;
ret->ttime_mean = 0;
ret->seek_total = 0;
ret->seek_samples = 0;
ret->seek_mean = 0;
elv_ioc_count_inc(ioc_count);
}

return ret;
}

/*
* If the current task has no AS IO context then create one and initialise it.
* Then take a ref on the task's io context and return it.
*/
static struct io_context *as_get_io_context(int node)
{
struct io_context *ioc = get_io_context(GFP_ATOMIC, node);
if (ioc && !ioc->aic) {
ioc->aic = alloc_as_io_context();
if (!ioc->aic) {
put_io_context(ioc);
ioc = NULL;
}
}
return ioc;
}

static void as_put_io_context(struct request *rq)
{
struct as_io_context *aic;

if (unlikely(!RQ_IOC(rq)))
return;

aic = RQ_IOC(rq)->aic;

if (rq_is_sync(rq) && aic) {
unsigned long flags;

spin_lock_irqsave(&aic->lock, flags);
set_bit(AS_TASK_IORUNNING, &aic->state);
aic->last_end_request = jiffies;
spin_unlock_irqrestore(&aic->lock, flags);
}

put_io_context(RQ_IOC(rq));
}

/*
* Request to workload data structure mapping
*/
static inline unsigned *
as_count(struct as_data *ad, struct request *rq)
{
const int ddir = rq_data_dir(rq);
const int rq_wl = RQ_WORKLOAD(rq);
unsigned * counts[] = {
&ad->async_workload.count,
&ad->random_sync_workload[ddir].count,
&ad->random_sync_workload[ddir].count,
&ad->seq_sync_workload.count,
&ad->seq_sync_workload.count };
return counts[rq_wl];
}

static inline struct list_head *
as_fifo(struct as_data *ad, struct request *rq)
{
const int ddir = rq_data_dir(rq);
const int rq_wl = RQ_WORKLOAD(rq);
struct list_head *fifos[] = {
&ad->async_workload.fifo_list,
&ad->random_sync_workload[ddir].fifo_list,
&ad->random_sync_workload[ddir].fifo_list,
&ad->seq_sync_workload.fifo_list,
&ad->seq_sync_workload.fast_path };
return fifos[rq_wl];
}

static inline struct rb_root *
as_rb_root(struct as_data *ad, struct request *rq)
{
const int ddir = rq_data_dir(rq);
const int rq_wl = RQ_WORKLOAD(rq);
struct rb_root * roots[] = {
&ad->async_workload.sort_list,
&ad->random_sync_workload[ddir].phases[0],
&ad->random_sync_workload[ddir].phases[1],
NULL, NULL };
return roots[rq_wl];
}

/*
* rb tree support functions
*/

static inline struct request *
as_next_request(struct request *rq)
{
struct rb_node *node = rb_next(&rq->rb_node);

if (node)
return rb_entry_rq(node);

return NULL;
}

static void
as_add_rq_rb(struct as_data *ad, struct request *rq)
{
struct request *__alias;
struct rb_root *root = as_rb_root(ad, rq);
if (root)
while (unlikely(__alias = elv_rb_add(root, rq))) {
as_move_to_dispatch(ad, __alias);
as_antic_stop(ad);
}

++*as_count(ad, rq);
}

static void
as_del_rq_rb(struct as_data *ad, struct request *rq)
{
struct rb_root *root = as_rb_root(ad, rq);
if (ad->next_rq == rq)
ad->next_rq = root ? as_next_request(rq) : NULL;

if (root) elv_rb_del(root, rq);

--*as_count(ad, rq);
}

/*
* anticipatory scheduling functions follow
*/

/*
* as_antic_expired tells us when we have anticipated too long.
*/
static inline int as_antic_expired(struct as_data *ad)
{
return time_after(jiffies, ad->antic_start + ad->antic_expire);
}

/*
* as_antic_waitnext starts anticipating that a nice request will soon be
* submitted. See also as_antic_waitreq
*/
static void as_antic_waitnext(struct as_data *ad)
{
unsigned long timeout;

BUG_ON(ad->antic_status != ANTIC_OFF
&& ad->antic_status != ANTIC_WAIT_REQ);

timeout = ad->antic_start + ad->antic_expire;
mod_timer(&ad->antic_timer, timeout);
ad->antic_status = ANTIC_WAIT_NEXT;
}

/*
* as_antic_waitreq starts anticipating. We don't start timing the anticipation
* until the request that we're anticipating on has finished. This means we
* are timing from when the candidate process wakes up hopefully.
*/
static void as_antic_waitreq(struct as_data *ad)
{
BUG_ON(ad->antic_status == ANTIC_FINISHED);
if (ad->antic_status == ANTIC_OFF) {
if (!ad->io_context || ad->ioc_finished)
as_antic_waitnext(ad);
else
ad->antic_status = ANTIC_WAIT_REQ;
}
}

/*
* This is called directly by the functions in this file to stop anticipation.
* We kill the timer and schedule a call to the request_fn asap.
*/
static void as_antic_stop(struct as_data *ad)
{
int status = ad->antic_status;

if (status == ANTIC_WAIT_REQ || status == ANTIC_WAIT_NEXT) {
if (status == ANTIC_WAIT_NEXT)
del_timer(&ad->antic_timer);
ad->antic_status = ANTIC_FINISHED;
/* see as_work_handler */
kblockd_schedule_work(ad->q, &ad->antic_work);
}
}

/*
* as_antic_timeout is the timer function set by as_antic_waitnext.
*/
static void as_antic_timeout(unsigned long data)
{
struct request_queue *q = (struct request_queue *)data;
struct as_data *ad = q->elevator->elevator_data;
unsigned long flags;

spin_lock_irqsave(q->queue_lock, flags);
if (ad->antic_status == ANTIC_WAIT_REQ
|| ad->antic_status == ANTIC_WAIT_NEXT) {
struct as_io_context *aic;
spin_lock(&ad->io_context->lock);
aic = ad->io_context->aic;

ad->antic_status = ANTIC_FINISHED;
kblockd_schedule_work(q, &ad->antic_work);

spin_unlock(&ad->io_context->lock);
}
spin_unlock_irqrestore(q->queue_lock, flags);
}

static void as_update_thinktime(struct as_data *ad, struct as_io_context *aic,
unsigned long ttime)
{
/* fixed point: 1.0 == 1<<8 */
aic->ttime_samples = (7*aic->ttime_samples + 256) / 8;
aic->ttime_total = (7*aic->ttime_total + 256*ttime) / 8;
aic->ttime_mean = (aic->ttime_total + 128) / aic->ttime_samples;
}

static void as_update_seekdist(struct as_data *ad, struct as_io_context *aic,
sector_t sdist)
{
u64 total;

/*
* Don't allow the seek distance to get too large from the
* odd fragment, pagein, etc
*/
if (aic->seek_samples <= 60) /* second&third seek */
sdist = min(sdist, (aic->seek_mean * 4) + 2*1024*1024);
else
sdist = min(sdist, (aic->seek_mean * 4) + 2*1024*64);

aic->seek_samples = (7*aic->seek_samples + 256) / 8;
aic->seek_total = (7*aic->seek_total + (u64)256*sdist) / 8;
total = aic->seek_total + (aic->seek_samples/2);
do_div(total, aic->seek_samples);
aic->seek_mean = (sector_t)total;
}

/*
* as_update_iohist keeps a decaying histogram of IO thinktimes, and
* updates @aic->ttime_mean based on that. It is called when a new
* request is queued.
*/
static void as_update_iohist(struct as_data *ad, struct as_io_context *aic,
struct request *rq)
{
unsigned long thinktime = 0;
sector_t seek_dist;

if (aic == NULL)
return;

if (rq_is_sync(rq) == BLK_RW_SYNC) {
unsigned long in_flight = atomic_read(&aic->nr_queued)
+ atomic_read(&aic->nr_dispatched);
spin_lock(&aic->lock);
if (test_bit(AS_TASK_IORUNNING, &aic->state) ||
test_bit(AS_TASK_IOSTARTED, &aic->state)) {
/* Calculate read -> read thinktime */
if (test_bit(AS_TASK_IORUNNING, &aic->state)
&& in_flight == 0) {
thinktime = jiffies - aic->last_end_request;
thinktime = min(thinktime, MAX_THINKTIME-1);
}
as_update_thinktime(ad, aic, thinktime);

/* Calculate read -> read seek distance */
if (aic->last_request_pos < rq->sector)
seek_dist = rq->sector - aic->last_request_pos;
else
seek_dist = aic->last_request_pos - rq->sector;
as_update_seekdist(ad, aic, seek_dist);
}
aic->last_request_pos = rq->sector + rq->nr_sectors;
set_bit(AS_TASK_IOSTARTED, &aic->state);
spin_unlock(&aic->lock);
}
}

/*
* as_close_req decides if one request is considered "close" to the
* previous one issued.
*/
static int as_close_req(struct as_data *ad, struct as_io_context *aic,
struct request *rq)
{
unsigned long delay; /* jiffies */
sector_t last = ad->active_sector;
sector_t next = rq->sector;
sector_t delta; /* acceptable close offset (in sectors) */
sector_t s;

if (ad->antic_status == ANTIC_OFF || !ad->ioc_finished)
delay = 0;
else
delay = jiffies - ad->antic_start;

if (delay == 0)
delta = 8192;
else if (delay <= (20 * HZ / 1000) && delay <= ad->antic_expire)
delta = 8192 << delay;
else
return 1;

if ((last <= next + (delta>>1)) && (next <= last + delta))
return 1;

if (last < next)
s = next - last;
else
s = last - next;

if (aic->seek_samples == 0) {
return 1;
} else {
if (aic->seek_mean > s) {
/* this request is better than what we're expecting */
return 1;
}
}

return 0;
}

/*
* as_can_break_anticipation returns true if we have been anticipating this
* request, or anticipation expired (including switching to a different workload).
*
* It also returns true if the process against which we are anticipating
* submits a write - that's presumably an fsync, O_SYNC write, etc. We want to
* dispatch it ASAP, because we know that application will not be submitting
* any new reads.
*
* If the task which has submitted the request has exited, break anticipation.
*
* If this task has queued some other IO, do not enter anticipation.
*/
static int as_can_break_anticipation(struct as_data *ad, struct request *rq)
{
struct io_context *ioc;
struct as_io_context *aic;

ioc = ad->io_context;
BUG_ON(!ioc);
spin_lock(&ioc->lock);

if (rq && ioc == RQ_IOC(rq)) {
/* request from same process */
spin_unlock(&ioc->lock);
return 1;
}

if (rq && ad->current_workload == AS_RQ_RANDSYNC &&
(RQ_WORKLOAD(rq) == 1 || RQ_WORKLOAD(rq) == 2)) {
/* random sync request, matching current workload, we break anticipation */
spin_unlock(&ioc->lock);
return 1;
}

if (ad->ioc_finished && as_antic_expired(ad)) {
/*
* In this situation status should really be FINISHED,
* however the timer hasn't had the chance to run yet.
*/
spin_unlock(&ioc->lock);
return 1;
}

if (rq && ioc != RQ_IOC(rq) && ad->current_workload == AS_RQ_SEQSYNC &&
time_after(jiffies, ad->seq_sync_workload.expire_current)) {
/* request from different process, but former process exhausted its time quantum */
spin_unlock(&ioc->lock);
return 1;
}

aic = ioc->aic;
if (!aic) {
spin_unlock(&ioc->lock);
return 0;
}

if (atomic_read(&aic->nr_queued) > 0) {
/* process has more requests queued */
spin_unlock(&ioc->lock);
return 1;
}

if (atomic_read(&aic->nr_dispatched) > 0) {
/* process has more requests dispatched */
spin_unlock(&ioc->lock);
return 1;
}

if (rq && rq_is_sync(rq) && as_close_req(ad, aic, rq)) {
/*
* Found a close request that is not one of ours.
*
* This makes close requests from another process update
* our IO history. Is generally useful when there are
* two or more cooperating processes working in the same
* area.
*/

as_update_iohist(ad, aic, rq);
spin_unlock(&ioc->lock);
return 1;
}

if (!test_bit(AS_TASK_RUNNING, &aic->state)) {
/* process anticipated on has exited */
spin_unlock(&ioc->lock);
return 1;
}

if (aic->ttime_samples == 0) {
spin_unlock(&ioc->lock);
return 1;
} else if (aic->ttime_mean > ad->antic_expire) {
/* the process thinks too much between requests */
spin_unlock(&ioc->lock);
return 1;
}
spin_unlock(&ioc->lock);
return 0;
}

/*
* as_can_anticipate indicates whether we should either run rq
* or keep anticipating a better request.
*/
static int as_can_anticipate(struct as_data *ad, struct request *rq)
{
if (!ad->io_context)
/*
* Last request submitted was a write
*/
return 0;

if (ad->antic_status == ANTIC_FINISHED)
/*
* Don't restart if we have just finished. Run the next request
*/
return 0;

if (as_can_break_anticipation(ad, rq))
/*
* This request is a good candidate. Don't keep anticipating,
* run it.
*/
return 0;

/*
* OK from here, we haven't finished, and don't have a decent request!
* Status is either ANTIC_OFF so start waiting,
* ANTIC_WAIT_REQ so continue waiting for request to finish
* or ANTIC_WAIT_NEXT so continue waiting for an acceptable request.
*/

return 1;
}

/*
* as_update_rq must be called whenever a request (rq) is added to
* the sort_list. This function keeps caches up to date, and checks if the
* request might be one we are "anticipating"
*/
static void as_update_rq(struct as_data *ad, struct request *rq)
{
/*
* have we been anticipating this request?
* or does it come from the same process as the one we are anticipating
* for?
*/
if (ad->antic_status == ANTIC_WAIT_REQ
|| ad->antic_status == ANTIC_WAIT_NEXT) {
if (as_can_break_anticipation(ad, rq))
as_antic_stop(ad);
}
}

/*
* as_completed_request is to be called when a request has completed and
* returned something to the requesting process, be it an error or data.
*/
static void as_completed_request(struct request_queue *q, struct request *rq)
{
struct as_data *ad = q->elevator->elevator_data;

WARN_ON(!list_empty(&rq->queuelist));

if (RQ_STATE(rq) != AS_RQ_REMOVED) {
WARN(1, "rq->state %d\n", RQ_STATE(rq));
goto out;
}

WARN_ON(ad->nr_dispatched == 0);
ad->nr_dispatched--;

/*
* Start counting the batch from when a request of that direction is
* actually serviced. This should help devices with big TCQ windows
* and writeback caches
*/
if (ad->io_context == RQ_IOC(rq) && ad->io_context) {
ad->antic_start = jiffies;
ad->ioc_finished = 1;
if (ad->antic_status == ANTIC_WAIT_REQ) {
/*
* We were waiting on this request, now anticipate
* the next one
*/
as_antic_waitnext(ad);
}
}

as_put_io_context(rq);
out:
RQ_SET_STATE(rq, AS_RQ_POSTSCHED);
}

/*
* as_remove_queued_request removes a request from the pre dispatch queue
* without updating refcounts. It is expected the caller will drop the
* reference unless it replaces the request at somepart of the elevator
* (ie. the dispatch queue)
*/
static void as_remove_queued_request(struct request_queue *q,
struct request *rq)
{
struct as_data *ad = q->elevator->elevator_data;
struct io_context *ioc;

WARN_ON(RQ_STATE(rq) != AS_RQ_QUEUED);

ioc = RQ_IOC(rq);
if (ioc && ioc->aic) {
BUG_ON(!atomic_read(&ioc->aic->nr_queued));
atomic_dec(&ioc->aic->nr_queued);
}

rq_fifo_clear(rq);
as_del_rq_rb(ad, rq);
}

/*
* move an entry to dispatch queue
*/
static void as_move_to_dispatch(struct as_data *ad, struct request *rq)
{
as_antic_stop(ad);
ad->antic_status = ANTIC_OFF;
ad->active_sector = rq->sector + rq->nr_sectors;

if (RQ_WORKLOAD(rq) > 0) {
struct io_context *ioc = RQ_IOC(rq);

/* logic to determine if we are delaying read requests too much:
* if so, we penalize async writes.
*/
if (!time_before(jiffies, rq_fifo_time(rq) + 2*ad->time_quantum) && rq_data_dir(rq)==READ) {
ad->reads_delayed = max( (int)((jiffies - rq_fifo_time(rq)) / (2*ad->time_quantum)), (int)ad->reads_delayed);
debug_printk(KERN_INFO "late request (delay = %d, queues: [%d,%d,%d]): rq=%p ioc=%p sync=%d wl=%d\n",
(int)(jiffies - (rq_fifo_time(rq)+2*ad->time_quantum)),
ad->async_workload.count,
ad->random_sync_workload[0].count + ad->random_sync_workload[1].count,
ad->seq_sync_workload.count,
rq, ioc, rq_is_sync(rq), RQ_WORKLOAD(rq));
}

/* In case we have to anticipate after this */
if (ioc != ad->io_context) {
copy_io_context(&ad->io_context, &ioc);
if (RQ_WORKLOAD(rq) >= 3) {
/*
* This process' time slice among sequential processes
*/
ad->seq_sync_workload.expire_current = jiffies +
max(4*ad->antic_expire,
4 * ad->time_quantum / (1 + ad->seq_sync_workload.count/2));
}
}
} else {
put_io_context(ad->io_context);
ad->io_context = NULL;
}
ad->ioc_finished = 0;

ad->next_rq = (!RB_EMPTY_NODE(&rq->rb_node)) ?
as_next_request(rq) : NULL;

/*
* take it off the sort and fifo list, add to dispatch queue
*/
as_remove_queued_request(ad->q, rq);
WARN_ON(RQ_STATE(rq) != AS_RQ_QUEUED);

elv_dispatch_add_tail(ad->q, rq);

RQ_SET_STATE(rq, AS_RQ_DISPATCHED);
if (RQ_IOC(rq) && RQ_IOC(rq)->aic)
atomic_inc(&RQ_IOC(rq)->aic->nr_dispatched);
ad->nr_dispatched++;
}

typedef int dispatcher_fun(struct request_queue *, int);
typedef dispatcher_fun * dispatcher_fun_ptr;
static dispatcher_fun as_dispatch_async, as_dispatch_random_sync, as_dispatch_seq_sync;

/*
* as_dispatch_request selects the best request according to
* read/write expire, batch expire, etc, and moves it to the dispatch
* queue. Returns 1 if a request was found, 0 otherwise.
*/
static int as_dispatch_request(struct request_queue *q, int force)
{
struct as_data *ad = q->elevator->elevator_data;

static dispatcher_fun_ptr const dispatch_workloads[] = {
as_dispatch_async,
as_dispatch_random_sync,
as_dispatch_seq_sync
};

/* Current workload not expired, with more data already scheduled */
if (ad->next_rq && time_before(jiffies, ad->workload_expires))
return dispatch_workloads[ad->current_workload](q,force);

/* honor anticipation */
if (!force && (ad->antic_status == ANTIC_WAIT_REQ
|| ad->antic_status == ANTIC_WAIT_NEXT))
return 0;

{
int pending[] = {
ad->async_workload.count,
ad->random_sync_workload[0].count + ad->random_sync_workload[1].count,
ad->seq_sync_workload.count
};

int sync_pending = pending[AS_RQ_RANDSYNC] + pending[AS_RQ_SEQSYNC];
int workloads = !!pending[AS_RQ_RANDSYNC] + !!pending[AS_RQ_SEQSYNC] + !!pending[AS_RQ_ASYNC];
unsigned long adjusted_time_quantum = ad->time_quantum;

if (!workloads) {
ad->reads_delayed = 0;
return 0;
}

/* just finished workload still counts for time share computations */
workloads += !pending[ad->current_workload];

/* Current workload not expired, with more data */
if (time_before(jiffies, ad->workload_expires) &&
pending[ad->current_workload])
return dispatch_workloads[ad->current_workload](q,force);

/* Current workload is (seq or random) sync reads ... */
if (ad->current_workload != AS_RQ_ASYNC) {
/* ... and the current sweep/fast_path is not yet finished, so we give it some more time */
if (ad->next_rq && rq_data_dir(ad->next_rq) == READ &&
time_before(jiffies, ad->workload_expires + ad->time_quantum/workloads))
return dispatch_workloads[ad->current_workload](q,force);

/* ... not expired, no more data, current process has nothing pending, can we anticipate? */
if (time_before(jiffies, ad->workload_expires) && !force &&
(ad->io_context && ad->io_context->aic && !atomic_read(&ad->io_context->aic->nr_queued)) &&
as_can_anticipate(ad, NULL)) {
debug_printk(KERN_INFO "as_dispatch_request is anticipating: [%d,%d,%d] cur=%d next=%p starv=%d\n",
pending[0], pending[1], pending[2],
ad->current_workload, ad->next_rq, ad->async_starved);
ad->reads_delayed = 0;
/*
* Anticipate other requests for current workload
*/
as_antic_waitreq(ad);
return 0;
}
}

debug_printk(KERN_INFO "as_dispatch_request [%d,%d,%d] cur=%d next=%p starv=%d\n",
pending[0], pending[1], pending[2],
ad->current_workload, ad->next_rq, ad->async_starved);

/* default expiration of workload, can be overridden below */
adjusted_time_quantum = 3 * ad->time_quantum / workloads;
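
/*
 * Workload switch policy: the async workload runs either when no sync
 * requests are pending or after it has been skipped max_async_starved
 * times (a threshold that is scaled up when reads have been delayed),
 * and its slice is cut to 1/3 and further halved per unit of read
 * delay. Otherwise we switch between the random and sequential sync
 * workloads, giving the sequential one a share roughly proportional to
 * its fraction of the pending sync requests.
 */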

if (ad->current_workload != AS_RQ_ASYNC && pending[AS_RQ_ASYNC] &&
(!sync_pending || ad->async_starved++ >= ad->max_async_starved * (1+ad->reads_delayed))) {
ad->next_rq = NULL;
ad->current_workload = AS_RQ_ASYNC;
ad->async_starved = 0;
adjusted_time_quantum /= 3;
adjusted_time_quantum >>= ad->reads_delayed;
} else if (ad->current_workload != AS_RQ_RANDSYNC && pending[AS_RQ_RANDSYNC]) {
ad->next_rq = NULL;
ad->current_workload = AS_RQ_RANDSYNC;
} else if (ad->current_workload != AS_RQ_SEQSYNC && pending[AS_RQ_SEQSYNC]) {
ad->next_rq = NULL;
ad->current_workload = AS_RQ_SEQSYNC;
adjusted_time_quantum /= (sync_pending/pending[AS_RQ_SEQSYNC]);
}

ad->workload_expires = jiffies + max(4*ad->antic_expire, adjusted_time_quantum);
}
return dispatch_workloads[ad->current_workload](q,force);
}

static int as_dispatch_async(struct request_queue *q, int force)
{
struct as_data *ad = q->elevator->elevator_data;
struct request *rq = ad->next_rq;

if (!rq && !list_empty(&ad->async_workload.fifo_list)) {
rq = rq_entry_fifo(ad->async_workload.fifo_list.next);
}

if (rq) {
as_move_to_dispatch(ad, rq);
return 1;
}
return 0;
}

static int as_dispatch_random_sync(struct request_queue *q, int force)
{
struct as_data *ad = q->elevator->elevator_data;
struct request *rq = ad->next_rq;

if (!rq) {
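/*
 * Pick the data direction to serve: bit 2 of 'cases' is set if random
 * sync reads are pending, bit 1 if random sync writes are pending, and
 * bit 0 if the last direction served was a read (!rand_sync_ddir).
 * The lookup table yields -1 when nothing is pending, the only
 * non-empty direction when a single one is pending, and the direction
 * opposite to the last one served when both are pending.
 */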
int cases =
(ad->random_sync_workload[0].count > 0) * 4 +
(ad->random_sync_workload[1].count > 0) * 2 +
!ad->rand_sync_ddir;
static const int case_to_ddir[]={-1, -1, 1, 1, 0, 0, 0, 1};
int ddir = case_to_ddir[cases];
if (ddir >= 0) {
unsigned *cur_phase = &ad->random_sync_workload[ddir].current_phase;
struct rb_node *fn = rb_first(&ad->random_sync_workload[ddir].phases[*cur_phase]);
if (!fn) {
*cur_phase = !*cur_phase;
fn = rb_first(&ad->random_sync_workload[ddir].phases[*cur_phase]);
}

BUG_ON(!fn);
rq = rb_entry_rq(fn);
ad->rand_sync_ddir = ddir;
}
debug_printk(KERN_INFO "as_dispatch_random_sync cases=%d ddir=%d rq=%p\n",
cases, ddir, rq);
}

if (rq) {
as_move_to_dispatch(ad, rq);
return 1;
}
return 0;
}

static int as_dispatch_seq_sync(struct request_queue *q, int force)
{
struct as_data *ad = q->elevator->elevator_data;
struct request *rq = ad->next_rq;
if (!rq && !list_empty(&ad->seq_sync_workload.fast_path)) {
rq = rq_entry_fifo(ad->seq_sync_workload.fast_path.next);
}
if (!rq && !list_empty(&ad->seq_sync_workload.fifo_list)) {
rq = rq_entry_fifo(ad->seq_sync_workload.fifo_list.next);

debug_printk(KERN_INFO "as_dispatch_seq_sync(cases=%d ddir=%d rq=%p)\n",
cases, ddir, rq);

/*
* Anticipate other sequential requests
*/
if (rq && !force && ad->io_context != RQ_IOC(rq) && as_can_anticipate(ad, rq)) {
debug_printk(KERN_INFO "as_dispatch_seq_sync anticipate, waiting4=%p\n",
ad->io_context);
as_antic_waitreq(ad);
return 0;
}

if (rq && RQ_IOC(rq)) {
struct io_context * ioc = RQ_IOC(rq);
struct list_head *entry, *next;
unsigned cnt = 0;
/*
* Move all requests from next process in the fifo to the fast path queue
*/
list_for_each_safe(entry, next, &ad->seq_sync_workload.fifo_list) {
struct request *pos = list_entry_rq(entry);
if (RQ_IOC(pos) == ioc) {
if (pos == rq) continue;
list_del(entry);
list_add_tail(&pos->queuelist,
&ad->seq_sync_workload.fast_path);
++cnt;
}
}
debug_printk(KERN_INFO "as_dispatch_seq_sync switched to %p, moved=%u\n",
ioc, cnt);
}
}

if (rq) {
as_move_to_dispatch(ad, rq);
if (!list_empty(&ad->seq_sync_workload.fast_path))
ad->next_rq = rq_entry_fifo(ad->seq_sync_workload.fast_path.next);
return 1;
}
return 0;
}

/*
* add rq to rbtree and fifo
* determines workload for each request
*/
static void as_add_request(struct request_queue *q, struct request *rq)
{
struct as_data *ad = q->elevator->elevator_data;
const int ddir = rq_data_dir(rq);
const bool sync = rq_is_sync(rq);
bool is_seq = false;
bool fast_path = false;
struct io_context * rq_ioc = as_get_io_context(q->node);

RQ_SET_STATE(rq, AS_RQ_NEW);
rq->elevator_private = rq_ioc;

if (rq_ioc) {
struct as_io_context *aic = rq_ioc->aic;

if (sync) {
/* does this belong to current process doing sequential I/O? */
is_seq = fast_path = ad->current_workload == AS_RQ_SEQSYNC
&& rq_ioc == ad->io_context &&
time_before(jiffies, ad->workload_expires);

if (!fast_path) {
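/*
 * sector_t is unsigned, so whichever of the two differences
 * below is "negative" wraps to a huge value and loses the
 * min(): delta ends up being the backward seek distance, or
 * half the forward distance, from this process' last request.
 */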
sector_t delta = min(aic->last_request_pos-rq->sector,
(rq->sector-aic->last_request_pos)/2);

/* is this a good candidate to be followed by other sequential requests? */
is_seq = fast_path || ((delta < 1024) && (aic->ttime_mean <= ad->antic_expire) && (aic->seek_mean <= 1024));
}

as_update_iohist(ad, aic, rq);
}

atomic_inc(&aic->nr_queued);
}
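
/*
 * Final classification: 4 = fast-path request from the sequential
 * process currently being served, 3 = other sequential sync request,
 * 1/2 = random sync (the value selects the phase tree it is sorted
 * into), 0 = async.
 */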

RQ_SET_WL(rq, is_seq ? 3 + fast_path : sync ?
1 + !ad->random_sync_workload[ddir].current_phase
: 0);

ad->stats[ddir][RQ_WORKLOAD(rq)]++;

debug_printk(KERN_INFO "as_add_request(rq=%p ioc=%p) sync=%d ddir=%d seq=%d,%d wl=%d\n",
rq, rq_ioc, sync, ddir, is_seq, fast_path, RQ_WORKLOAD(rq));

as_add_rq_rb(ad, rq);

/*
* set insertion time and add to fifo list
*/
rq_set_fifo_time(rq, jiffies);
list_add_tail(&rq->queuelist, as_fifo(ad, rq));

if (fast_path && !ad->next_rq && !list_empty(&ad->seq_sync_workload.fast_path))
ad->next_rq = rq_entry_fifo(ad->seq_sync_workload.fast_path.next);

as_update_rq(ad, rq); /* keep state machine up to date */
RQ_SET_STATE(rq, AS_RQ_QUEUED);
}

static void as_activate_request(struct request_queue *q, struct request *rq)
{
WARN_ON(RQ_STATE(rq) != AS_RQ_DISPATCHED);
RQ_SET_STATE(rq, AS_RQ_REMOVED);
if (RQ_IOC(rq) && RQ_IOC(rq)->aic)
atomic_dec(&RQ_IOC(rq)->aic->nr_dispatched);
}

static void as_deactivate_request(struct request_queue *q, struct request *rq)
{
WARN_ON(RQ_STATE(rq) != AS_RQ_REMOVED);
RQ_SET_STATE(rq, AS_RQ_DISPATCHED);
if (RQ_IOC(rq) && RQ_IOC(rq)->aic)
atomic_inc(&RQ_IOC(rq)->aic->nr_dispatched);
}

/*
* as_queue_empty tells us if there are requests left in the device. It may
* not be the case that a driver can get the next request even if the queue
* is not empty - it is used in the block layer to check for plugging and
* merging opportunities
*/
static int as_queue_empty(struct request_queue *q)
{
struct as_data *ad = q->elevator->elevator_data;
unsigned i;

if (ad->async_workload.count || ad->seq_sync_workload.count)
return 0;

for (i=0;i<2; ++i) {
if (ad->random_sync_workload[i].count)
return 0;
}

return 1;
}

static void as_merged_requests(struct request_queue *q, struct request *req,
struct request *next)
{
/*
* if next expires before req, assign its expire time to req
* and move into next's position (next will be deleted) in the fifo
*/
if (!list_empty(&req->queuelist) && !list_empty(&next->queuelist)) {
if (time_before(rq_fifo_time(next), rq_fifo_time(req))) {
list_move(&req->queuelist, &next->queuelist);
rq_set_fifo_time(req, rq_fifo_time(next));
}
}

/*
* kill knowledge of next, this one is a goner
*/
as_remove_queued_request(q, next);
as_put_io_context(next);

RQ_SET_STATE(next, AS_RQ_MERGED);
}

/*
* This is executed in a "deferred" process context, by kblockd. It calls the
* driver's request_fn so the driver can submit that request.
*
* IMPORTANT! This guy will reenter the elevator, so set up all queue global
* state before calling, and don't rely on any state over calls.
*
* FIXME! dispatch queue is not a queue at all!
*/
static void as_work_handler(struct work_struct *work)
{
struct as_data *ad = container_of(work, struct as_data, antic_work);
struct request_queue *q = ad->q;
unsigned long flags;

spin_lock_irqsave(q->queue_lock, flags);
blk_start_queueing(q);
spin_unlock_irqrestore(q->queue_lock, flags);
}

static int as_may_queue(struct request_queue *q, int rw)
{
int ret = ELV_MQUEUE_MAY;
struct as_data *ad = q->elevator->elevator_data;
struct io_context *ioc;
if (ad->antic_status == ANTIC_WAIT_REQ ||
ad->antic_status == ANTIC_WAIT_NEXT) {
ioc = as_get_io_context(q->node);
if (ad->io_context == ioc)
ret = ELV_MQUEUE_MUST;
put_io_context(ioc);
}

return ret;
}

static void as_exit_queue(struct elevator_queue *e)
{
struct as_data *ad = e->elevator_data;

del_timer_sync(&ad->antic_timer);
cancel_work_sync(&ad->antic_work);

BUG_ON(!list_empty(&ad->async_workload.fifo_list));
BUG_ON(!list_empty(&ad->random_sync_workload[READ].fifo_list));
BUG_ON(!list_empty(&ad->random_sync_workload[WRITE].fifo_list));
BUG_ON(!list_empty(&ad->seq_sync_workload.fifo_list));
BUG_ON(!list_empty(&ad->seq_sync_workload.fast_path));

put_io_context(ad->io_context);
kfree(ad);
}

/*
* initialize elevator private data (as_data).
*/
static void *as_init_queue(struct request_queue *q)
{
struct as_data *ad;
unsigned i;

ad = kmalloc_node(sizeof(*ad), GFP_KERNEL | __GFP_ZERO, q->node);
if (!ad)
return NULL;

ad->q = q; /* Identify what queue the data belongs to */

/* anticipatory scheduling helpers */
ad->antic_timer.function = as_antic_timeout;
ad->antic_timer.data = (unsigned long)q;
init_timer(&ad->antic_timer);
INIT_WORK(&ad->antic_work, as_work_handler);

ad->async_workload.sort_list = RB_ROOT;
for (i=0;i<2; ++i) {
ad->random_sync_workload[i].phases[0] = RB_ROOT;
ad->random_sync_workload[i].phases[1] = RB_ROOT;
INIT_LIST_HEAD(&ad->random_sync_workload[i].fifo_list);

}
INIT_LIST_HEAD(&ad->async_workload.fifo_list);
INIT_LIST_HEAD(&ad->seq_sync_workload.fifo_list);
INIT_LIST_HEAD(&ad->seq_sync_workload.fast_path);

ad->antic_expire = default_antic_expire;
ad->time_quantum = default_time_quantum;
ad->max_async_starved = default_max_async_starved;

return ad;
}

/*
* sysfs parts below
*/

static ssize_t
as_var_show(unsigned int var, char *page)
{
return sprintf(page, "%d\n", var);
}

static ssize_t
as_var_store(unsigned long *var, const char *page, size_t count)
{
char *p = (char *) page;

*var = simple_strtoul(p, &p, 10);
return count;
}

static ssize_t stats_show(struct elevator_queue *e, char *page)
{
struct as_data *ad = e->elevator_data;
int pos = 0;
unsigned i,j;
for (i=0;i<2;++i) {
for (j=0;j<5;++j)
pos += sprintf(page+pos, "[%5u]", ad->stats[i][j]);
page[pos++]='\n';
}
page[pos]='\0';
return pos;
}

#define SHOW_FUNCTION(__FUNC, __VAR, __CONV) \
static ssize_t __FUNC(struct elevator_queue *e, char *page) \
{ \
struct as_data *ad = e->elevator_data; \
return as_var_show((__CONV)?jiffies_to_msecs((__VAR)):(__VAR), (page)); \
}
SHOW_FUNCTION(as_antic_expire_show, ad->antic_expire, 1);
SHOW_FUNCTION(as_time_quantum_show, ad->time_quantum, 1);
SHOW_FUNCTION(as_max_async_starved_show, ad->max_async_starved, 0);
#undef SHOW_FUNCTION

#define STORE_FUNCTION(__FUNC, __PTR, __CONV, MIN, MAX) \
static ssize_t __FUNC(struct elevator_queue *e, const char *page, size_t count) \
{ \
struct as_data *ad = e->elevator_data; \
int ret = as_var_store(__PTR, (page), count); \
if (__CONV) *(__PTR) = msecs_to_jiffies(*(__PTR)); \
if (*(__PTR) < (MIN)) \
*(__PTR) = (MIN); \
else if (*(__PTR) > (MAX)) \
*(__PTR) = (MAX); \
return ret; \
}
STORE_FUNCTION(as_antic_expire_store, &ad->antic_expire, 1, 0, max(1UL,ad->time_quantum/4));
STORE_FUNCTION(as_time_quantum_store, &ad->time_quantum, 1, 4*ad->antic_expire, INT_MAX);
STORE_FUNCTION(as_max_async_starved_store, &ad->max_async_starved, 0, 0, 16);
#undef STORE_FUNCTION

#define AS_ATTR(name) \
__ATTR(name, S_IRUGO|S_IWUSR, as_##name##_show, as_##name##_store)

static struct elv_fs_entry as_attrs[] = {
__ATTR_RO(stats),
AS_ATTR(antic_expire),
AS_ATTR(time_quantum),
AS_ATTR(max_async_starved),
__ATTR_NULL
};

static struct elevator_type iosched_as = {
.ops = {
.elevator_merge_req_fn = as_merged_requests,
.elevator_dispatch_fn = as_dispatch_request,
.elevator_add_req_fn = as_add_request,
.elevator_activate_req_fn = as_activate_request,
.elevator_deactivate_req_fn = as_deactivate_request,
.elevator_queue_empty_fn = as_queue_empty,
.elevator_completed_req_fn = as_completed_request,
.elevator_former_req_fn = elv_rb_former_request,
.elevator_latter_req_fn = elv_rb_latter_request,
.elevator_may_queue_fn = as_may_queue,
.elevator_init_fn = as_init_queue,
.elevator_exit_fn = as_exit_queue,
.trim = as_trim,
},

.elevator_attrs = as_attrs,
.elevator_name = "fas",
.elevator_owner = THIS_MODULE,
};

static int __init as_init(void)
{
elv_register(&iosched_as);

return 0;
}

static void __exit as_exit(void)
{
DECLARE_COMPLETION_ONSTACK(all_gone);
elv_unregister(&iosched_as);
ioc_gone = &all_gone;
/* ioc_gone's update must be visible before reading ioc_count */
smp_wmb();
if (elv_ioc_count_read(ioc_count))
wait_for_completion(&all_gone);
synchronize_rcu();
}

module_init(as_init);
module_exit(as_exit);

MODULE_AUTHOR("Corrado Zoccolo");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Fair Anticipatory IO scheduler");

Attachment: test2.fio
Description: Binary data