Re: [PATCH 1/7] General notification queue with user mmap()'able ring buffer

From: Jann Horn
Date: Tue May 28 2019 - 15:18:44 EST


On Tue, May 28, 2019 at 6:03 PM David Howells <dhowells@xxxxxxxxxx> wrote:
> Implement a misc device that implements a general notification queue as a
> ring buffer that can be mmap()'d from userspace.
[...]
> +receive notifications from the kernel. This is can be used in conjunction

typo: s/is can/can/

[...]
> +Overview
> +========
> +
> +This facility appears as a misc device file that is opened and then mapped and
> +polled. Each time it is opened, it creates a new buffer specific to the
> +returned file descriptor. Then, when the opening process sets watches, it
> +indicates that particular buffer it wants notifications from that watch to be
> +written into. Note that there are no read() and write() methods (except for

s/that particular buffer/the particular buffer/

> +debugging). The user is expected to access the ring directly and to use poll
> +to wait for new data.
[...]
> +/**
> + * __post_watch_notification - Post an event notification
> + * @wlist: The watch list to post the event to.
> + * @n: The notification record to post.
> + * @cred: The creds of the process that triggered the notification.
> + * @id: The ID to match on the watch.
> + *
> + * Post a notification of an event into a set of watch queues and let the users
> + * know.
> + *
> + * If @n is NULL then WATCH_INFO_LENGTH will be set on the next event posted.
> + *
> + * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
> + * should be in units of sizeof(*n).
> + */
> +void __post_watch_notification(struct watch_list *wlist,
> + struct watch_notification *n,
> + const struct cred *cred,
> + u64 id)
> +{
> + const struct watch_filter *wf;
> + struct watch_queue *wqueue;
> + struct watch *watch;
> +
> + rcu_read_lock();
> +
> + hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
> + if (watch->id != id)
> + continue;
> + n->info &= ~(WATCH_INFO_ID | WATCH_INFO_OVERRUN);
> + n->info |= watch->info_id;
> +
> + wqueue = rcu_dereference(watch->queue);
> + wf = rcu_dereference(wqueue->filter);
> + if (wf && !filter_watch_notification(wf, n))
> + continue;
> +
> + post_one_notification(wqueue, n, cred);
> + }
> +
> + rcu_read_unlock();
> +}
> +EXPORT_SYMBOL(__post_watch_notification);
[...]
> +static vm_fault_t watch_queue_fault(struct vm_fault *vmf)
> +{
> + struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
> + struct page *page;
> +
> + page = wqueue->pages[vmf->pgoff];

I don't see you setting any special properties on the VMA that would
prevent userspace from extending its size via mremap() - no
VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds
access here?
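Either the VMA needs to be marked so that it can't be grown, or the fault
paths need an explicit bounds check. Untested sketch of the first option,
to be added in watch_queue_mmap():

        /* Keep mremap() from growing the mapping beyond the pages we
         * allocated, so that ->fault and ->map_pages can never index
         * past the end of wqueue->pages[].
         */
        vma->vm_flags |= VM_DONTEXPAND;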

> + get_page(page);
> + if (!lock_page_or_retry(page, vmf->vma->vm_mm, vmf->flags)) {
> + put_page(page);
> + return VM_FAULT_RETRY;
> + }
> + vmf->page = page;
> + return VM_FAULT_LOCKED;
> +}
> +
> +static void watch_queue_map_pages(struct vm_fault *vmf,
> + pgoff_t start_pgoff, pgoff_t end_pgoff)
> +{
> + struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
> + struct page *page;
> +
> + rcu_read_lock();
> +
> + do {
> + page = wqueue->pages[start_pgoff];

Same as above.

> + if (trylock_page(page)) {
> + vm_fault_t ret;
> + get_page(page);
> + ret = alloc_set_pte(vmf, NULL, page);
> + if (ret != 0)
> + put_page(page);
> +
> + unlock_page(page);
> + }
> + } while (++start_pgoff < end_pgoff);
> +
> + rcu_read_unlock();
> +}
[...]
> +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma)
> +{
> + struct watch_queue *wqueue = file->private_data;
> +
> + if (vma->vm_pgoff != 0 ||
> + vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE ||
> + !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED)))
> + return -EINVAL;

This should probably be protected by some lock against a concurrent
watch_queue_set_size(), shouldn't it?
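Untested sketch of what I mean, assuming you want to reuse the inode lock
that the ioctl path already takes (and assuming taking it under mmap_sem
doesn't create an ordering problem elsewhere):

static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma)
{
        struct watch_queue *wqueue = file->private_data;
        struct inode *inode = file_inode(file);
        int ret = -EINVAL;

        /* Serialize against IOC_WATCH_QUEUE_SET_SIZE so that ->nr_pages
         * and ->buffer can't change while we validate the mapping.
         */
        inode_lock(inode);
        if (vma->vm_pgoff == 0 &&
            vma->vm_end - vma->vm_start <= wqueue->nr_pages * PAGE_SIZE &&
            (pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED))) {
                vma->vm_ops = &watch_queue_vm_ops;
                vma->vm_flags |= VM_DONTEXPAND; /* see the fault handler above */
                vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap);
                ret = 0;
        }
        inode_unlock(inode);
        return ret;
}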

> + vma->vm_ops = &watch_queue_vm_ops;
> +
> + vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap);
> + return 0;
> +}
> +
> +/*
> + * Allocate the required number of pages.
> + */
> +static long watch_queue_set_size(struct watch_queue *wqueue, unsigned long nr_pages)
> +{
> + struct watch_queue_buffer *buf;
> + u32 len;
> + int i;
> +
> + if (nr_pages == 0 ||
> + nr_pages > 16 || /* TODO: choose a better hard limit */
> + !is_power_of_2(nr_pages))
> + return -EINVAL;
> +
> + wqueue->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
> + if (!wqueue->pages)
> + goto err;
> +
> + for (i = 0; i < nr_pages; i++) {
> + wqueue->pages[i] = alloc_page(GFP_KERNEL | __GFP_ZERO);
> + if (!wqueue->pages[i])
> + goto err_some_pages;
> + wqueue->pages[i]->mapping = &wqueue->mapping;
> + SetPageUptodate(wqueue->pages[i]);
> + }
> +
> + buf = vmap(wqueue->pages, nr_pages, VM_MAP, PAGE_SHARED);
> + if (!buf)
> + goto err_some_pages;
> +
> + wqueue->buffer = buf;
> + wqueue->nr_pages = nr_pages;
> + wqueue->size = ((nr_pages * PAGE_SIZE) / sizeof(struct watch_notification));
> +
> + /* The first four slots in the buffer contain metadata about the ring,
> + * including the head and tail indices and mask.
> + */
> + len = sizeof(buf->meta) / sizeof(buf->slots[0]);
> + buf->meta.watch.info = len << WATCH_LENGTH_SHIFT;
> + buf->meta.watch.type = WATCH_TYPE_META;
> + buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION;
> + buf->meta.tail = len;
> + buf->meta.mask = wqueue->size - 1;
> + smp_store_release(&buf->meta.head, len);

Why is this an smp_store_release()? The entire buffer should not be visible to
userspace before this setup is complete, right?

> + return 0;
> +
> +err_some_pages:
> + for (i--; i >= 0; i--) {
> + ClearPageUptodate(wqueue->pages[i]);
> + wqueue->pages[i]->mapping = NULL;
> + put_page(wqueue->pages[i]);
> + }
> +
> + kfree(wqueue->pages);
> + wqueue->pages = NULL;
> +err:
> + return -ENOMEM;
> +}
[...]
> +
> +/*
> + * Set parameters.
> + */
> +static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
> +{
> + struct watch_queue *wqueue = file->private_data;
> + struct inode *inode = file_inode(file);
> + long ret;
> +
> + switch (cmd) {
> + case IOC_WATCH_QUEUE_SET_SIZE:
> + if (wqueue->buffer)
> + return -EBUSY;

The preceding check occurs without any locks held and therefore has no
reliable effect. It should probably be moved below the
inode_lock(...).
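I.e. something like (untested):

        case IOC_WATCH_QUEUE_SET_SIZE:
                inode_lock(inode);
                /* Re-check under the lock so that two racing callers can't
                 * both observe a NULL buffer and allocate pages twice.
                 */
                if (wqueue->buffer)
                        ret = -EBUSY;
                else
                        ret = watch_queue_set_size(wqueue, arg);
                inode_unlock(inode);
                return ret;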

> + inode_lock(inode);
> + ret = watch_queue_set_size(wqueue, arg);
> + inode_unlock(inode);
> + return ret;
> +
> + case IOC_WATCH_QUEUE_SET_FILTER:
> + inode_lock(inode);
> + ret = watch_queue_set_filter(
> + inode, wqueue,
> + (struct watch_notification_filter __user *)arg);
> + inode_unlock(inode);
> + return ret;
> +
> + default:
> + return -EOPNOTSUPP;
> + }
> +}
[...]
> +static void free_watch(struct rcu_head *rcu)
> +{
> + struct watch *watch = container_of(rcu, struct watch, rcu);
> +
> + put_watch_queue(rcu_access_pointer(watch->queue));

This should be rcu_dereference_protected(..., 1).
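I.e.:

        put_watch_queue(rcu_dereference_protected(watch->queue, 1));

since by the time the RCU callback runs, nothing else can still change the
pointer, so asserting that the access is safe (the "1") is fine here.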

> +/**
> + * remove_watch_from_object - Remove a watch or all watches from an object.
> + * @wlist: The watch list to remove from
> + * @wq: The watch queue of interest (ignored if @all is true)
> + * @id: The ID of the watch to remove (ignored if @all is true)
> + * @all: True to remove all objects
> + *
> + * Remove a specific watch or all watches from an object. A notification is
> + * sent to the watcher to tell them that this happened.
> + */
> +int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
> + u64 id, bool all)
> +{
> + struct watch_notification n;
> + struct watch_queue *wqueue;
> + struct watch *watch;
> + int ret = -EBADSLT;
> +
> + rcu_read_lock();
> +
> +again:
> + spin_lock(&wlist->lock);
> + hlist_for_each_entry(watch, &wlist->watchers, list_node) {
> + if (all ||
> + (watch->id == id && rcu_access_pointer(watch->queue) == wq))
> + goto found;
> + }
> + spin_unlock(&wlist->lock);
> + goto out;
> +
> +found:
> + ret = 0;
> + hlist_del_init_rcu(&watch->list_node);
> + rcu_assign_pointer(watch->watch_list, NULL);
> + spin_unlock(&wlist->lock);
> +
> + n.type = WATCH_TYPE_META;
> + n.subtype = WATCH_META_REMOVAL_NOTIFICATION;
> + n.info = watch->info_id | sizeof(n);
> +
> + wqueue = rcu_dereference(watch->queue);
> + post_one_notification(wqueue, &n, wq ? wq->cred : NULL);
> +
> + /* We don't need the watch list lock for the next bit as RCU is
> + * protecting everything from being deallocated.

Does "everything" mean "the wqueue" or more than that?

> + */
> + if (wqueue) {
> + spin_lock_bh(&wqueue->lock);
> +
> + if (!hlist_unhashed(&watch->queue_node)) {
> + hlist_del_init_rcu(&watch->queue_node);
> + put_watch(watch);
> + }
> +
> + spin_unlock_bh(&wqueue->lock);
> + }
> +
> + if (wlist->release_watch) {
> + rcu_read_unlock();
> + wlist->release_watch(wlist, watch);
> + rcu_read_lock();
> + }
> + put_watch(watch);
> +
> + if (all && !hlist_empty(&wlist->watchers))
> + goto again;
> +out:
> + rcu_read_unlock();
> + return ret;
> +}
> +EXPORT_SYMBOL(remove_watch_from_object);
> +
> +/*
> + * Remove all the watches that are contributory to a queue. This will
> + * potentially race with removal of the watches by the destruction of the
> + * objects being watched or the distribution of notifications.
> + */
> +static void watch_queue_clear(struct watch_queue *wqueue)
> +{
> + struct watch_list *wlist;
> + struct watch *watch;
> + bool release;
> +
> + rcu_read_lock();
> + spin_lock_bh(&wqueue->lock);
> +
> + /* Prevent new additions and prevent notifications from happening */
> + wqueue->defunct = true;
> +
> + while (!hlist_empty(&wqueue->watches)) {
> + watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
> + hlist_del_init_rcu(&watch->queue_node);
> + spin_unlock_bh(&wqueue->lock);
> +
> + /* We can't do the next bit under the queue lock as we need to
> + * get the list lock - which would cause a deadlock if someone
> + * was removing from the opposite direction at the same time or
> + * posting a notification.
> + */
> + wlist = rcu_dereference(watch->watch_list);
> + if (wlist) {
> + spin_lock(&wlist->lock);
> +
> + release = !hlist_unhashed(&watch->list_node);
> + if (release) {
> + hlist_del_init_rcu(&watch->list_node);
> + rcu_assign_pointer(watch->watch_list, NULL);
> + }
> +
> + spin_unlock(&wlist->lock);
> +
> + if (release) {
> + if (wlist->release_watch) {
> + rcu_read_unlock();
> + /* This might need to call dput(), so
> + * we have to drop all the locks.
> + */
> + wlist->release_watch(wlist, watch);

How are you holding a reference to `wlist` here? You got the reference through
rcu_dereference(), you've dropped the RCU read lock, and I don't see anything
that stabilizes the reference.

> + rcu_read_lock();
> + }
> + put_watch(watch);
> + }
> + }
> +
> + put_watch(watch);
> + spin_lock_bh(&wqueue->lock);
> + }
> +
> + spin_unlock_bh(&wqueue->lock);
> + rcu_read_unlock();
> +}
> +
> +/*
> + * Release the file.
> + */
> +static int watch_queue_release(struct inode *inode, struct file *file)
> +{
> + struct watch_filter *wfilter;
> + struct watch_queue *wqueue = file->private_data;
> + int i, pgref;
> +
> + watch_queue_clear(wqueue);
> +
> + if (wqueue->pages && wqueue->pages[0])
> + WARN_ON(page_ref_count(wqueue->pages[0]) != 1);

Is there a reason why there couldn't still be references to the pages
from get_user_pages()/get_user_pages_fast()?

> + if (wqueue->buffer)
> + vfree(wqueue->buffer);
> + for (i = 0; i < wqueue->nr_pages; i++) {
> + ClearPageUptodate(wqueue->pages[i]);
> + wqueue->pages[i]->mapping = NULL;
> + pgref = page_ref_count(wqueue->pages[i]);
> + WARN(pgref != 1,
> + "FREE PAGE[%d] refcount %d\n", i, page_ref_count(wqueue->pages[i]));
> + __free_page(wqueue->pages[i]);
> + }
> +
> + wfilter = rcu_access_pointer(wqueue->filter);

Again, rcu_dereference_protected(..., 1).

> + if (wfilter)
> + kfree_rcu(wfilter, rcu);
> + kfree(wqueue->pages);
> + put_cred(wqueue->cred);
> + put_watch_queue(wqueue);
> + return 0;
> +}
> +
> +#ifdef DEBUG_WITH_WRITE
> +static ssize_t watch_queue_write(struct file *file,
> + const char __user *_buf, size_t len, loff_t *pos)
> +{
> + struct watch_notification *n;
> + struct watch_queue *wqueue = file->private_data;
> + ssize_t ret;
> +
> + if (!wqueue->buffer)
> + return -ENOBUFS;
> +
> + if (len & ~WATCH_INFO_LENGTH || len == 0 || !_buf)
> + return -EINVAL;
> +
> + n = memdup_user(_buf, len);
> + if (IS_ERR(n))
> + return PTR_ERR(n);
> +
> + ret = -EINVAL;
> + if ((n->info & WATCH_INFO_LENGTH) != len)
> + goto error;
> + n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID);

Should the non-atomic modification of n->info (and perhaps also the
following uses of ->debug) be protected by some lock?

> + if (post_one_notification(wqueue, n, file->f_cred))
> + wqueue->debug = 0;
> + else
> + wqueue->debug++;
> + ret = len;
> + if (wqueue->debug > 20)
> + ret = -EIO;
> +
> +error:
> + kfree(n);
> + return ret;
> +}
> +#endif
[...]
> +#define IOC_WATCH_QUEUE_SET_SIZE _IO('s', 0x01) /* Set the size in pages */
> +#define IOC_WATCH_QUEUE_SET_FILTER _IO('s', 0x02) /* Set the filter */

Should these ioctl numbers be registered in
Documentation/ioctl/ioctl-number.txt?
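That file is a plain table of "Code  Seq#  Include File  Comments" lines, so
the entry would presumably look something like this (the header path is a
guess on my part, and the 's' range would need to be checked against the
existing 's' users already listed there):

        's'     01-02   uapi/linux/watch_queue.h        general notification queue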