Re: [PATCH 07/12] writeback: support > 1 flusher thread per bdi

From: Paul E. McKenney
Date: Thu Jun 04 2009 - 13:44:58 EST


On Tue, May 26, 2009 at 11:33:45AM +0200, Jens Axboe wrote:
> Build on the bdi_writeback support by allowing registration of
> more than 1 flusher thread. File systems can call bdi_add_flusher_task(bdi)
> to add more flusher threads to the device. If they do so, they must also
> provide a super_operations function to return the suitable bdi_writeback
> struct from any given inode.
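
As an illustration of that contract (not part of the patch; the foo_* names,
struct foo_sb_info, and the way the filesystem tracks its bdi_writeback
pointers are all made up), a filesystem opting in would wire up the new
->inode_get_wb super operation and request an extra thread at mount time
roughly like this:

static struct bdi_writeback *foo_inode_get_wb(struct inode *inode)
{
	struct foo_sb_info *sbi = inode->i_sb->s_fs_info;

	/* map the inode onto one of the flusher threads this fs tracks */
	return sbi->wb[inode->i_ino % sbi->nr_wb];
}

static const struct super_operations foo_super_ops = {
	.inode_get_wb	= foo_inode_get_wb,	/* new hook added below */
	/* ... the filesystem's other super operations ... */
};

static int foo_fill_super(struct super_block *sb, void *data, int silent)
{
	struct backing_dev_info *bdi = blk_get_backing_dev_info(sb->s_bdev);

	/* ... normal superblock setup ... */
	sb->s_op = &foo_super_ops;

	/* one extra flusher; may block waiting on previous additions */
	if (bdi)
		bdi_add_flusher_task(bdi);

	return 0;
}

Filesystems that leave ->inode_get_wb unset keep using the single embedded
bdi->wb, as inode_get_wb() below shows.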

Looks good from an RCU perspective. SRCU used for wb_list, RCU for the
other RCU-protected data. ;-)
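
Condensing the hunks below for reference (declarations elided): the wb_list
walkers can block for a long time doing writeback, which is presumably why
they sit under SRCU, paired with the synchronize_srcu() done before a
bdi_writeback is freed, while the work_list scan in get_next_work_item()
never blocks, so plain RCU with call_rcu() freeing of the bdi_work is
enough:

	/* bdi->wb_list: the reader may sleep while flushing, so SRCU */
	idx = srcu_read_lock(&bdi->srcu);
	list_for_each_entry_rcu(wb, &bdi->wb_list, list)
		generic_sync_wb_inodes(wb, sb, wbc);
	srcu_read_unlock(&bdi->srcu, idx);

	/* bdi->work_list: short, non-blocking lookup, plain RCU suffices */
	rcu_read_lock();
	list_for_each_entry_rcu(work, &bdi->work_list, list) {
		if (test_and_clear_bit(wb->nr, &work->seen)) {
			ret = work;
			break;
		}
	}
	rcu_read_unlock();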

Thanx, Paul

> Signed-off-by: Jens Axboe <jens.axboe@xxxxxxxxxx>
> ---
> fs/fs-writeback.c | 448 +++++++++++++++++++++++++++++++++++--------
> include/linux/backing-dev.h | 34 +++-
> include/linux/fs.h | 3 +
> mm/backing-dev.c | 233 ++++++++++++++++++-----
> mm/page-writeback.c | 4 +-
> 5 files changed, 586 insertions(+), 136 deletions(-)
>
> diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
> index e72db8b..8e0902e 100644
> --- a/fs/fs-writeback.c
> +++ b/fs/fs-writeback.c
> @@ -34,83 +34,247 @@
> */
> int nr_pdflush_threads;
>
> -/**
> - * writeback_acquire - attempt to get exclusive writeback access to a device
> - * @bdi: the device's backing_dev_info structure
> - *
> - * It is a waste of resources to have more than one pdflush thread blocked on
> - * a single request queue. Exclusion at the request_queue level is obtained
> - * via a flag in the request_queue's backing_dev_info.state.
> - *
> - * Non-request_queue-backed address_spaces will share default_backing_dev_info,
> - * unless they implement their own. Which is somewhat inefficient, as this
> - * may prevent concurrent writeback against multiple devices.
> +static void generic_sync_wb_inodes(struct bdi_writeback *wb,
> + struct super_block *sb,
> + struct writeback_control *wbc);
> +
> +/*
> + * Work items for the bdi_writeback threads
> */
> -static int writeback_acquire(struct bdi_writeback *wb)
> +struct bdi_work {
> + struct list_head list;
> + struct list_head wait_list;
> + struct rcu_head rcu_head;
> +
> + unsigned long seen;
> + atomic_t pending;
> +
> + unsigned long sb_data;
> + unsigned long nr_pages;
> + enum writeback_sync_modes sync_mode;
> +
> + unsigned long state;
> +};
> +
> +static struct super_block *bdi_work_sb(struct bdi_work *work)
> {
> - struct backing_dev_info *bdi = wb->bdi;
> + return (struct super_block *) (work->sb_data & ~1UL);
> +}
> +
> +static inline bool bdi_work_on_stack(struct bdi_work *work)
> +{
> + return work->sb_data & 1UL;
> +}
> +
> +static inline void bdi_work_init(struct bdi_work *work, struct super_block *sb,
> + unsigned long nr_pages,
> + enum writeback_sync_modes sync_mode)
> +{
> + INIT_RCU_HEAD(&work->rcu_head);
> + work->sb_data = (unsigned long) sb;
> + work->nr_pages = nr_pages;
> + work->sync_mode = sync_mode;
> + work->state = 1;
> +
> + /*
> + * state must not be reordered around the insert
> + */
> + smp_mb();
> +}
>
> - return !test_and_set_bit(wb->nr, &bdi->wb_active);
> +static inline void bdi_work_init_on_stack(struct bdi_work *work,
> + struct super_block *sb,
> + unsigned long nr_pages,
> + enum writeback_sync_modes sync_mode)
> +{
> + bdi_work_init(work, sb, nr_pages, sync_mode);
> + work->sb_data |= 1UL;
> }
>
> /**
> * writeback_in_progress - determine whether there is writeback in progress
> * @bdi: the device's backing_dev_info structure.
> *
> - * Determine whether there is writeback in progress against a backing device.
> + * Determine whether there is writeback waiting to be handled against a
> + * backing device.
> */
> int writeback_in_progress(struct backing_dev_info *bdi)
> {
> - return bdi->wb_active != 0;
> + return !list_empty(&bdi->work_list);
> }
>
> -/**
> - * writeback_release - relinquish exclusive writeback access against a device.
> - * @bdi: the device's backing_dev_info structure
> - */
> -static void writeback_release(struct bdi_writeback *wb)
> +static void bdi_work_clear(struct bdi_work *work)
> {
> - struct backing_dev_info *bdi = wb->bdi;
> + clear_bit(0, &work->state);
> + smp_mb__after_clear_bit();
> + wake_up_bit(&work->state, 0);
> +}
> +
> +static void bdi_work_free(struct rcu_head *head)
> +{
> + struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
>
> - wb->nr_pages = 0;
> - wb->sb = NULL;
> - clear_bit(wb->nr, &bdi->wb_active);
> + if (!bdi_work_on_stack(work))
> + kfree(work);
> + else
> + bdi_work_clear(work);
> }
>
> -static void wb_start_writeback(struct bdi_writeback *wb, struct super_block *sb,
> - long nr_pages,
> - enum writeback_sync_modes sync_mode)
> +static void wb_work_complete(struct bdi_work *work)
> {
> - if (!wb_has_dirty_io(wb))
> - return;
> + if (!bdi_work_on_stack(work)) {
> + bdi_work_clear(work);
> +
> + if (work->sync_mode == WB_SYNC_NONE)
> + call_rcu(&work->rcu_head, bdi_work_free);
> + } else
> + call_rcu(&work->rcu_head, bdi_work_free);
> +}
> +
> +static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
> +{
> + /*
> + * The caller has retrieved the work arguments from this work, so
> + * drop our reference. If this is the last ref, delete and free it.
> + */
> + if (atomic_dec_and_test(&work->pending)) {
> + struct backing_dev_info *bdi = wb->bdi;
>
> - if (writeback_acquire(wb)) {
> - wb->nr_pages = nr_pages;
> - wb->sb = sb;
> - wb->sync_mode = sync_mode;
> + spin_lock(&bdi->wb_lock);
> + list_del_rcu(&work->list);
> + spin_unlock(&bdi->wb_lock);
> +
> + wb_work_complete(work);
> + }
> +}
> +
> +static void wb_start_writeback(struct bdi_writeback *wb, struct bdi_work *work)
> +{
> + /*
> + * If we failed allocating the bdi work item, always wake up the
> + * wb thread. As a safety precaution, it'll then flush out everything
> + */
> + if (!wb_has_dirty_io(wb) && work)
> + wb_clear_pending(wb, work);
> + else
> + wake_up(&wb->wait);
> +}
> +
> +static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
> +{
> + if (work) {
> + work->seen = bdi->wb_mask;
> + atomic_set(&work->pending, bdi->wb_cnt);
>
> /*
> - * make above store seen before the task is woken
> + * Make sure stores are seen before it appears on the list
> */
> smp_mb();
> - wake_up(&wb->wait);
> +
> + spin_lock(&bdi->wb_lock);
> + list_add_tail_rcu(&work->list, &bdi->work_list);
> + spin_unlock(&bdi->wb_lock);
> }
> }
>
> -int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> - long nr_pages, enum writeback_sync_modes sync_mode)
> +static void bdi_sched_work(struct backing_dev_info *bdi, struct bdi_work *work)
> +{
> + if (!bdi_wblist_needs_lock(bdi))
> + wb_start_writeback(&bdi->wb, work);
> + else {
> + struct bdi_writeback *wb;
> + int idx;
> +
> + idx = srcu_read_lock(&bdi->srcu);
> +
> + list_for_each_entry_rcu(wb, &bdi->wb_list, list)
> + wb_start_writeback(wb, work);
> +
> + srcu_read_unlock(&bdi->srcu, idx);
> + }
> +}
> +
> +static void __bdi_start_work(struct backing_dev_info *bdi,
> + struct bdi_work *work)
> +{
> + /*
> + * If the default thread isn't there, make sure we add it. When
> + * it gets created and wakes up, we'll run this work.
> + */
> + if (unlikely(list_empty_careful(&bdi->wb_list)))
> + bdi_add_default_flusher_task(bdi);
> + else
> + bdi_sched_work(bdi, work);
> +}
> +
> +static void bdi_start_work(struct backing_dev_info *bdi, struct bdi_work *work)
> {
> /*
> - * This only happens the first time someone kicks this bdi, so put
> - * it out-of-line.
> + * If the default thread isn't there, make sure we add it. When
> + * it gets created and wakes up, we'll run this work.
> */
> - if (unlikely(!bdi->wb.task)) {
> + if (unlikely(list_empty_careful(&bdi->wb_list))) {
> + mutex_lock(&bdi_lock);
> bdi_add_default_flusher_task(bdi);
> - return 1;
> + mutex_unlock(&bdi_lock);
> + } else
> + bdi_sched_work(bdi, work);
> +}
> +
> +/*
> + * Used for on-stack allocated work items. The caller needs to wait until
> + * the wb threads have acked the work before it's safe to continue.
> + */
> +static void bdi_wait_on_work_clear(struct bdi_work *work)
> +{
> + wait_on_bit(&work->state, 0, bdi_sched_wait, TASK_UNINTERRUPTIBLE);
> +}
> +
> +static struct bdi_work *bdi_alloc_work(struct super_block *sb, long nr_pages,
> + enum writeback_sync_modes sync_mode)
> +{
> + struct bdi_work *work;
> +
> + work = kmalloc(sizeof(*work), GFP_ATOMIC);
> + if (work)
> + bdi_work_init(work, sb, nr_pages, sync_mode);
> +
> + return work;
> +}
> +
> +void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> + long nr_pages, enum writeback_sync_modes sync_mode)
> +{
> + const bool must_wait = sync_mode == WB_SYNC_ALL;
> + struct bdi_work work_stack, *work = NULL;
> +
> + if (!must_wait)
> + work = bdi_alloc_work(sb, nr_pages, sync_mode);
> +
> + if (!work) {
> + work = &work_stack;
> + bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
> }
>
> - wb_start_writeback(&bdi->wb, sb, nr_pages, sync_mode);
> - return 0;
> + bdi_queue_work(bdi, work);
> + bdi_start_work(bdi, work);
> +
> + /*
> + * If the sync mode is WB_SYNC_ALL, block waiting for the work to
> + * complete. If not, we only need to wait for the work to be started,
> + * if we allocated it on-stack. We use the same mechanism: if the
> + * wait bit is set in the bdi_work struct, then threads will not
> + * clear pending until after they are done.
> + *
> + * Note that work == &work_stack if must_wait is true, but that
> + * is an implementation detail and we make it explicit here for
> + * ease of reading.
> + */
> + if (work == &work_stack || must_wait) {
> + bdi_wait_on_work_clear(work);
> + if (must_wait)
> + call_rcu(&work->rcu_head, bdi_work_free);
> + }
> }
>
> /*
> @@ -160,7 +324,7 @@ static void wb_kupdated(struct bdi_writeback *wb)
> wbc.more_io = 0;
> wbc.encountered_congestion = 0;
> wbc.nr_to_write = MAX_WRITEBACK_PAGES;
> - generic_sync_bdi_inodes(NULL, &wbc);
> + generic_sync_wb_inodes(wb, NULL, &wbc);
> if (wbc.nr_to_write > 0)
> break; /* All the old data is written */
> nr_to_write -= MAX_WRITEBACK_PAGES;
> @@ -177,22 +341,19 @@ static inline bool over_bground_thresh(void)
> global_page_state(NR_UNSTABLE_NFS) >= background_thresh);
> }
>
> -static void generic_sync_wb_inodes(struct bdi_writeback *wb,
> - struct super_block *sb,
> - struct writeback_control *wbc);
> -
> -static void wb_writeback(struct bdi_writeback *wb)
> +static void __wb_writeback(struct bdi_writeback *wb, long nr_pages,
> + struct super_block *sb,
> + enum writeback_sync_modes sync_mode)
> {
> struct writeback_control wbc = {
> .bdi = wb->bdi,
> - .sync_mode = wb->sync_mode,
> + .sync_mode = sync_mode,
> .older_than_this = NULL,
> .range_cyclic = 1,
> };
> - long nr_pages = wb->nr_pages;
>
> for (;;) {
> - if (wbc.sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
> + if (sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
> !over_bground_thresh())
> break;
>
> @@ -200,7 +361,7 @@ static void wb_writeback(struct bdi_writeback *wb)
> wbc.encountered_congestion = 0;
> wbc.nr_to_write = MAX_WRITEBACK_PAGES;
> wbc.pages_skipped = 0;
> - generic_sync_wb_inodes(wb, wb->sb, &wbc);
> + generic_sync_wb_inodes(wb, sb, &wbc);
> nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
> /*
> * If we ran out of stuff to write, bail unless more_io got set
> @@ -214,68 +375,175 @@ static void wb_writeback(struct bdi_writeback *wb)
> }
>
> /*
> + * Return the next bdi_work struct that hasn't been processed by this
> + * wb thread yet
> + */
> +static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
> + struct bdi_writeback *wb)
> +{
> + struct bdi_work *work, *ret = NULL;
> +
> + rcu_read_lock();
> +
> + list_for_each_entry_rcu(work, &bdi->work_list, list) {
> + if (!test_and_clear_bit(wb->nr, &work->seen))
> + continue;
> +
> + ret = work;
> + break;
> + }
> +
> + rcu_read_unlock();
> + return ret;
> +}
> +
> +/*
> + * Retrieve work items and do the writeback they describe
> + */
> +static void wb_writeback(struct bdi_writeback *wb)
> +{
> + struct backing_dev_info *bdi = wb->bdi;
> + struct bdi_work *work;
> +
> + while ((work = get_next_work_item(bdi, wb)) != NULL) {
> + struct super_block *sb = bdi_work_sb(work);
> + long nr_pages = work->nr_pages;
> + enum writeback_sync_modes sync_mode = work->sync_mode;
> +
> + /*
> + * If this isn't a data integrity operation, just notify
> + * that we have seen this work and we are now starting it.
> + */
> + if (sync_mode == WB_SYNC_NONE)
> + wb_clear_pending(wb, work);
> +
> + __wb_writeback(wb, nr_pages, sb, sync_mode);
> +
> + /*
> + * This is a data integrity writeback, so only do the
> + * notification when we have completed the work.
> + */
> + if (sync_mode == WB_SYNC_ALL)
> + wb_clear_pending(wb, work);
> + }
> +}
> +
> +/*
> + * This will be inlined in bdi_writeback_task() once we get rid of any
> + * dirty inodes on the default_backing_dev_info
> + */
> +static void wb_do_writeback(struct bdi_writeback *wb)
> +{
> + /*
> + * We get here in two cases:
> + *
> + * schedule_timeout() returned because the dirty writeback
> + * interval has elapsed. If that happens, the work item list
> + * will be empty and we will proceed to do kupdated style writeout.
> + *
> + * Someone called bdi_start_writeback(), which put one or more work
> + * items on the work_list. Process those.
> + */
> + if (list_empty(&wb->bdi->work_list))
> + wb_kupdated(wb);
> + else
> + wb_writeback(wb);
> +}
> +
> +/*
> * Handle writeback of dirty data for the device backed by this bdi. Also
> * wakes up periodically and does kupdated style flushing.
> */
> int bdi_writeback_task(struct bdi_writeback *wb)
> {
> + DEFINE_WAIT(wait);
> +
> while (!kthread_should_stop()) {
> unsigned long wait_jiffies;
> - DEFINE_WAIT(wait);
> +
> + wb_do_writeback(wb);
>
> prepare_to_wait(&wb->wait, &wait, TASK_INTERRUPTIBLE);
> wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
> schedule_timeout(wait_jiffies);
> try_to_freeze();
> -
> - /*
> - * We get here in two cases:
> - *
> - * schedule_timeout() returned because the dirty writeback
> - * interval has elapsed. If that happens, we will be able
> - * to acquire the writeback lock and will proceed to do
> - * kupdated style writeout.
> - *
> - * Someone called bdi_start_writeback(), which will acquire
> - * the writeback lock. This means our writeback_acquire()
> - * below will fail and we call into bdi_pdflush() for
> - * pdflush style writeout.
> - *
> - */
> - if (writeback_acquire(wb))
> - wb_kupdated(wb);
> - else
> - wb_writeback(wb);
> -
> - writeback_release(wb);
> - finish_wait(&wb->wait, &wait);
> }
>
> + finish_wait(&wb->wait, &wait);
> return 0;
> }
>
> +/*
> + * Schedule writeback for all backing devices. Expensive! If this is a data
> + * integrity operation, writeback will be complete when this returns. If
> + * we are simply called for WB_SYNC_NONE, then writeback will merely be
> + * scheduled to run.
> + */
> void bdi_writeback_all(struct super_block *sb, long nr_pages,
> enum writeback_sync_modes sync_mode)
> {
> + const bool must_wait = sync_mode == WB_SYNC_ALL;
> struct backing_dev_info *bdi, *tmp;
> + struct bdi_work *work;
> + LIST_HEAD(list);
>
> mutex_lock(&bdi_lock);
>
> list_for_each_entry_safe(bdi, tmp, &bdi_list, bdi_list) {
> + struct bdi_work *work, work_stack;
> +
> if (!bdi_has_dirty_io(bdi))
> continue;
> - bdi_start_writeback(bdi, sb, nr_pages, sync_mode);
> +
> + work = bdi_alloc_work(sb, nr_pages, sync_mode);
> + if (!work) {
> + work = &work_stack;
> + bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
> + } else if (must_wait)
> + list_add_tail(&work->wait_list, &list);
> +
> + bdi_queue_work(bdi, work);
> + __bdi_start_work(bdi, work);
> +
> + /*
> + * Do the wait inline if this came from the stack. This
> + * only happens if we ran out of memory, so it should very
> + * rarely trigger.
> + */
> + if (work == &work_stack) {
> + bdi_wait_on_work_clear(work);
> + if (must_wait)
> + call_rcu(&work->rcu_head, bdi_work_free);
> + }
> }
>
> mutex_unlock(&bdi_lock);
> +
> + /*
> + * If this is for WB_SYNC_ALL, wait for pending work to complete
> + * before returning.
> + */
> + while (!list_empty(&list)) {
> + work = list_entry(list.next, struct bdi_work, wait_list);
> + list_del(&work->wait_list);
> + bdi_wait_on_work_clear(work);
> + call_rcu(&work->rcu_head, bdi_work_free);
> + }
> }
>
> /*
> - * We have only a single wb per bdi, so just return that.
> + * If the filesystem didn't provide a way to map an inode to a dedicated
> + * flusher thread, it doesn't support more than 1 thread. So we know it's
> + * the default thread, return that.
> */
> static inline struct bdi_writeback *inode_get_wb(struct inode *inode)
> {
> - return &inode_to_bdi(inode)->wb;
> + const struct super_operations *sop = inode->i_sb->s_op;
> +
> + if (!sop->inode_get_wb)
> + return &inode_to_bdi(inode)->wb;
> +
> + return sop->inode_get_wb(inode);
> }
>
> /**
> @@ -729,8 +997,24 @@ void generic_sync_bdi_inodes(struct super_block *sb,
> struct writeback_control *wbc)
> {
> struct backing_dev_info *bdi = wbc->bdi;
> + struct bdi_writeback *wb;
>
> - generic_sync_wb_inodes(&bdi->wb, sb, wbc);
> + /*
> + * Common case is just a single wb thread and that is embedded in
> + * the bdi, so it doesn't need locking
> + */
> + if (!bdi_wblist_needs_lock(bdi))
> + generic_sync_wb_inodes(&bdi->wb, sb, wbc);
> + else {
> + int idx;
> +
> + idx = srcu_read_lock(&bdi->srcu);
> +
> + list_for_each_entry_rcu(wb, &bdi->wb_list, list)
> + generic_sync_wb_inodes(wb, sb, wbc);
> +
> + srcu_read_unlock(&bdi->srcu, idx);
> + }
> }
>
> /*
> @@ -757,7 +1041,7 @@ void generic_sync_sb_inodes(struct super_block *sb,
> struct writeback_control *wbc)
> {
> if (wbc->bdi)
> - generic_sync_bdi_inodes(sb, wbc);
> + bdi_start_writeback(wbc->bdi, sb, wbc->nr_to_write, wbc->sync_mode);
> else
> bdi_writeback_all(sb, wbc->nr_to_write, wbc->sync_mode);
>
> diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
> index 77dc62c..0559cf8 100644
> --- a/include/linux/backing-dev.h
> +++ b/include/linux/backing-dev.h
> @@ -13,6 +13,8 @@
> #include <linux/proportions.h>
> #include <linux/kernel.h>
> #include <linux/fs.h>
> +#include <linux/sched.h>
> +#include <linux/srcu.h>
> #include <linux/writeback.h>
> #include <asm/atomic.h>
>
> @@ -26,6 +28,7 @@ struct dentry;
> enum bdi_state {
> BDI_pending, /* On its way to being activated */
> BDI_wb_alloc, /* Default embedded wb allocated */
> + BDI_wblist_lock, /* bdi->wb_list now needs locking */
> BDI_async_congested, /* The async (write) queue is getting full */
> BDI_sync_congested, /* The sync queue is getting full */
> BDI_unused, /* Available bits start here */
> @@ -42,6 +45,8 @@ enum bdi_stat_item {
> #define BDI_STAT_BATCH (8*(1+ilog2(nr_cpu_ids)))
>
> struct bdi_writeback {
> + struct list_head list; /* hangs off the bdi */
> +
> struct backing_dev_info *bdi; /* our parent bdi */
> unsigned int nr;
>
> @@ -50,13 +55,12 @@ struct bdi_writeback {
> struct list_head b_dirty; /* dirty inodes */
> struct list_head b_io; /* parked for writeback */
> struct list_head b_more_io; /* parked for more writeback */
> -
> - unsigned long nr_pages;
> - struct super_block *sb;
> - enum writeback_sync_modes sync_mode;
> };
>
> +#define BDI_MAX_FLUSHERS 32
> +
> struct backing_dev_info {
> + struct srcu_struct srcu; /* for wb_list read side protection */
> struct list_head bdi_list;
> unsigned long ra_pages; /* max readahead in PAGE_CACHE_SIZE units */
> unsigned long state; /* Always use atomic bitops on this */
> @@ -75,8 +79,12 @@ struct backing_dev_info {
> unsigned int max_ratio, max_prop_frac;
>
> struct bdi_writeback wb; /* default writeback info for this bdi */
> - unsigned long wb_active; /* bitmap of active tasks */
> - unsigned long wb_mask; /* number of registered tasks */
> + spinlock_t wb_lock; /* protects update side of wb_list */
> + struct list_head wb_list; /* the flusher threads hanging off this bdi */
> + unsigned long wb_mask; /* bitmask of registered tasks */
> + unsigned int wb_cnt; /* number of registered tasks */
> +
> + struct list_head work_list;
>
> struct device *dev;
>
> @@ -93,17 +101,23 @@ int bdi_register(struct backing_dev_info *bdi, struct device *parent,
> const char *fmt, ...);
> int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
> void bdi_unregister(struct backing_dev_info *bdi);
> -int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> +void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> long nr_pages, enum writeback_sync_modes sync_mode);
> int bdi_writeback_task(struct bdi_writeback *wb);
> void bdi_writeback_all(struct super_block *sb, long nr_pages,
> enum writeback_sync_modes sync_mode);
> void bdi_add_default_flusher_task(struct backing_dev_info *bdi);
> +void bdi_add_flusher_task(struct backing_dev_info *bdi);
> int bdi_has_dirty_io(struct backing_dev_info *bdi);
>
> extern struct mutex bdi_lock;
> extern struct list_head bdi_list;
>
> +static inline int bdi_wblist_needs_lock(struct backing_dev_info *bdi)
> +{
> + return test_bit(BDI_wblist_lock, &bdi->state);
> +}
> +
> static inline int wb_has_dirty_io(struct bdi_writeback *wb)
> {
> return !list_empty(&wb->b_dirty) ||
> @@ -316,4 +330,10 @@ static inline bool mapping_cap_swap_backed(struct address_space *mapping)
> return bdi_cap_swap_backed(mapping->backing_dev_info);
> }
>
> +static inline int bdi_sched_wait(void *word)
> +{
> + schedule();
> + return 0;
> +}
> +
> #endif /* _LINUX_BACKING_DEV_H */
> diff --git a/include/linux/fs.h b/include/linux/fs.h
> index ecdc544..d3bda5d 100644
> --- a/include/linux/fs.h
> +++ b/include/linux/fs.h
> @@ -1550,11 +1550,14 @@ extern ssize_t vfs_readv(struct file *, const struct iovec __user *,
> extern ssize_t vfs_writev(struct file *, const struct iovec __user *,
> unsigned long, loff_t *);
>
> +struct bdi_writeback;
> +
> struct super_operations {
> struct inode *(*alloc_inode)(struct super_block *sb);
> void (*destroy_inode)(struct inode *);
>
> void (*dirty_inode) (struct inode *);
> + struct bdi_writeback *(*inode_get_wb) (struct inode *);
> int (*write_inode) (struct inode *, int);
> void (*drop_inode) (struct inode *);
> void (*delete_inode) (struct inode *);
> diff --git a/mm/backing-dev.c b/mm/backing-dev.c
> index c8201f0..57e44e3 100644
> --- a/mm/backing-dev.c
> +++ b/mm/backing-dev.c
> @@ -199,53 +199,96 @@ static int __init default_bdi_init(void)
> }
> subsys_initcall(default_bdi_init);
>
> -static void bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
> +static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
> {
> - memset(wb, 0, sizeof(*wb));
> + unsigned long mask = BDI_MAX_FLUSHERS - 1;
> + unsigned int nr;
>
> - wb->bdi = bdi;
> - init_waitqueue_head(&wb->wait);
> - INIT_LIST_HEAD(&wb->b_dirty);
> - INIT_LIST_HEAD(&wb->b_io);
> - INIT_LIST_HEAD(&wb->b_more_io);
> -}
> + do {
> + if ((bdi->wb_mask & mask) == mask)
> + return 1;
> +
> + nr = find_first_zero_bit(&bdi->wb_mask, BDI_MAX_FLUSHERS);
> + } while (test_and_set_bit(nr, &bdi->wb_mask));
> +
> + wb->nr = nr;
> +
> + spin_lock(&bdi->wb_lock);
> + bdi->wb_cnt++;
> + spin_unlock(&bdi->wb_lock);
>
> -static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
> -{
> - set_bit(0, &bdi->wb_mask);
> - wb->nr = 0;
> return 0;
> }
>
> static void bdi_put_wb(struct backing_dev_info *bdi, struct bdi_writeback *wb)
> {
> clear_bit(wb->nr, &bdi->wb_mask);
> - clear_bit(BDI_wb_alloc, &bdi->state);
> +
> + if (wb == &bdi->wb)
> + clear_bit(BDI_wb_alloc, &bdi->state);
> + else
> + kfree(wb);
> +
> + spin_lock(&bdi->wb_lock);
> + bdi->wb_cnt--;
> + spin_unlock(&bdi->wb_lock);
> +}
> +
> +static int bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
> +{
> + memset(wb, 0, sizeof(*wb));
> +
> + wb->bdi = bdi;
> + init_waitqueue_head(&wb->wait);
> + INIT_LIST_HEAD(&wb->b_dirty);
> + INIT_LIST_HEAD(&wb->b_io);
> + INIT_LIST_HEAD(&wb->b_more_io);
> +
> + return wb_assign_nr(bdi, wb);
> }
>
> static struct bdi_writeback *bdi_new_wb(struct backing_dev_info *bdi)
> {
> struct bdi_writeback *wb;
>
> - set_bit(BDI_wb_alloc, &bdi->state);
> - wb = &bdi->wb;
> - wb_assign_nr(bdi, wb);
> + /*
> + * Default bdi->wb is already assigned, so just return it
> + */
> + if (!test_and_set_bit(BDI_wb_alloc, &bdi->state))
> + wb = &bdi->wb;
> + else {
> + wb = kmalloc(sizeof(struct bdi_writeback), GFP_KERNEL);
> + if (wb) {
> + if (bdi_wb_init(wb, bdi)) {
> + kfree(wb);
> + wb = NULL;
> + }
> + }
> + }
> +
> return wb;
> }
>
> -static int bdi_start_fn(void *ptr)
> +static void bdi_task_init(struct backing_dev_info *bdi,
> + struct bdi_writeback *wb)
> {
> - struct bdi_writeback *wb = ptr;
> - struct backing_dev_info *bdi = wb->bdi;
> struct task_struct *tsk = current;
> - int ret;
> + int was_empty;
>
> /*
> - * Add us to the active bdi_list
> + * Add us to the bdi's wb_list. If we are adding threads beyond
> + * the default embedded bdi_writeback, then we need to start using
> + * proper locking. Check the list for empty first, then set the
> + * BDI_wblist_lock flag if there's > 1 entry on the list now
> */
> - mutex_lock(&bdi_lock);
> - list_add(&bdi->bdi_list, &bdi_list);
> - mutex_unlock(&bdi_lock);
> + spin_lock(&bdi->wb_lock);
> +
> + was_empty = list_empty(&bdi->wb_list);
> + list_add_tail_rcu(&wb->list, &bdi->wb_list);
> + if (!was_empty)
> + set_bit(BDI_wblist_lock, &bdi->state);
> +
> + spin_unlock(&bdi->wb_lock);
>
> tsk->flags |= PF_FLUSHER | PF_SWAPWRITE;
> set_freezable();
> @@ -254,6 +297,22 @@ static int bdi_start_fn(void *ptr)
> * Our parent may run at a different priority, just set us to normal
> */
> set_user_nice(tsk, 0);
> +}
> +
> +static int bdi_start_fn(void *ptr)
> +{
> + struct bdi_writeback *wb = ptr;
> + struct backing_dev_info *bdi = wb->bdi;
> + int ret;
> +
> + /*
> + * Add us to the active bdi_list
> + */
> + mutex_lock(&bdi_lock);
> + list_add(&bdi->bdi_list, &bdi_list);
> + mutex_unlock(&bdi_lock);
> +
> + bdi_task_init(bdi, wb);
>
> /*
> * Clear pending bit and wakeup anybody waiting to tear us down
> @@ -264,13 +323,44 @@ static int bdi_start_fn(void *ptr)
>
> ret = bdi_writeback_task(wb);
>
> + /*
> + * Remove us from the list
> + */
> + spin_lock(&bdi->wb_lock);
> + list_del_rcu(&wb->list);
> + spin_unlock(&bdi->wb_lock);
> +
> + /*
> + * wait for rcu grace period to end, so we can free wb
> + */
> + synchronize_srcu(&bdi->srcu);
> +
> bdi_put_wb(bdi, wb);
> return ret;
> }
>
> int bdi_has_dirty_io(struct backing_dev_info *bdi)
> {
> - return wb_has_dirty_io(&bdi->wb);
> + struct bdi_writeback *wb;
> + int ret = 0;
> +
> + if (!bdi_wblist_needs_lock(bdi))
> + ret = wb_has_dirty_io(&bdi->wb);
> + else {
> + int idx;
> +
> + idx = srcu_read_lock(&bdi->srcu);
> +
> + list_for_each_entry_rcu(wb, &bdi->wb_list, list) {
> + ret = wb_has_dirty_io(wb);
> + if (ret)
> + break;
> + }
> +
> + srcu_read_unlock(&bdi->srcu, idx);
> + }
> +
> + return ret;
> }
>
> static void bdi_flush_io(struct backing_dev_info *bdi)
> @@ -291,6 +381,8 @@ static int bdi_forker_task(void *ptr)
> struct bdi_writeback *me = ptr;
> DEFINE_WAIT(wait);
>
> + bdi_task_init(me->bdi, me);
> +
> for (;;) {
> struct backing_dev_info *bdi, *tmp;
> struct bdi_writeback *wb;
> @@ -371,27 +463,70 @@ readd_flush:
> }
>
> /*
> - * Add a new flusher task that gets created for any bdi
> - * that has dirty data pending writeout
> + * bdi_lock held on entry
> */
> -void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
> +static void bdi_add_one_flusher_task(struct backing_dev_info *bdi,
> + int(*func)(struct backing_dev_info *))
> {
> if (!bdi_cap_writeback_dirty(bdi))
> return;
>
> /*
> - * Someone already marked this pending for task creation
> + * Check with the helper whether to proceed adding a task. Will only
> + * abort if two or more simultaneous calls to
> + * bdi_add_default_flusher_task() occurred; further additions will block
> + * waiting for previous additions to finish.
> */
> - if (test_and_set_bit(BDI_pending, &bdi->state))
> - return;
> + if (!func(bdi)) {
> + list_move_tail(&bdi->bdi_list, &bdi_pending_list);
>
> - mutex_lock(&bdi_lock);
> - list_move_tail(&bdi->bdi_list, &bdi_pending_list);
> + /*
> + * We are now on the pending list, wake up bdi_forker_task()
> + * to finish the job and add us back to the active bdi_list
> + */
> + wake_up(&default_backing_dev_info.wb.wait);
> + }
> +}
> +
> +static int flusher_add_helper_block(struct backing_dev_info *bdi)
> +{
> mutex_unlock(&bdi_lock);
> + wait_on_bit_lock(&bdi->state, BDI_pending, bdi_sched_wait,
> + TASK_UNINTERRUPTIBLE);
> + mutex_lock(&bdi_lock);
> + return 0;
> +}
>
> - wake_up(&default_backing_dev_info.wb.wait);
> +static int flusher_add_helper_test(struct backing_dev_info *bdi)
> +{
> + return test_and_set_bit(BDI_pending, &bdi->state);
> +}
> +
> +/*
> + * Add the default flusher task that gets created for any bdi
> + * that has dirty data pending writeout
> + */
> +void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
> +{
> + bdi_add_one_flusher_task(bdi, flusher_add_helper_test);
> }
>
> +/**
> + * bdi_add_flusher_task - add one more flusher task to this @bdi
> + * @bdi: the bdi
> + *
> + * Add an additional flusher task to this @bdi. Will block waiting on
> + * previous additions, if any.
> + *
> + */
> +void bdi_add_flusher_task(struct backing_dev_info *bdi)
> +{
> + mutex_lock(&bdi_lock);
> + bdi_add_one_flusher_task(bdi, flusher_add_helper_block);
> + mutex_unlock(&bdi_lock);
> +}
> +EXPORT_SYMBOL(bdi_add_flusher_task);
> +
> int bdi_register(struct backing_dev_info *bdi, struct device *parent,
> const char *fmt, ...)
> {
> @@ -455,24 +590,21 @@ int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev)
> }
> EXPORT_SYMBOL(bdi_register_dev);
>
> -static int sched_wait(void *word)
> -{
> - schedule();
> - return 0;
> -}
> -
> /*
> * Remove bdi from global list and shutdown any threads we have running
> */
> static void bdi_wb_shutdown(struct backing_dev_info *bdi)
> {
> + struct bdi_writeback *wb;
> +
> if (!bdi_cap_writeback_dirty(bdi))
> return;
>
> /*
> * If setup is pending, wait for that to complete first
> */
> - wait_on_bit(&bdi->state, BDI_pending, sched_wait, TASK_UNINTERRUPTIBLE);
> + wait_on_bit(&bdi->state, BDI_pending, bdi_sched_wait,
> + TASK_UNINTERRUPTIBLE);
>
> /*
> * Make sure nobody finds us on the bdi_list anymore
> @@ -482,9 +614,11 @@ static void bdi_wb_shutdown(struct backing_dev_info *bdi)
> mutex_unlock(&bdi_lock);
>
> /*
> - * Finally, kill the kernel thread
> + * Finally, kill the kernel threads. We don't need to be RCU
> + * safe anymore, since the bdi is gone from visibility.
> */
> - kthread_stop(bdi->wb.task);
> + list_for_each_entry(wb, &bdi->wb_list, list)
> + kthread_stop(wb->task);
> }
>
> void bdi_unregister(struct backing_dev_info *bdi)
> @@ -508,8 +642,12 @@ int bdi_init(struct backing_dev_info *bdi)
> bdi->min_ratio = 0;
> bdi->max_ratio = 100;
> bdi->max_prop_frac = PROP_FRAC_BASE;
> + spin_lock_init(&bdi->wb_lock);
> + bdi->wb_mask = 0;
> + bdi->wb_cnt = 0;
> INIT_LIST_HEAD(&bdi->bdi_list);
> - bdi->wb_mask = bdi->wb_active = 0;
> + INIT_LIST_HEAD(&bdi->wb_list);
> + INIT_LIST_HEAD(&bdi->work_list);
>
> bdi_wb_init(&bdi->wb, bdi);
>
> @@ -519,10 +657,15 @@ int bdi_init(struct backing_dev_info *bdi)
> goto err;
> }
>
> + err = init_srcu_struct(&bdi->srcu);
> + if (err)
> + goto err;
> +
> bdi->dirty_exceeded = 0;
> err = prop_local_init_percpu(&bdi->completions);
>
> if (err) {
> + cleanup_srcu_struct(&bdi->srcu);
> err:
> while (i--)
> percpu_counter_destroy(&bdi->bdi_stat[i]);
> @@ -540,6 +683,8 @@ void bdi_destroy(struct backing_dev_info *bdi)
>
> bdi_unregister(bdi);
>
> + cleanup_srcu_struct(&bdi->srcu);
> +
> for (i = 0; i < NR_BDI_STAT_ITEMS; i++)
> percpu_counter_destroy(&bdi->bdi_stat[i]);
>
> diff --git a/mm/page-writeback.c b/mm/page-writeback.c
> index 54a4a65..7dd7de7 100644
> --- a/mm/page-writeback.c
> +++ b/mm/page-writeback.c
> @@ -665,8 +665,7 @@ void throttle_vm_writeout(gfp_t gfp_mask)
>
> /*
> * Start writeback of `nr_pages' pages. If `nr_pages' is zero, write back
> - * the whole world. Returns 0 if a pdflush thread was dispatched. Returns
> - * -1 if all pdflush threads were busy.
> + * the whole world.
> */
> void wakeup_flusher_threads(long nr_pages)
> {
> @@ -674,7 +673,6 @@ void wakeup_flusher_threads(long nr_pages)
> nr_pages = global_page_state(NR_FILE_DIRTY) +
> global_page_state(NR_UNSTABLE_NFS);
> bdi_writeback_all(NULL, nr_pages, WB_SYNC_NONE);
> - return;
> }
>
> static void laptop_timer_fn(unsigned long unused);
> --
> 1.6.3.rc0.1.gf800
>