From: Suparna Bhattacharya Core retry infrastructure for AIO. Allows an AIO request to be executed as a series of non-blocking iterations, where each iteration retries the remaining part of the request from where the last iteration left off, by reissuing the corresponding AIO fop routine with modified arguments representing the remaining I/O. The retries are "kicked" via the AIO waitqueue callback aio_wake_function() which replaces the default wait queue entry used for blocking waits. The high level retry infrastructure is responsible for running the iterations in the mm context (address space) of the caller, and ensures that only one retry instance is active at a given time, thus relieving the fops themselves from having to deal with potential races of that sort. Follow on fixes: DESC Fix aio process hang on EINVAL EDESC From: Daniel McNeil Here is a patch to fix EINVAL handling in io_submit_one() that was causing a process hang when attempting AIO to a device not able to handle aio. I hit this doing a AIO read from /dev/zero. The process would hang on exit in wait_for_all_aios(). The fix is to check for EINVAL coming back from aio_setup_iocb() in addition to the EFAULT and EBADF already there. This causes the io_submit to fail with EINVAL. That check looks error prone. Are there other error return values where it should jump to the aio_put_req()? Should the check be: if (ret != 0 && ret != -EIOCBQUEUED) goto out_put_req; DESC AIO: flush workqueues before destroying ioctx'es EDESC From: Suparna Bhattacharya Flush out the workqueue before destroying the ioctx which may be sitting on it. DESC AIO: hold the context lock across unuse_mm EDESC Hold the context lock across unuse_mm DESC task task_lock in use_mm() EDESC From: Suparna Bhattacharya Another patch based on a suggestion from Ben. use_mm wasn't acquiring the task_lock - its possible this might be causing a race with procps. DESC Allow fops to override the retry method with their own EDESC From: Suparna Bhattacharya David Brownell has a patch to implement AIO support for gadgetfs which specified its own ->retry() method to perform the copy_to_user from the AIO workqueue after I/O completion. To enable that to work correctly, with our retry code, we should drop the lines that nullify and reinstate the ->ki_retry() field in aio_run_iocb. fs/aio.c | 576 ++++++++++++++++++++++++++++++++++++++-------- include/linux/aio.h | 25 + include/linux/errno.h | 1 include/linux/init_task.h | 1 include/linux/sched.h | 8 include/linux/wait.h | 9 kernel/fork.c | 15 + 7 files changed, 543 insertions(+), 92 deletions(-) Index: linux.d/fs/aio.c =================================================================== --- linux.d.orig/fs/aio.c 2004-08-18 09:38:46.000000000 -0400 +++ linux.d/fs/aio.c 2004-08-18 11:22:27.387557544 -0400 @@ -39,6 +39,9 @@ #define dprintk(x...) do { ; } while (0) #endif +long aio_run = 0; /* for testing only */ +long aio_wakeups = 0; /* for testing only */ + /*------ sysctl variables----*/ atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */ unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */ @@ -277,6 +280,7 @@ static void aio_cancel_all(struct kioctx struct kiocb *iocb = list_kiocb(pos); list_del_init(&iocb->ki_list); cancel = iocb->ki_cancel; + kiocbSetCancelled(iocb); if (cancel) { iocb->ki_users++; spin_unlock_irq(&ctx->ctx_lock); @@ -337,6 +341,11 @@ void fastcall exit_aio(struct mm_struct aio_cancel_all(ctx); wait_for_all_aios(ctx); + /* + * this is an overkill, but ensures we don't leave + * the ctx on the aio_wq + */ + flush_workqueue(aio_wq); if (1 != atomic_read(&ctx->users)) printk(KERN_DEBUG @@ -398,6 +407,7 @@ static struct kiocb fastcall *__aio_get_ req->ki_obj.user = NULL; req->ki_dtor = NULL; req->private = NULL; + INIT_LIST_HEAD(&req->ki_run_list); /* Check if the completion queue has enough free space to * accept an event from this io. @@ -543,85 +553,313 @@ struct kioctx *lookup_ioctx(unsigned lon return ioctx; } +/* + * use_mm + * Makes the calling kernel thread take on the specified + * mm context. + * Called by the retry thread execute retries within the + * iocb issuer's mm context, so that copy_from/to_user + * operations work seamlessly for aio. + * (Note: this routine is intended to be called only + * from a kernel thread context) + */ static void use_mm(struct mm_struct *mm) { struct mm_struct *active_mm; + struct task_struct *tsk = current; + task_lock(tsk); + active_mm = tsk->active_mm; atomic_inc(&mm->mm_count); - task_lock(current); - active_mm = current->active_mm; - current->mm = mm; - if (mm != active_mm) { - current->active_mm = mm; - activate_mm(active_mm, mm); - } - task_unlock(current); + tsk->mm = mm; + tsk->active_mm = mm; + activate_mm(active_mm, mm); + task_unlock(tsk); + mmdrop(active_mm); } -static void unuse_mm(struct mm_struct *mm) +/* + * unuse_mm + * Reverses the effect of use_mm, i.e. releases the + * specified mm context which was earlier taken on + * by the calling kernel thread + * (Note: this routine is intended to be called only + * from a kernel thread context) + * + * Comments: Called with ctx->ctx_lock held. This nests + * task_lock instead ctx_lock. + */ +void unuse_mm(struct mm_struct *mm) { - task_lock(current); - current->mm = NULL; - task_unlock(current); + struct task_struct *tsk = current; + + task_lock(tsk); + tsk->mm = NULL; /* active_mm is still 'mm' */ - enter_lazy_tlb(mm, current); + enter_lazy_tlb(mm, tsk); + task_unlock(tsk); } -/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an - * operation blocks. +/* + * Queue up a kiocb to be retried. Assumes that the kiocb + * has already been marked as kicked, and places it on + * the retry run list for the corresponding ioctx, if it + * isn't already queued. Returns 1 if it actually queued + * the kiocb (to tell the caller to activate the work + * queue to process it), or 0, if it found that it was + * already queued. + * + * Should be called with the spin lock iocb->ki_ctx->ctx_lock + * held */ -static void aio_kick_handler(void *data) +static inline int __queue_kicked_iocb(struct kiocb *iocb) { - struct kioctx *ctx = data; + struct kioctx *ctx = iocb->ki_ctx; - use_mm(ctx->mm); + if (list_empty(&iocb->ki_run_list)) { + list_add_tail(&iocb->ki_run_list, + &ctx->run_list); + iocb->ki_queued++; + return 1; + } + return 0; +} + +/* aio_run_iocb + * This is the core aio execution routine. It is + * invoked both for initial i/o submission and + * subsequent retries via the aio_kick_handler. + * Expects to be invoked with iocb->ki_ctx->lock + * already held. The lock is released and reaquired + * as needed during processing. + * + * Calls the iocb retry method (already setup for the + * iocb on initial submission) for operation specific + * handling, but takes care of most of common retry + * execution details for a given iocb. The retry method + * needs to be non-blocking as far as possible, to avoid + * holding up other iocbs waiting to be serviced by the + * retry kernel thread. + * + * The trickier parts in this code have to do with + * ensuring that only one retry instance is in progress + * for a given iocb at any time. Providing that guarantee + * simplifies the coding of individual aio operations as + * it avoids various potential races. + */ +static ssize_t aio_run_iocb(struct kiocb *iocb) +{ + struct kioctx *ctx = iocb->ki_ctx; + ssize_t (*retry)(struct kiocb *); + ssize_t ret; + + if (iocb->ki_retried++ > 1024*1024) { + printk("Maximal retry count. Bytes done %Zd\n", + iocb->ki_nbytes - iocb->ki_left); + return -EAGAIN; + } + + if (!(iocb->ki_retried & 0xff)) { + pr_debug("%ld retry: %d of %d (kick %ld, Q %ld run %ld, wake %ld)\n", + iocb->ki_retried, + iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes, + iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups); + } + if (!(retry = iocb->ki_retry)) { + printk("aio_run_iocb: iocb->ki_retry = NULL\n"); + return 0; + } + + /* + * We don't want the next retry iteration for this + * operation to start until this one has returned and + * updated the iocb state. However, wait_queue functions + * can trigger a kick_iocb from interrupt context in the + * meantime, indicating that data is available for the next + * iteration. We want to remember that and enable the + * next retry iteration _after_ we are through with + * this one. + * + * So, in order to be able to register a "kick", but + * prevent it from being queued now, we clear the kick + * flag, but make the kick code *think* that the iocb is + * still on the run list until we are actually done. + * When we are done with this iteration, we check if + * the iocb was kicked in the meantime and if so, queue + * it up afresh. + */ + + kiocbClearKicked(iocb); + + /* + * This is so that aio_complete knows it doesn't need to + * pull the iocb off the run list (We can't just call + * INIT_LIST_HEAD because we don't want a kick_iocb to + * queue this on the run list yet) + */ + iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL; + spin_unlock_irq(&ctx->ctx_lock); + + /* Quit retrying if the i/o has been cancelled */ + if (kiocbIsCancelled(iocb)) { + ret = -EINTR; + aio_complete(iocb, ret, 0); + /* must not access the iocb after this */ + goto out; + } + + /* + * Now we are all set to call the retry method in async + * context. By setting this thread's io_wait context + * to point to the wait queue entry inside the currently + * running iocb for the duration of the retry, we ensure + * that async notification wakeups are queued by the + * operation instead of blocking waits, and when notified, + * cause the iocb to be kicked for continuation (through + * the aio_wake_function callback). + */ + BUG_ON(current->io_wait != NULL); + current->io_wait = &iocb->ki_wait; + ret = retry(iocb); + current->io_wait = NULL; + + if (-EIOCBRETRY != ret) { + if (-EIOCBQUEUED != ret) { + BUG_ON(!list_empty(&iocb->ki_wait.task_list)); + aio_complete(iocb, ret, 0); + /* must not access the iocb after this */ + } + } else { + /* + * Issue an additional retry to avoid waiting forever if + * no waits were queued (e.g. in case of a short read). + */ + if (list_empty(&iocb->ki_wait.task_list)) + kiocbSetKicked(iocb); + } +out: spin_lock_irq(&ctx->ctx_lock); - while (!list_empty(&ctx->run_list)) { - struct kiocb *iocb; - long ret; + + if (-EIOCBRETRY == ret) { + /* + * OK, now that we are done with this iteration + * and know that there is more left to go, + * this is where we let go so that a subsequent + * "kick" can start the next iteration + */ + + /* will make __queue_kicked_iocb succeed from here on */ + INIT_LIST_HEAD(&iocb->ki_run_list); + /* we must queue the next iteration ourselves, if it + * has already been kicked */ + if (kiocbIsKicked(iocb)) { + __queue_kicked_iocb(iocb); + } + } + return ret; +} + +/* + * __aio_run_iocbs: + * Process all pending retries queued on the ioctx + * run list. + * Assumes it is operating within the aio issuer's mm + * context. Expects to be called with ctx->ctx_lock held + */ +static void __aio_run_iocbs(struct kioctx *ctx) +{ + struct kiocb *iocb; + ssize_t ret; + int count = 0; + while (!list_empty(&ctx->run_list)) { iocb = list_entry(ctx->run_list.next, struct kiocb, - ki_run_list); + ki_run_list); list_del(&iocb->ki_run_list); - iocb->ki_users ++; - spin_unlock_irq(&ctx->ctx_lock); + ret = aio_run_iocb(iocb); + count++; + } + aio_run++; +} - kiocbClearKicked(iocb); - ret = iocb->ki_retry(iocb); - if (-EIOCBQUEUED != ret) { - aio_complete(iocb, ret, 0); - iocb = NULL; - } +/* + * aio_run_iocbs: + * Process all pending retries queued on the ioctx + * run list. + * Assumes it is operating within the aio issuer's mm + * context. + */ +static inline void aio_run_iocbs(struct kioctx *ctx) +{ + spin_lock_irq(&ctx->ctx_lock); + __aio_run_iocbs(ctx); + spin_unlock_irq(&ctx->ctx_lock); +} + +/* + * aio_kick_handler: + * Work queue handler triggered to process pending + * retries on an ioctx. Takes on the aio issuer's + * mm context before running the iocbs. + * Run on aiod's context. + */ +static void aio_kick_handler(void *data) +{ + struct kioctx *ctx = data; - spin_lock_irq(&ctx->ctx_lock); - if (NULL != iocb) - __aio_put_req(ctx, iocb); - } + use_mm(ctx->mm); + spin_lock_irq(&ctx->ctx_lock); + __aio_run_iocbs(ctx); + unuse_mm(ctx->mm); spin_unlock_irq(&ctx->ctx_lock); +} + - unuse_mm(ctx->mm); +/* + * Called by kick_iocb to queue the kiocb for retry + * and if required activate the aio work queue to process + * it + */ +void queue_kicked_iocb(struct kiocb *iocb) +{ + struct kioctx *ctx = iocb->ki_ctx; + unsigned long flags; + int run = 0; + + WARN_ON((!list_empty(&iocb->ki_wait.task_list))); + + spin_lock_irqsave(&ctx->ctx_lock, flags); + run = __queue_kicked_iocb(iocb); + spin_unlock_irqrestore(&ctx->ctx_lock, flags); + if (run) { + queue_work(aio_wq, &ctx->wq); + aio_wakeups++; + } } +/* + * kick_iocb: + * Called typically from a wait queue callback context + * (aio_wake_function) to trigger a retry of the iocb. + * The retry is usually executed by aio workqueue + * threads (See aio_kick_handler). + */ void fastcall kick_iocb(struct kiocb *iocb) { - struct kioctx *ctx = iocb->ki_ctx; - /* sync iocbs are easy: they can only ever be executing from a * single context. */ if (is_sync_kiocb(iocb)) { kiocbSetKicked(iocb); - wake_up_process(iocb->ki_obj.tsk); + wake_up_process(iocb->ki_obj.tsk); return; } + iocb->ki_kicked++; + /* If its already kicked we shouldn't queue it again */ if (!kiocbTryKick(iocb)) { - unsigned long flags; - spin_lock_irqsave(&ctx->ctx_lock, flags); - list_add_tail(&iocb->ki_run_list, &ctx->run_list); - spin_unlock_irqrestore(&ctx->ctx_lock, flags); - queue_work(aio_wq, &ctx->wq); + queue_kicked_iocb(iocb); } } EXPORT_SYMBOL(kick_iocb); @@ -675,6 +913,9 @@ int fastcall aio_complete(struct kiocb * */ spin_lock_irqsave(&ctx->ctx_lock, flags); + if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list)) + list_del_init(&iocb->ki_run_list); + ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); tail = info->tail; @@ -703,6 +944,11 @@ int fastcall aio_complete(struct kiocb * pr_debug("added to ring %p at [%lu]\n", iocb, tail); + pr_debug("%ld retries: %d of %d (kicked %ld, Q %ld run %ld wake %ld)\n", + iocb->ki_retried, + iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes, + iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups); + /* everything turned out well, dispose of the aiocb. */ ret = __aio_put_req(ctx, iocb); @@ -809,6 +1055,7 @@ static int read_events(struct kioctx *ct int i = 0; struct io_event ent; struct timeout to; + int event_loop = 0; /* testing only */ /* needed to zero any padding within an entry (there shouldn't be * any, but C is fun! @@ -858,7 +1105,6 @@ static int read_events(struct kioctx *ct add_wait_queue_exclusive(&ctx->wait, &wait); do { set_task_state(tsk, TASK_INTERRUPTIBLE); - ret = aio_read_evt(ctx, &ent); if (ret) break; @@ -868,6 +1114,7 @@ static int read_events(struct kioctx *ct if (to.timed_out) /* Only check after read evt */ break; schedule(); + event_loop++; if (signal_pending(tsk)) { ret = -EINTR; break; @@ -895,6 +1142,9 @@ static int read_events(struct kioctx *ct if (timeout) clear_timeout(&to); out: + pr_debug("event loop executed %d times\n", event_loop); + pr_debug("aio_run %ld\n", aio_run); + pr_debug("aio_wakeups %ld\n", aio_wakeups); return i ? i : ret; } @@ -924,6 +1174,11 @@ static void io_destroy(struct kioctx *io aio_cancel_all(ioctx); wait_for_all_aios(ioctx); + /* + * this is an overkill, but ensures we don't leave + * the ctx on the aio_wq + */ + flush_workqueue(aio_wq); put_ioctx(ioctx); /* once for the lookup */ } @@ -962,7 +1217,7 @@ asmlinkage long sys_io_setup(unsigned nr ret = put_user(ioctx->user_id, ctxp); if (!ret) return 0; - get_ioctx(ioctx); + get_ioctx(ioctx); io_destroy(ioctx); } @@ -987,13 +1242,185 @@ asmlinkage long sys_io_destroy(aio_conte return -EINVAL; } +/* + * Retry method for aio_read (also used for first time submit) + * Responsible for updating iocb state as retries progress + */ +static ssize_t aio_pread(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = 0; + + ret = file->f_op->aio_read(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + + /* + * Can't just depend on iocb->ki_left to determine + * whether we are done. This may have been a short read. + */ + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + + ret = -EIOCBRETRY; + } + + /* This means we must have transferred all that we could */ + /* No need to retry anymore */ + if ((ret == 0) || (iocb->ki_left == 0)) + ret = iocb->ki_nbytes - iocb->ki_left; + + return ret; +} + +/* + * Retry method for aio_write (also used for first time submit) + * Responsible for updating iocb state as retries progress + */ +static ssize_t aio_pwrite(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = 0; + + ret = file->f_op->aio_write(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + + /* + * TBD: Even if iocb->ki_left = 0, could we need to + * wait for data to be sync'd ? Or can we assume + * that aio_fdsync/aio_fsync would be called explicitly + * as required. + */ + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + + ret = -EIOCBRETRY; + } + + /* This means we must have transferred all that we could */ + /* No need to retry anymore */ + if (ret == 0) + ret = iocb->ki_nbytes - iocb->ki_left; + + return ret; +} + +static ssize_t aio_fdsync(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = -EINVAL; + + if (file->f_op->aio_fsync) + ret = file->f_op->aio_fsync(iocb, 1); + return ret; +} + +static ssize_t aio_fsync(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = -EINVAL; + + if (file->f_op->aio_fsync) + ret = file->f_op->aio_fsync(iocb, 0); + return ret; +} + +/* + * aio_setup_iocb: + * Performs the initial checks and aio retry method + * setup for the kiocb at the time of io submission. + */ +ssize_t aio_setup_iocb(struct kiocb *kiocb) +{ + struct file *file = kiocb->ki_filp; + ssize_t ret = 0; + + switch (kiocb->ki_opcode) { + case IOCB_CMD_PREAD: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_READ))) + break; + ret = -EFAULT; + if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf, + kiocb->ki_left))) + break; + ret = security_file_permission(file, MAY_READ); + if (ret) + break; + ret = -EINVAL; + if (file->f_op->aio_read) + kiocb->ki_retry = aio_pread; + break; + case IOCB_CMD_PWRITE: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_WRITE))) + break; + ret = -EFAULT; + if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf, + kiocb->ki_left))) + break; + ret = security_file_permission(file, MAY_WRITE); + if (ret) + break; + ret = -EINVAL; + if (file->f_op->aio_write) + kiocb->ki_retry = aio_pwrite; + break; + case IOCB_CMD_FDSYNC: + ret = -EINVAL; + if (file->f_op->aio_fsync) + kiocb->ki_retry = aio_fdsync; + break; + case IOCB_CMD_FSYNC: + ret = -EINVAL; + if (file->f_op->aio_fsync) + kiocb->ki_retry = aio_fsync; + break; + default: + dprintk("EINVAL: io_submit: no operation provided\n"); + ret = -EINVAL; + } + + if (!kiocb->ki_retry) + return ret; + + return 0; +} + +/* + * aio_wake_function: + * wait queue callback function for aio notification, + * Simply triggers a retry of the operation via kick_iocb. + * + * This callback is specified in the wait queue entry in + * a kiocb (current->io_wait points to this wait queue + * entry when an aio operation executes; it is used + * instead of a synchronous wait when an i/o blocking + * condition is encountered during aio). + * + * Note: + * This routine is executed with the wait queue lock held. + * Since kick_iocb acquires iocb->ctx->ctx_lock, it nests + * the ioctx lock inside the wait queue lock. This is safe + * because this callback isn't used for wait queues which + * are nested inside ioctx lock (i.e. ctx->wait) + */ +int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) +{ + struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait); + + list_del_init(&wait->task_list); + kick_iocb(iocb); + return 1; +} + int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, struct iocb *iocb) { struct kiocb *req; struct file *file; ssize_t ret; - char __user *buf; /* enforce forwards compatibility on users */ if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 || @@ -1034,58 +1461,31 @@ int fastcall io_submit_one(struct kioctx req->ki_user_data = iocb->aio_data; req->ki_pos = iocb->aio_offset; - buf = (char __user *)(unsigned long)iocb->aio_buf; + req->ki_buf = (char *)(unsigned long)iocb->aio_buf; + req->ki_left = req->ki_nbytes = iocb->aio_nbytes; + req->ki_opcode = iocb->aio_lio_opcode; + init_waitqueue_func_entry(&req->ki_wait, aio_wake_function); + INIT_LIST_HEAD(&req->ki_wait.task_list); + req->ki_run_list.next = req->ki_run_list.prev = NULL; + req->ki_retry = NULL; + req->ki_retried = 0; + req->ki_kicked = 0; + req->ki_queued = 0; + aio_run = 0; + aio_wakeups = 0; - switch (iocb->aio_lio_opcode) { - case IOCB_CMD_PREAD: - ret = -EBADF; - if (unlikely(!(file->f_mode & FMODE_READ))) - goto out_put_req; - ret = -EFAULT; - if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes))) - goto out_put_req; - ret = security_file_permission (file, MAY_READ); - if (ret) - goto out_put_req; - ret = -EINVAL; - if (file->f_op->aio_read) - ret = file->f_op->aio_read(req, buf, - iocb->aio_nbytes, req->ki_pos); - break; - case IOCB_CMD_PWRITE: - ret = -EBADF; - if (unlikely(!(file->f_mode & FMODE_WRITE))) - goto out_put_req; - ret = -EFAULT; - if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes))) - goto out_put_req; - ret = security_file_permission (file, MAY_WRITE); - if (ret) - goto out_put_req; - ret = -EINVAL; - if (file->f_op->aio_write) - ret = file->f_op->aio_write(req, buf, - iocb->aio_nbytes, req->ki_pos); - break; - case IOCB_CMD_FDSYNC: - ret = -EINVAL; - if (file->f_op->aio_fsync) - ret = file->f_op->aio_fsync(req, 1); - break; - case IOCB_CMD_FSYNC: - ret = -EINVAL; - if (file->f_op->aio_fsync) - ret = file->f_op->aio_fsync(req, 0); - break; - default: - dprintk("EINVAL: io_submit: no operation provided\n"); - ret = -EINVAL; - } + ret = aio_setup_iocb(req); + if (ret) + goto out_put_req; + + spin_lock_irq(&ctx->ctx_lock); + ret = aio_run_iocb(req); + spin_unlock_irq(&ctx->ctx_lock); + + if (-EIOCBRETRY == ret) + queue_work(aio_wq, &ctx->wq); aio_put_req(req); /* drop extra ref to req */ - if (likely(-EIOCBQUEUED == ret)) - return 0; - aio_complete(req, ret, 0); /* will drop i/o ref to req */ return 0; out_put_req: Index: linux.d/include/linux/aio.h =================================================================== --- linux.d.orig/include/linux/aio.h 2004-08-18 09:39:00.000000000 -0400 +++ linux.d/include/linux/aio.h 2004-08-18 11:22:27.388557392 -0400 @@ -52,7 +52,7 @@ struct kiocb { struct file *ki_filp; struct kioctx *ki_ctx; /* may be NULL for sync ops */ int (*ki_cancel)(struct kiocb *, struct io_event *); - long (*ki_retry)(struct kiocb *); + ssize_t (*ki_retry)(struct kiocb *); void (*ki_dtor)(struct kiocb *); struct list_head ki_list; /* the aio core uses this @@ -65,6 +65,15 @@ struct kiocb { __u64 ki_user_data; /* user's data for completion */ loff_t ki_pos; void *private; + /* State that we remember to be able to restart/retry */ + unsigned short ki_opcode; + size_t ki_nbytes; /* copy of iocb->aio_nbytes */ + char *ki_buf; /* remaining iocb->aio_buf */ + size_t ki_left; /* remaining bytes */ + wait_queue_t ki_wait; + long ki_retried; /* just for testing */ + long ki_kicked; /* just for testing */ + long ki_queued; /* just for testing */ }; #define is_sync_kiocb(iocb) ((iocb)->ki_key == KIOCB_SYNC_KEY) @@ -79,6 +88,8 @@ struct kiocb { (x)->ki_cancel = NULL; \ (x)->ki_dtor = NULL; \ (x)->ki_obj.tsk = tsk; \ + (x)->ki_user_data = 0; \ + init_wait((&(x)->ki_wait)); \ } while (0) #define AIO_RING_MAGIC 0xa10a10a1 @@ -161,6 +172,17 @@ int FASTCALL(io_submit_one(struct kioctx #define get_ioctx(kioctx) do { if (unlikely(atomic_read(&(kioctx)->users) <= 0)) BUG(); atomic_inc(&(kioctx)->users); } while (0) #define put_ioctx(kioctx) do { if (unlikely(atomic_dec_and_test(&(kioctx)->users))) __put_ioctx(kioctx); else if (unlikely(atomic_read(&(kioctx)->users) < 0)) BUG(); } while (0) +#define in_aio() !is_sync_wait(current->io_wait) +/* may be used for debugging */ +#define warn_if_async() if (in_aio()) {\ + printk(KERN_ERR "%s(%s:%d) called in async context!\n", \ + __FUNCTION__, __FILE__, __LINE__); \ + dump_stack(); \ + } + +#define io_wait_to_kiocb(wait) container_of(wait, struct kiocb, ki_wait) +#define is_retried_kiocb(iocb) ((iocb)->ki_retried > 1) + #include static inline struct kiocb *list_kiocb(struct list_head *h) Index: linux.d/include/linux/errno.h =================================================================== --- linux.d.orig/include/linux/errno.h 2004-01-09 01:59:56.000000000 -0500 +++ linux.d/include/linux/errno.h 2004-08-18 11:22:27.389557240 -0400 @@ -22,6 +22,7 @@ #define EBADTYPE 527 /* Type not supported by server */ #define EJUKEBOX 528 /* Request initiated, but will not complete before timeout */ #define EIOCBQUEUED 529 /* iocb queued, will get completion event */ +#define EIOCBRETRY 530 /* iocb queued, will trigger a retry */ #endif Index: linux.d/include/linux/init_task.h =================================================================== --- linux.d.orig/include/linux/init_task.h 2004-08-18 09:39:00.000000000 -0400 +++ linux.d/include/linux/init_task.h 2004-08-18 11:22:28.351411016 -0400 @@ -112,6 +112,7 @@ extern struct group_info init_groups; .proc_lock = SPIN_LOCK_UNLOCKED, \ .switch_lock = SPIN_LOCK_UNLOCKED, \ .journal_info = NULL, \ + .io_wait = NULL, \ } Index: linux.d/include/linux/sched.h =================================================================== --- linux.d.orig/include/linux/sched.h 2004-08-18 09:39:00.000000000 -0400 +++ linux.d/include/linux/sched.h 2004-08-18 11:22:27.392556784 -0400 @@ -522,7 +522,13 @@ struct task_struct { unsigned long ptrace_message; siginfo_t *last_siginfo; /* For ptrace use. */ - +/* + * current io wait handle: wait queue entry to use for io waits + * If this thread is processing aio, this points at the waitqueue + * inside the currently handled kiocb. It may be NULL (i.e. default + * to a stack based synchronous wait) if its doing sync IO. + */ + wait_queue_t *io_wait; #ifdef CONFIG_NUMA struct mempolicy *mempolicy; short il_next; /* could be shared with used_math */ Index: linux.d/include/linux/wait.h =================================================================== --- linux.d.orig/include/linux/wait.h 2004-08-18 09:39:00.000000000 -0400 +++ linux.d/include/linux/wait.h 2004-08-18 11:22:27.393556632 -0400 @@ -80,6 +80,15 @@ static inline int waitqueue_active(wait_ return !list_empty(&q->task_list); } +/* + * Used to distinguish between sync and async io wait context: + * sync i/o typically specifies a NULL wait queue entry or a wait + * queue entry bound to a task (current task) to wake up. + * aio specifies a wait queue entry with an async notification + * callback routine, not associated with any task. + */ +#define is_sync_wait(wait) (!(wait) || ((wait)->task)) + extern void FASTCALL(add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)); extern void FASTCALL(add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t * wait)); extern void FASTCALL(remove_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)); Index: linux.d/kernel/fork.c =================================================================== --- linux.d.orig/kernel/fork.c 2004-08-18 09:39:01.000000000 -0400 +++ linux.d/kernel/fork.c 2004-08-18 11:22:27.395556328 -0400 @@ -151,7 +151,12 @@ void fastcall prepare_to_wait(wait_queue spin_lock_irqsave(&q->lock, flags); if (list_empty(&wait->task_list)) __add_wait_queue(q, wait); - set_current_state(state); + /* + * don't alter the task state if this is just going to + * queue an async wait queue callback + */ + if (is_sync_wait(wait)) + set_current_state(state); spin_unlock_irqrestore(&q->lock, flags); } @@ -166,7 +171,12 @@ prepare_to_wait_exclusive(wait_queue_hea spin_lock_irqsave(&q->lock, flags); if (list_empty(&wait->task_list)) __add_wait_queue_tail(q, wait); - set_current_state(state); + /* + * don't alter the task state if this is just going to + * queue an async wait queue callback + */ + if (is_sync_wait(wait)) + set_current_state(state); spin_unlock_irqrestore(&q->lock, flags); } @@ -964,6 +974,7 @@ struct task_struct *copy_process(unsigne p->start_time = get_jiffies_64(); p->security = NULL; p->io_context = NULL; + p->io_wait = NULL; p->audit_context = NULL; #ifdef CONFIG_NUMA p->mempolicy = mpol_copy(p->mempolicy); From: Suparna Bhattacharya This patch appears to fix the hangs seen with AIO and 4G-4G for me. It ensures that the indirect versions of copy_xxx_user are used during aio retries running in worker thread context (i.e. access aio issuer's user-space instead of kernel-space). fs/aio.c | 7 ++++++- 1 files changed, 6 insertions(+), 1 deletion(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:04:27.026069688 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:12:28.898813896 -0400 @@ -802,18 +802,23 @@ static inline void aio_run_iocbs(struct * aio_kick_handler: * Work queue handler triggered to process pending * retries on an ioctx. Takes on the aio issuer's - * mm context before running the iocbs. + * mm context before running the iocbs, so that + * copy_xxx_user operates on the issuer's address + * space. * Run on aiod's context. */ static void aio_kick_handler(void *data) { struct kioctx *ctx = data; + mm_segment_t oldfs = get_fs(); + set_fs(USER_DS); use_mm(ctx->mm); spin_lock_irq(&ctx->ctx_lock); __aio_run_iocbs(ctx); unuse_mm(ctx->mm); spin_unlock_irq(&ctx->ctx_lock); + set_fs(oldfs); } From: Daniel McNeil Here is the patch for AIO retry to hold an extra ref count. The patch is small, but I wanted to make sure it was safe. I spent time looking over the retry code and this patch looks ok to me. It is potentially calling put_ioctx() while holding ctx->ctx_lock, I do not think that will cause any problems. This should never be the last reference on the ioctx anyway, since the loop is checking list_empty(&ctx->run_list). The first ref is taken in sys_io_setup() and last removed in io_destroy(). It also looks like holding ctx->ctx_lock prevents any races between any retries and an io_destroy() which would try to cancel all iocbs. I've tested this on my 2-proc by coping a raw partitions and copying ext3 files using using AIO and O_DIRECT, O_SYNC, and both. fs/aio.c | 9 +++++++-- 1 files changed, 7 insertions(+), 2 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:12:28.898813896 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:12:34.105022432 -0400 @@ -771,14 +771,19 @@ out: static void __aio_run_iocbs(struct kioctx *ctx) { struct kiocb *iocb; - ssize_t ret; int count = 0; while (!list_empty(&ctx->run_list)) { iocb = list_entry(ctx->run_list.next, struct kiocb, ki_run_list); list_del(&iocb->ki_run_list); - ret = aio_run_iocb(iocb); + /* + * Hold an extra reference while retrying i/o. + */ + iocb->ki_users++; /* grab extra reference */ + aio_run_iocb(iocb); + if (__aio_put_req(ctx, iocb)) /* drop extra ref */ + put_ioctx(ctx); count++; } aio_run++; From: Suparna Bhattacharya This patch tries be a little fairer across multiple io contexts in handling retries, helping make sure progress happens uniformly across different io contexts (especially if they are acting on independent queues). It splices the ioctx runlist before processing it in __aio_run_iocbs. If new iocbs get added to the ctx in meantime, it queues a fresh workqueue entry instead of handling them righaway, so that other ioctxs' retries get a chance to be processed before the newer entries in the queue. This might make a difference in a situation where retries are getting queued very fast on one ioctx, while the workqueue entry for another ioctx is stuck behind it. I've only seen this occasionally earlier and can't recreate it consistently, but may be worth trying out. aio.c | 26 ++++++++++++++++++++------ 1 files changed, 20 insertions(+), 6 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:12:34.105022432 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:12:37.728471584 -0400 @@ -768,13 +768,15 @@ out: * Assumes it is operating within the aio issuer's mm * context. Expects to be called with ctx->ctx_lock held */ -static void __aio_run_iocbs(struct kioctx *ctx) +static int __aio_run_iocbs(struct kioctx *ctx) { struct kiocb *iocb; int count = 0; + LIST_HEAD(run_list); - while (!list_empty(&ctx->run_list)) { - iocb = list_entry(ctx->run_list.next, struct kiocb, + list_splice_init(&ctx->run_list, &run_list); + while (!list_empty(&run_list)) { + iocb = list_entry(run_list.next, struct kiocb, ki_run_list); list_del(&iocb->ki_run_list); /* @@ -787,6 +789,9 @@ static void __aio_run_iocbs(struct kioct count++; } aio_run++; + if (!list_empty(&ctx->run_list)) + return 1; + return 0; } /* @@ -798,9 +803,15 @@ static void __aio_run_iocbs(struct kioct */ static inline void aio_run_iocbs(struct kioctx *ctx) { + int requeue; + spin_lock_irq(&ctx->ctx_lock); - __aio_run_iocbs(ctx); - spin_unlock_irq(&ctx->ctx_lock); + + requeue = __aio_run_iocbs(ctx); + spin_unlock_irq(&ctx->ctx_lock); + if (requeue) + queue_work(aio_wq, &ctx->wq); + } /* @@ -816,14 +827,17 @@ static void aio_kick_handler(void *data) { struct kioctx *ctx = data; mm_segment_t oldfs = get_fs(); + int requeue; set_fs(USER_DS); use_mm(ctx->mm); spin_lock_irq(&ctx->ctx_lock); - __aio_run_iocbs(ctx); + requeue =__aio_run_iocbs(ctx); unuse_mm(ctx->mm); spin_unlock_irq(&ctx->ctx_lock); set_fs(oldfs); + if (requeue) + queue_work(aio_wq, &ctx->wq); } From: Suparna Bhattacharya Async wait on page support. Implements async versions of lock_page, wait_on_page_locked, and wait_on_page_writeback which accept a wait queue entry as a parameter, and where blocking waits converted into retry exits if the wait queue entry specifies an async callback for AIO. include/linux/pagemap.h | 38 ++++++++++++++---- mm/filemap.c | 100 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 105 insertions(+), 33 deletions(-) Index: linux.t/include/linux/pagemap.h =================================================================== --- linux.t.orig/include/linux/pagemap.h 2004-08-13 09:27:20.380571256 -0400 +++ linux.t/include/linux/pagemap.h 2004-08-13 10:12:40.759010872 -0400 @@ -154,17 +154,27 @@ static inline pgoff_t linear_page_index( extern void FASTCALL(__lock_page(struct page *page)); extern void FASTCALL(unlock_page(struct page *page)); -static inline void lock_page(struct page *page) + +extern int FASTCALL(__lock_page_wq(struct page *page, wait_queue_t *wait)); +static inline int lock_page_wq(struct page *page, wait_queue_t *wait) { if (TestSetPageLocked(page)) - __lock_page(page); + return __lock_page_wq(page, wait); + else + return 0; +} + +static inline void lock_page(struct page *page) +{ + lock_page_wq(page, NULL); } /* * This is exported only for wait_on_page_locked/wait_on_page_writeback. * Never use this directly! */ -extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr)); +extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr, + wait_queue_t *wait)); /* * Wait for a page to be unlocked. @@ -173,19 +183,33 @@ extern void FASTCALL(wait_on_page_bit(st * ie with increased "page->count" so that the page won't * go away during the wait.. */ -static inline void wait_on_page_locked(struct page *page) +static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait) { if (PageLocked(page)) - wait_on_page_bit(page, PG_locked); + return wait_on_page_bit_wq(page, PG_locked, wait); + return 0; +} + +static inline int wait_on_page_writeback_wq(struct page *page, + wait_queue_t *wait) +{ + if (PageWriteback(page)) + return wait_on_page_bit_wq(page, PG_writeback, wait); + return 0; +} + +static inline void wait_on_page_locked(struct page *page) +{ + wait_on_page_locked_wq(page, NULL); } /* * Wait for a page to complete writeback */ + static inline void wait_on_page_writeback(struct page *page) { - if (PageWriteback(page)) - wait_on_page_bit(page, PG_writeback); + wait_on_page_writeback_wq(page, NULL); } extern void end_page_writeback(struct page *page); Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 09:27:25.453800008 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:12:40.762010416 -0400 @@ -348,22 +348,43 @@ static void wake_up_page(struct page *pa __wake_up(waitqueue, mode, 1, page); } -void fastcall wait_on_page_bit(struct page *page, int bit_nr) +/* + * wait for the specified page bit to be cleared + * this could be a synchronous wait or could just queue an async + * notification callback depending on the wait queue entry parameter + * + * A NULL wait queue parameter defaults to sync behaviour + */ +int fastcall wait_on_page_bit_wq(struct page *page, int bit_nr, wait_queue_t *wait) { wait_queue_head_t *waitqueue = page_waitqueue(page); - DEFINE_PAGE_WAIT(wait, page, bit_nr); + DEFINE_PAGE_WAIT(local_wait, page, bit_nr); - do { - prepare_to_wait(waitqueue, &wait.wait, TASK_UNINTERRUPTIBLE); - if (test_bit(bit_nr, &page->flags)) { - sync_page(page); - io_schedule(); - } - } while (test_bit(bit_nr, &page->flags)); - finish_wait(waitqueue, &wait.wait); + if (!wait) + wait = &local_wait.wait; /* default to a sync wait entry */ + + do { + prepare_to_wait(waitqueue, wait, TASK_UNINTERRUPTIBLE); + if (test_bit(bit_nr, &page->flags)) { + sync_page(page); + if (!is_sync_wait(wait)) { + /* + * if we've queued an async wait queue + * callback do not block; just tell the + * caller to return and retry later when + * the callback is notified + */ + return -EIOCBRETRY; + } + io_schedule(); + } + } while (test_bit(bit_nr, &page->flags)); + finish_wait(waitqueue, wait); + + return 0; } - -EXPORT_SYMBOL(wait_on_page_bit); +EXPORT_SYMBOL(wait_on_page_bit_wq); + /** * unlock_page() - unlock a locked page @@ -373,8 +394,9 @@ EXPORT_SYMBOL(wait_on_page_bit); * Unlocks the page and wakes up sleepers in ___wait_on_page_locked(). * Also wakes sleepers in wait_on_page_writeback() because the wakeup * mechananism between PageLocked pages and PageWriteback pages is shared. - * But that's OK - sleepers in wait_on_page_writeback() just go back to sleep. - * + * But that's OK - sleepers in wait_on_page_writeback() just go back to sleep, + * or in case the wakeup notifies async wait queue entries, as in the case + * of aio, retries would be triggered and may re-queue their callbacks. * The first mb is necessary to safely close the critical section opened by the * TestSetPageLocked(), the second mb is necessary to enforce ordering between * the clear_bit and the read of the waitqueue (to avoid SMP races with a @@ -407,29 +429,55 @@ void end_page_writeback(struct page *pag EXPORT_SYMBOL(end_page_writeback); + /* - * Get a lock on the page, assuming we need to sleep to get it. + * Get a lock on the page, assuming we need to either sleep to get it + * or to queue an async notification callback to try again when its + * available. + * + * A NULL wait queue parameter defaults to sync behaviour. Otherwise + * it specifies the wait queue entry to be used for async notification + * or waiting. * * Ugly: running sync_page() in state TASK_UNINTERRUPTIBLE is scary. If some * random driver's requestfn sets TASK_RUNNING, we could busywait. However * chances are that on the second loop, the block layer's plug list is empty, * so sync_page() will then return in state TASK_UNINTERRUPTIBLE. */ -void fastcall __lock_page(struct page *page) +int fastcall __lock_page_wq(struct page *page, wait_queue_t *wait) { - wait_queue_head_t *wqh = page_waitqueue(page); - DEFINE_PAGE_WAIT_EXCLUSIVE(wait, page, PG_locked); + wait_queue_head_t *wqh = page_waitqueue(page); + DEFINE_PAGE_WAIT_EXCLUSIVE(local_wait, page, PG_locked); - while (TestSetPageLocked(page)) { - prepare_to_wait_exclusive(wqh, &wait.wait, TASK_UNINTERRUPTIBLE); - if (PageLocked(page)) { - sync_page(page); - io_schedule(); - } - } - finish_wait(wqh, &wait.wait); + if (!wait) + wait = &local_wait.wait; + + while (TestSetPageLocked(page)) { + prepare_to_wait_exclusive(wqh, wait, TASK_UNINTERRUPTIBLE); + if (PageLocked(page)) { + sync_page(page); + if (!is_sync_wait(wait)) { + /* + * if we've queued an async wait queue + * callback do not block; just tell the + * caller to return and retry later when + * the callback is notified + */ + return -EIOCBRETRY; + } + io_schedule(); + } + } + finish_wait(wqh, wait); + return 0; } +EXPORT_SYMBOL(__lock_page_wq); +void fastcall __lock_page(struct page *page) +{ + __lock_page_wq(page, NULL); +} + EXPORT_SYMBOL(__lock_page); /* From: Suparna Bhattacharya Filesystem aio read Converts the wait for page to become uptodate (wait for page lock) after readahead/readpage (in do_generic_mapping_read) to a retry exit. filemap.c | 21 ++++++++++++++++++--- 1 files changed, 18 insertions(+), 3 deletions(-) Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:12:40.762010416 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:28:34.539014272 -0400 @@ -779,7 +779,12 @@ page_ok: page_not_up_to_date: /* Get exclusive access to the page ... */ - lock_page(page); + + if (lock_page_wq(page, current->io_wait)) { + pr_debug("queued lock page \n"); + error = -EIOCBRETRY; + goto readpage_error; + } /* Did it get unhashed before we got the lock? */ if (!page->mapping) { @@ -802,7 +807,11 @@ readpage: goto readpage_error; if (!PageUptodate(page)) { - wait_on_page_locked(page); + if (wait_on_page_locked_wq(page, current->io_wait)) { + pr_debug("queued wait_on_page \n"); + error = -EIOCBRETRY; + goto readpage_error; + } if (!PageUptodate(page)) { error = -EIO; goto readpage_error; @@ -826,7 +835,11 @@ readpage: goto page_ok; readpage_error: - /* UHHUH! A synchronous read error occurred. Report it */ + /* We don't have uptodate data in the page yet + * Could be due to an error or because we need to + * retry when we get an async i/o notification. + * Report the reason. + */ desc->error = error; page_cache_release(page); goto out; From: Suparna Bhattacharya This patch modifies do_generic_mapping_read to readahead upto ra_pages pages in the range requested upfront for AIO reads before it starts waiting for any of the pages to become uptodate. This leads to sane readahead behaviour and I/O ordering for the kind of I/O patterns generated by streaming AIO reads, by ensuring that I/O for as many consecutive blocks as possible in the first request is issued before before submission of the next request (notice that unlike sync I/O, AIO can't wait for completion of the first request before submitting the next). The patch also takes care not to repeatedly issue readaheads for subsequent AIO retries for the same request. Upfront readahead is clipped to ra_pages (128K) to maintain pipelined behaviour for very large requests, like sendfile of a large file. The tradeoff is that in the cases where individual request sizes exceed ra_pages (typically 128KB) I/O ordering wouldn't be optimal for streaming AIOs. There's a good reason why these changes are limited only to AIO. For sendfile with O_NONBLOCK in a loop, the extra upfront readahead getting issued on every iteration disturbs sequentiality of the readahead pattern resulting in non-optimal behaviour (this showed up as a regression in O_NONBLOCK sendfile for a large file). This isn't likely to be a problem with AIO sendfile when it is implemented because that wouldn't be likely to use O_NONBLOCK. filemap.c | 37 ++++++++++++++++++++++++++++++++++++- 1 files changed, 36 insertions(+), 1 deletion(-) Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:28:34.539014272 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:33:10.303091760 -0400 @@ -717,13 +717,48 @@ void do_generic_mapping_read(struct addr if (index > end_index) goto out; + if (unlikely(in_aio())) { + unsigned long i, last, nr; + /* + * Let the readahead logic know upfront about all + * the pages we'll need to satisfy this request while + * taking care to avoid repeat readaheads during retries. + * Required for reasonable IO ordering with multipage + * streaming AIO requests. + */ + if ((!is_retried_kiocb(io_wait_to_kiocb(current->io_wait))) + || (ra.prev_page + 1 == index)) { + + last = (*ppos + desc->count - 1) >> PAGE_CACHE_SHIFT; + nr = max_sane_readahead(last - index + 1); + + for (i = 0; (i < nr) && ((i == 0)||(i < ra.ra_pages)); + i++) { + page_cache_readahead(mapping, &ra, filp, + index + i); + if (bdi_read_congested( + mapping->backing_dev_info)) { + printk("AIO readahead congestion\n"); + break; + } + } + } + } + for (;;) { struct page *page; unsigned long nr, ret; cond_resched(); - page_cache_readahead(mapping, &ra, filp, index); + /* + * Take care to avoid disturbing the existing readahead + * window (concurrent reads may be active for the same fd, + * in the AIO case) + */ + if (!in_aio() || (ra.prev_page + 1 == index)) + page_cache_readahead(mapping, &ra, filp, index); + find_page: page = find_get_page(mapping, index); if (unlikely(page == NULL)) { From: Chris Mason Fix for sys_io_cancel to work properly with retries when a cancel method is specified for an iocb. Needed with pipe AIO support. There's a bug in my aio cancel patch, aio_complete still makes an event for cancelled iocbs. If nobody asks for this event, we effectively leak space in the event ring buffer. I've attached a new aio_cancel patch that just skips the event creation for canceled iocbs. aio.c | 10 +++++++++- 1 files changed, 9 insertions(+), 1 deletion(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:12:37.728471584 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:33:43.668019520 -0400 @@ -940,6 +940,13 @@ int fastcall aio_complete(struct kiocb * if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list)) list_del_init(&iocb->ki_run_list); + /* + * cancelled requests don't get events, userland was given one + * when the event got cancelled. + */ + if (kiocbIsCancelled(iocb)) + goto put_rq; + ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); tail = info->tail; @@ -972,7 +979,7 @@ int fastcall aio_complete(struct kiocb * iocb->ki_retried, iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes, iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups); - +put_rq: /* everything turned out well, dispose of the aiocb. */ ret = __aio_put_req(ctx, iocb); @@ -1625,6 +1632,7 @@ asmlinkage long sys_io_cancel(aio_contex if (kiocb && kiocb->ki_cancel) { cancel = kiocb->ki_cancel; kiocb->ki_users ++; + kiocbSetCancelled(kiocb); } else cancel = NULL; spin_unlock_irq(&ctx->ctx_lock); From: Suparna Bhattacharya Currently, the high level AIO code keeps issuing retries until the entire transfer is done, i.e. until all the bytes requested are read (See aio_pread), which is what we needed for filesystem aio read. However, in the pipe read case, the expected semantics would be to return as soon as it has any bytes transferred, rather than waiting for the entire transfer. This will also be true in for network aio reads if/when we implement it. Hmm, so we need to get the generic code to allow for this possibility - maybe based on a check for ISFIFO/ISSOCK ? aio.c | 12 ++++++++++-- 1 files changed, 10 insertions(+), 2 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:33:43.668019520 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:33:48.340309224 -0400 @@ -1280,6 +1280,8 @@ asmlinkage long sys_io_destroy(aio_conte static ssize_t aio_pread(struct kiocb *iocb) { struct file *file = iocb->ki_filp; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; ssize_t ret = 0; ret = file->f_op->aio_read(iocb, iocb->ki_buf, @@ -1292,8 +1294,14 @@ static ssize_t aio_pread(struct kiocb *i if (ret > 0) { iocb->ki_buf += ret; iocb->ki_left -= ret; - - ret = -EIOCBRETRY; + /* + * For pipes and sockets we return once we have + * some data; for regular files we retry till we + * complete the entire read or find that we can't + * read any more data (e.g short reads). + */ + if (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode)) + ret = -EIOCBRETRY; } /* This means we must have transferred all that we could */ From: Chris Mason AIO support for pipes (using the retry infrastructure). They were easier than I expected ;-) This goes on top of your fsaio patches. This is only lightly tested. I missed the obvious for the pipe aio cancel routine, which is to just wake up the pipe wait queue (which is what the retry is waiting on). Here's a new pipe aio patch, along with a change to sys_aio_cancel to set the cancel bit (See aio-cancel.patch). I'm not 100% sure we need it, but it seems like a good idea. fs/pipe.c | 102 +++++++++++++++++++++++++++++++++++++--------- include/linux/pipe_fs_i.h | 2 2 files changed, 84 insertions(+), 20 deletions(-) Index: linux.t/fs/pipe.c =================================================================== --- linux.t.orig/fs/pipe.c 2004-08-13 09:27:14.697435224 -0400 +++ linux.t/fs/pipe.c 2004-08-13 10:33:51.130884992 -0400 @@ -33,15 +33,21 @@ */ /* Drop the inode semaphore and wait for a pipe event, atomically */ -void pipe_wait(struct inode * inode) +int pipe_wait(struct inode * inode) { - DEFINE_WAIT(wait); + DEFINE_WAIT(local_wait); + wait_queue_t *wait = &local_wait; - prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE); + if (current->io_wait) + wait = current->io_wait; + prepare_to_wait(PIPE_WAIT(*inode), wait, TASK_INTERRUPTIBLE); + if (!is_sync_wait(wait)) + return -EIOCBRETRY; up(PIPE_SEM(*inode)); schedule(); - finish_wait(PIPE_WAIT(*inode), &wait); + finish_wait(PIPE_WAIT(*inode), wait); down(PIPE_SEM(*inode)); + return 0; } static inline int @@ -81,11 +87,11 @@ pipe_iov_copy_to_user(struct iovec *iov, iov->iov_base += copy; iov->iov_len -= copy; } - return 0; + return 0; } static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, +pipe_aio_readv(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; @@ -93,6 +99,7 @@ pipe_readv(struct file *filp, const stru ssize_t ret; struct iovec *iov = (struct iovec *)_iov; size_t total_len; + ssize_t retry; total_len = iov_length(iov, nr_segs); /* Null read succeeds. */ @@ -152,7 +159,12 @@ pipe_readv(struct file *filp, const stru wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + retry = pipe_wait(inode); + if (retry == -EIOCBRETRY) { + if (!ret) + ret = retry; + break; + } } up(PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. */ @@ -169,11 +181,15 @@ static ssize_t pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) { struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); + ssize_t ret; + ret = pipe_aio_readv(filp, &iov, 1, ppos); + if (ret == -EIOCBRETRY) + BUG(); + return ret; } static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, +pipe_aio_writev(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; @@ -182,6 +198,7 @@ pipe_writev(struct file *filp, const str int do_wakeup; struct iovec *iov = (struct iovec *)_iov; size_t total_len; + int retry; total_len = iov_length(iov, nr_segs); /* Null write succeeds. */ @@ -246,7 +263,12 @@ pipe_writev(struct file *filp, const str do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); + retry = pipe_wait(inode); + if (retry == -EIOCBRETRY) { + if (!ret) + ret = retry; + break; + } PIPE_WAITING_WRITERS(*inode)--; } up(PIPE_SEM(*inode)); @@ -264,7 +286,41 @@ pipe_write(struct file *filp, const char size_t count, loff_t *ppos) { struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; - return pipe_writev(filp, &iov, 1, ppos); + return pipe_aio_writev(filp, &iov, 1, ppos); +} + +static int +pipe_aio_cancel(struct kiocb *iocb, struct io_event *evt) +{ + struct inode *inode = iocb->ki_filp->f_dentry->d_inode; + evt->obj = (u64)(unsigned long)iocb->ki_obj.user; + evt->data = iocb->ki_user_data; + evt->res = iocb->ki_nbytes - iocb->ki_left; + if (evt->res == 0) + evt->res = -EINTR; + evt->res2 = 0; + wake_up_interruptible(PIPE_WAIT(*inode)); + aio_put_req(iocb); + return 0; +} + +static ssize_t +pipe_aio_write(struct kiocb *iocb, const char __user *buf, + size_t count, loff_t pos) +{ + struct file *file = iocb->ki_filp; + struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; + iocb->ki_cancel = pipe_aio_cancel; + return pipe_aio_writev(file, &iov, 1, &file->f_pos); +} + +static ssize_t +pipe_aio_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + struct file *file = iocb->ki_filp; + struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; + iocb->ki_cancel = pipe_aio_cancel; + return pipe_aio_readv(file, &iov, 1, &file->f_pos); } static ssize_t @@ -459,7 +515,8 @@ pipe_rdwr_open(struct inode *inode, stru struct file_operations read_fifo_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -472,7 +529,8 @@ struct file_operations write_fifo_fops = .llseek = no_llseek, .read = bad_pipe_r, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -483,9 +541,11 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, + .aio_read = pipe_aio_read, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -496,7 +556,8 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .aio_read = pipe_aio_read, + .readv = pipe_aio_readv, .write = bad_pipe_w, .poll = pipe_poll, .ioctl = pipe_ioctl, @@ -509,7 +570,8 @@ struct file_operations write_pipe_fops = .llseek = no_llseek, .read = bad_pipe_r, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -520,9 +582,11 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, Index: linux.t/include/linux/pipe_fs_i.h =================================================================== --- linux.t.orig/include/linux/pipe_fs_i.h 2004-01-09 01:59:46.000000000 -0500 +++ linux.t/include/linux/pipe_fs_i.h 2004-08-13 10:33:51.131884840 -0400 @@ -41,7 +41,7 @@ struct pipe_inode_info { #define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode)) /* Drop the inode semaphore and wait for a pipe event, atomically */ -void pipe_wait(struct inode * inode); +int pipe_wait(struct inode * inode); struct inode* pipe_new(struct inode* inode); From: Chris Mason I compared the 2.6 pipetest results with the 2.4 suse kernel, and 2.6 was roughly 40% slower. During the pipetest run, 2.6 generates ~600,000 context switches per second while 2.4 generates 30 or so. aio-context-switch (attached) has a few changes that reduces our context switch rate, and bring performance back up to 2.4 levels. These have only really been tested against pipetest, they might make other workloads worse. The basic theory behind the patch is that it is better for the userland process to call run_iocbs than it is to schedule away and let the worker thread do it. 1) on io_submit, use run_iocbs instead of run_iocb 2) on io_getevents, call run_iocbs if no events were available. 3) don't let two procs call run_iocbs for the same context at the same time. They just end up bouncing on spinlocks. The first three optimizations got me down to 360,000 context switches per second, and they help build a little structure to allow optimization #4, which uses queue_delayed_work(HZ/10) instead of queue_work. That brings down the number of context switches to 2.4 levels. On Tue, 2004-02-24 at 13:32, Suparna Bhattacharya wrote: > On more thought ... > The aio-splice-runlist patch runs counter-purpose to some of > your optimizations. I put that one in to avoid starvation when > multiple ioctx's are in use. But it means that ctx->running > doesn't ensure that it will process the new request we just put on > the run-list. The ctx->running optimization probably isn't critical. It should be enough to call run_iocbs from io_submit_one and getevents, which will help make sure the process does its own retries whenever possible. Doing the run_iocbs from getevents is what makes the queue_delayed_work possible, since someone waiting on an event won't have to wait the extra HZ/10 for the worker thread to schedule in. Wow, 15% slower with ctx->running removed, but the number of context switches is stays nice and low. We can play with ctx->running variations later, here's a patch without them. It should be easier to apply with the rest of your code. aio.c | 18 ++++++++++++------ 1 files changed, 12 insertions(+), 6 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:33:48.340309224 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:35:54.990055536 -0400 @@ -858,7 +858,7 @@ void queue_kicked_iocb(struct kiocb *ioc run = __queue_kicked_iocb(iocb); spin_unlock_irqrestore(&ctx->ctx_lock, flags); if (run) { - queue_work(aio_wq, &ctx->wq); + queue_delayed_work(aio_wq, &ctx->wq, HZ/10); aio_wakeups++; } } @@ -1087,13 +1087,14 @@ static int read_events(struct kioctx *ct struct io_event ent; struct timeout to; int event_loop = 0; /* testing only */ + int retry = 0; /* needed to zero any padding within an entry (there shouldn't be * any, but C is fun! */ memset(&ent, 0, sizeof(ent)); +retry: ret = 0; - while (likely(i < nr)) { ret = aio_read_evt(ctx, &ent); if (unlikely(ret <= 0)) @@ -1122,6 +1123,13 @@ static int read_events(struct kioctx *ct /* End fast path */ + /* racey check, but it gets redone */ + if (!retry && unlikely(!list_empty(&ctx->run_list))) { + retry = 1; + aio_run_iocbs(ctx); + goto retry; + } + init_timeout(&to); if (timeout) { struct timespec ts; @@ -1519,11 +1527,9 @@ int fastcall io_submit_one(struct kioctx goto out_put_req; spin_lock_irq(&ctx->ctx_lock); - ret = aio_run_iocb(req); + list_add_tail(&req->ki_run_list, &ctx->run_list); + __aio_run_iocbs(ctx); spin_unlock_irq(&ctx->ctx_lock); - - if (-EIOCBRETRY == ret) - queue_work(aio_wq, &ctx->wq); aio_put_req(req); /* drop extra ref to req */ return 0; From: Andrew Morton Modify mpage_writepages to optionally only write back dirty pages within a specified range in a file (as in the case of O_SYNC). Cheat a little to avoid changes to prototypes of aops - just put the hint into the writeback_control struct instead. If are not set, then default to writing back all the mapping's dirty pages. fs/mpage.c | 27 ++++++++++++++++++++++++--- include/linux/writeback.h | 21 ++++++++++++++++----- 2 files changed, 40 insertions(+), 8 deletions(-) Index: linux.t/fs/mpage.c =================================================================== --- linux.t.orig/fs/mpage.c 2004-08-13 09:27:13.809570200 -0400 +++ linux.t/fs/mpage.c 2004-08-13 10:35:57.908611848 -0400 @@ -627,7 +627,9 @@ mpage_writepages(struct address_space *m struct pagevec pvec; int nr_pages; pgoff_t index; + pgoff_t end = -1; /* Inclusive */ int scanned = 0; + int is_range = 0; if (wbc->nonblocking && bdi_write_congested(bdi)) { wbc->encountered_congestion = 1; @@ -645,9 +647,16 @@ mpage_writepages(struct address_space *m index = 0; /* whole-file sweep */ scanned = 1; } + if (wbc->start || wbc->end) { + index = wbc->start >> PAGE_CACHE_SHIFT; + end = wbc->end >> PAGE_CACHE_SHIFT; + is_range = 1; + scanned = 1; + } retry: while (!done && (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, - PAGECACHE_TAG_DIRTY, PAGEVEC_SIZE))) { + PAGECACHE_TAG_DIRTY, + min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1))) { unsigned i; scanned = 1; @@ -664,10 +673,21 @@ retry: lock_page(page); + if (unlikely(page->mapping != mapping)) { + unlock_page(page); + continue; + } + + if (unlikely(is_range) && page->index > end) { + done = 1; + unlock_page(page); + continue; + } + if (wbc->sync_mode != WB_SYNC_NONE) wait_on_page_writeback(page); - if (page->mapping != mapping || PageWriteback(page) || + if (PageWriteback(page) || !clear_page_dirty_for_io(page)) { unlock_page(page); continue; @@ -706,7 +726,8 @@ retry: index = 0; goto retry; } - mapping->writeback_index = index; + if (!is_range) + mapping->writeback_index = index; if (bio) mpage_bio_submit(WRITE, bio); return ret; Index: linux.t/include/linux/writeback.h =================================================================== --- linux.t.orig/include/linux/writeback.h 2004-08-13 09:27:20.744515928 -0400 +++ linux.t/include/linux/writeback.h 2004-08-13 10:35:57.909611696 -0400 @@ -29,7 +29,9 @@ enum writeback_sync_modes { }; /* - * A control structure which tells the writeback code what to do + * A control structure which tells the writeback code what to do. These are + * always on the stack, and hence need no locking. They are always initialised + * in a manner such that unspecified fields are set to zero. */ struct writeback_control { struct backing_dev_info *bdi; /* If !NULL, only write back this @@ -40,10 +42,19 @@ struct writeback_control { long nr_to_write; /* Write this many pages, and decrement this for each page written */ long pages_skipped; /* Pages which were not written */ - int nonblocking; /* Don't get stuck on request queues */ - int encountered_congestion; /* An output: a queue is full */ - int for_kupdate; /* A kupdate writeback */ - int for_reclaim; /* Invoked from the page allocator */ + + /* + * For a_ops->writepages(): is start or end are non-zero then this is + * a hint that the filesystem need only write out the pages inside that + * byterange. The byte at `end' is included in the writeout request. + */ + loff_t start; + loff_t end; + + int nonblocking:1; /* Don't get stuck on request queues */ + int encountered_congestion:1; /* An output: a queue is full */ + int for_kupdate:1; /* A kupdate writeback */ + int for_reclaim:1; /* Invoked from the page allocator */ }; /* From: Suparna Bhattacharya wait_on_page_writeback_range shouldn't wait for pages beyond the specified range. Ideally, the radix-tree-lookup could accept an end_index parameter so that it doesn't return the extra pages in the first place, but for now we just add a few extra checks to skip such pages. filemap.c | 7 ++++++- 1 files changed, 6 insertions(+), 1 deletion(-) Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:33:10.303091760 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:36:02.172963568 -0400 @@ -198,7 +198,8 @@ static int wait_on_page_writeback_range( pagevec_init(&pvec, 0); index = start; - while ((nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, + while ((index <= end) && + (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, PAGECACHE_TAG_WRITEBACK, min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1)) != 0) { unsigned i; @@ -206,6 +207,10 @@ static int wait_on_page_writeback_range( for (i = 0; i < nr_pages; i++) { struct page *page = pvec.pages[i]; + /* until radix tree lookup accepts end_index */ + if (page->index > end) { + continue; + } wait_on_page_writeback(page); if (PageError(page)) ret = -EIO; From: Suparna Bhattacharya Safeguard to make sure we break out of pagevec_lookup_tag loop if we are beyond the specified range. mpage.c | 3 ++- 1 files changed, 2 insertions(+), 1 deletion(-) Index: linux.t/fs/mpage.c =================================================================== --- linux.t.orig/fs/mpage.c 2004-08-13 10:35:57.908611848 -0400 +++ linux.t/fs/mpage.c 2004-08-13 10:38:57.615292280 -0400 @@ -654,7 +654,8 @@ mpage_writepages(struct address_space *m scanned = 1; } retry: - while (!done && (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, + while (!done && (index <= end) && + (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, PAGECACHE_TAG_DIRTY, min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1))) { unsigned i; From: Suparna Bhattacharya Range based equivalent of filemap_fdatawrite for O_SYNC writers (to go with writepages range support added to mpage_writepages). If both and are zero, then it defaults to writing back all of the mapping's dirty pages. filemap.c | 23 +++++++++++++++++++++-- 1 files changed, 21 insertions(+), 2 deletions(-) Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:36:02.172963568 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:39:00.797808464 -0400 @@ -142,20 +142,26 @@ static inline int sync_page(struct page } /** - * filemap_fdatawrite - start writeback against all of a mapping's dirty pages + * filemap_fdatawrite_range - start writeback against all of a mapping's + * dirty pages that lie within the byte offsets * @mapping: address space structure to write + * @start: offset in bytes where the range starts + * @end : offset in bytes where the range ends * * If sync_mode is WB_SYNC_ALL then this is a "data integrity" operation, as * opposed to a regular memory * cleansing writeback. The difference between * these two operations is that if a dirty page/buffer is encountered, it must * be waited upon, and not just skipped over. */ -static int __filemap_fdatawrite(struct address_space *mapping, int sync_mode) +static int __filemap_fdatawrite_range(struct address_space *mapping, + loff_t start, loff_t end, int sync_mode) { int ret; struct writeback_control wbc = { .sync_mode = sync_mode, .nr_to_write = mapping->nrpages * 2, + .start = start, + .end = end, }; if (mapping->backing_dev_info->memory_backed) @@ -165,12 +171,25 @@ static int __filemap_fdatawrite(struct a return ret; } +static inline int __filemap_fdatawrite(struct address_space *mapping, + int sync_mode) +{ + return __filemap_fdatawrite_range(mapping, 0, 0, sync_mode); +} + int filemap_fdatawrite(struct address_space *mapping) { return __filemap_fdatawrite(mapping, WB_SYNC_ALL); } EXPORT_SYMBOL(filemap_fdatawrite); +int filemap_fdatawrite_range(struct address_space *mapping, + loff_t start, loff_t end) +{ + return __filemap_fdatawrite_range(mapping, start, end, WB_SYNC_ALL); +} +EXPORT_SYMBOL(filemap_fdatawrite_range); + /* * This is a mostly non-blocking flush. Not suitable for data-integrity * purposes - I/O may not be started against all dirty pages. From: Andrew Morton In databases it is common to have multiple threads or processes performing O_SYNC writes against different parts of the same file. Our performance at this is poor, because each writer blocks access to the file by waiting on I/O completion while holding i_sem: everything is serialised. The patch improves things by moving the writing and waiting outside i_sem. So other threads can get in and submit their I/O and permit the disk scheduler to optimise the IO patterns better. Also, the O_SYNC writer only writes and waits on the pages which he wrote, rather than writing and waiting on all dirty pages in the file. The reason we haven't been able to do this before is that the required walk of the address_space page lists is easily livelockable without the i_sem serialisation. But in this patch we perform the waiting via a radix-tree walk of the affected pages. This cannot be livelocked. The sync of the inode's metadata is still performed inside i_sem. This is because it is list-based and is hence still livelockable. However it is usually the case that databases are overwriting existing file blocks and there will be no dirty buffers attached to the address_space anyway. The code is careful to ensure that the IO for the pages and the IO for the metadata are nonblockingly scheduled at the same time. This is am improvemtn over the current code, which will issue two separate write-and-wait cycles: one for metadata, one for pages. Note from Suparna: Reworked to use the tagged radix-tree based writeback infrastructure. include/linux/buffer_head.h | 6 -- include/linux/fs.h | 5 ++ include/linux/writeback.h | 2 mm/filemap.c | 93 +++++++++++++++++++++++++++++++++++--------- 4 files changed, 81 insertions(+), 25 deletions(-) Index: linux.t/include/linux/buffer_head.h =================================================================== --- linux.t.orig/include/linux/buffer_head.h 2004-08-13 09:28:02.465173424 -0400 +++ linux.t/include/linux/buffer_head.h 2004-08-13 10:39:03.259434240 -0400 @@ -206,12 +206,6 @@ int nobh_prepare_write(struct page*, uns int nobh_commit_write(struct file *, struct page *, unsigned, unsigned); int nobh_truncate_page(struct address_space *, loff_t); -#define OSYNC_METADATA (1<<0) -#define OSYNC_DATA (1<<1) -#define OSYNC_INODE (1<<2) -int generic_osync_inode(struct inode *, struct address_space *, int); - - /* * inline definitions */ Index: linux.t/include/linux/fs.h =================================================================== --- linux.t.orig/include/linux/fs.h 2004-08-13 09:27:56.678053200 -0400 +++ linux.t/include/linux/fs.h 2004-08-13 10:39:03.261433936 -0400 @@ -830,6 +830,11 @@ extern int vfs_rename(struct inode *, st #define DT_SOCK 12 #define DT_WHT 14 +#define OSYNC_METADATA (1<<0) +#define OSYNC_DATA (1<<1) +#define OSYNC_INODE (1<<2) +int generic_osync_inode(struct inode *, struct address_space *, int); + /* * This is the "filldir" function type, used by readdir() to let * the kernel specify what kind of dirent layout it wants to have. Index: linux.t/include/linux/writeback.h =================================================================== --- linux.t.orig/include/linux/writeback.h 2004-08-13 10:35:57.909611696 -0400 +++ linux.t/include/linux/writeback.h 2004-08-13 10:39:03.262433784 -0400 @@ -103,6 +103,8 @@ void page_writeback_init(void); void balance_dirty_pages_ratelimited(struct address_space *mapping); int pdflush_operation(void (*fn)(unsigned long), unsigned long arg0); int do_writepages(struct address_space *mapping, struct writeback_control *wbc); +int sync_page_range(struct inode *inode, struct address_space *mapping, + loff_t pos, size_t count); /* pdflush.c */ extern int nr_pdflush_threads; /* Global so it can be exported to sysctl Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:39:00.797808464 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:39:03.265433328 -0400 @@ -247,6 +247,34 @@ static int wait_on_page_writeback_range( return ret; } + +/* + * Write and wait upon all the pages in the passed range. This is a "data + * integrity" operation. It waits upon in-flight writeout before starting and + * waiting upon new writeout. If there was an IO error, return it. + * + * We need to re-take i_sem during the generic_osync_inode list walk because + * it is otherwise livelockable. + */ +int sync_page_range(struct inode *inode, struct address_space *mapping, + loff_t pos, size_t count) +{ + pgoff_t start = pos >> PAGE_CACHE_SHIFT; + pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT; + int ret; + + if (mapping->backing_dev_info->memory_backed || !count) + return 0; + ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1); + if (ret == 0) { + down(&inode->i_sem); + ret = generic_osync_inode(inode, mapping, OSYNC_METADATA); + up(&inode->i_sem); + } + if (ret == 0) + ret = wait_on_page_writeback_range(mapping, start, end); + return ret; +} /** * filemap_fdatawait - walk the list of under-writeback pages of the given * address space and wait for all of them. @@ -2112,11 +2140,13 @@ generic_file_aio_write_nolock(struct kio /* * For now, when the user asks for O_SYNC, we'll actually give O_DSYNC */ - if (status >= 0) { - if ((file->f_flags & O_SYNC) || IS_SYNC(inode)) - status = generic_osync_inode(inode, mapping, - OSYNC_METADATA|OSYNC_DATA); - } + if (likely(status >= 0)) { + if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) { + if (!a_ops->writepage || !is_sync_kiocb(iocb)) + status = generic_osync_inode(inode, mapping, + OSYNC_METADATA|OSYNC_DATA); + } + } /* * If we get here for O_DIRECT writes then we must have fallen through @@ -2156,36 +2186,52 @@ ssize_t generic_file_aio_write(struct ki size_t count, loff_t pos) { struct file *file = iocb->ki_filp; - struct inode *inode = file->f_mapping->host; - ssize_t err; - struct iovec local_iov = { .iov_base = (void __user *)buf, .iov_len = count }; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; + ssize_t ret; + struct iovec local_iov = { .iov_base = (void __user *)buf, + .iov_len = count }; BUG_ON(iocb->ki_pos != pos); down(&inode->i_sem); - err = generic_file_aio_write_nolock(iocb, &local_iov, 1, + ret = generic_file_aio_write_nolock(iocb, &local_iov, 1, &iocb->ki_pos); up(&inode->i_sem); - return err; -} + if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { + ssize_t err; + err = sync_page_range(inode, mapping, pos, ret); + if (err < 0) + ret = err; + } + return ret; +} EXPORT_SYMBOL(generic_file_aio_write); ssize_t generic_file_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos) { - struct inode *inode = file->f_mapping->host; - ssize_t err; - struct iovec local_iov = { .iov_base = (void __user *)buf, .iov_len = count }; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; + ssize_t ret; + struct iovec local_iov = { .iov_base = (void __user *)buf, + .iov_len = count }; down(&inode->i_sem); - err = generic_file_write_nolock(file, &local_iov, 1, ppos); + ret = generic_file_write_nolock(file, &local_iov, 1, ppos); up(&inode->i_sem); - return err; -} + if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { + ssize_t err; + err = sync_page_range(inode, mapping, *ppos - ret, ret); + if (err < 0) + ret = err; + } + return ret; +} EXPORT_SYMBOL(generic_file_write); ssize_t generic_file_readv(struct file *filp, const struct iovec *iov, @@ -2204,14 +2250,23 @@ ssize_t generic_file_readv(struct file * EXPORT_SYMBOL(generic_file_readv); ssize_t generic_file_writev(struct file *file, const struct iovec *iov, - unsigned long nr_segs, loff_t * ppos) + unsigned long nr_segs, loff_t *ppos) { - struct inode *inode = file->f_mapping->host; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; ssize_t ret; down(&inode->i_sem); ret = generic_file_write_nolock(file, iov, nr_segs, ppos); up(&inode->i_sem); + + if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { + int err; + + err = sync_page_range(inode, mapping, *ppos - ret, ret); + if (err < 0) + ret = err; + } return ret; } From: Suparna Bhattacharya Implements async support for wait_on_page_writeback_range. Accepts a wait queue entry as an additional parameter, invokes the async version wait_on_page_writeback and takes care of propagating how much of writeback in the range completed so far (in terms of number of pages), on an -EIOCBRETRY, so that subqueuent retries can accordingly check/wait only on the remaining part of the range. filemap.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 109 insertions(+), 28 deletions(-) Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:39:03.265433328 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:39:17.397284960 -0400 @@ -202,24 +202,29 @@ EXPORT_SYMBOL(filemap_flush); /* * Wait for writeback to complete against pages indexed by start->end - * inclusive + * inclusive. + * This could be a synchronous wait or could just queue an async + * notification callback depending on the wait queue entry parameter + * Returns the size of the range for which writeback has been completed + * in terms of number of pages. This value is used in the AIO case + * to retry the wait for the remaining part of the range through on + * async notification. */ -static int wait_on_page_writeback_range(struct address_space *mapping, - pgoff_t start, pgoff_t end) +static ssize_t wait_on_page_writeback_range_wq(struct address_space *mapping, + pgoff_t start, pgoff_t end, wait_queue_t *wait) { struct pagevec pvec; int nr_pages; - int ret = 0; - pgoff_t index; + int ret = 0, done = 0; + pgoff_t index, curr = start; if (end < start) return 0; pagevec_init(&pvec, 0); index = start; - while ((index <= end) && - (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, - PAGECACHE_TAG_WRITEBACK, + while (!done && (index <= end) && (nr_pages = pagevec_lookup_tag(&pvec, + mapping, &index, PAGECACHE_TAG_WRITEBACK, min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1)) != 0) { unsigned i; @@ -230,7 +235,20 @@ static int wait_on_page_writeback_range( if (page->index > end) { continue; } - wait_on_page_writeback(page); + lock_page(page); + if (unlikely(page->mapping != mapping)) { + unlock_page(page); + continue; + } + curr = page->index; + unlock_page(page); + ret = wait_on_page_writeback_wq(page, wait); + if (ret == -EIOCBRETRY) { + if (curr > start) + ret = curr - start; + done = 1; + break; + } if (PageError(page)) ret = -EIO; } @@ -244,9 +262,17 @@ static int wait_on_page_writeback_range( if (test_and_clear_bit(AS_EIO, &mapping->flags)) ret = -EIO; + if (ret == 0) + ret = end - start + 1; + return ret; } +static inline ssize_t wait_on_page_writeback_range(struct address_space + *mapping, pgoff_t start, pgoff_t end) +{ + return wait_on_page_writeback_range_wq(mapping, start, end, NULL); +} /* * Write and wait upon all the pages in the passed range. This is a "data From: Suparna Bhattacharya AIO support for O_SYNC buffered writes, built over O_SYNC-speedup an reworked against the tagged radix-tree-lookup based writeback changes. It uses the tagged radix tree lookups to writeout just the pages pertaining to this request, and then takes the retry based approach to wait for writeback to complete on the same range (using AIO enabled wait_on_page_writeback_range). All the writeout is issued at the time of io submission, and there is a check to make sure that retries skip over straight to the wait_on_page_writeback_range. The code turns out to be a little more simplified than earlier iterations by not trying to make the high level code allow for retries at blocking points other than those handled in this patch. fs/aio.c | 13 +--- include/linux/aio.h | 5 + include/linux/writeback.h | 4 + mm/filemap.c | 136 +++++++++++++++++++++++++++++++++++++++------- 4 files changed, 130 insertions(+), 28 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:35:54.990055536 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:42:08.029344944 -0400 @@ -1330,16 +1330,10 @@ static ssize_t aio_pwrite(struct kiocb * ssize_t ret = 0; ret = file->f_op->aio_write(iocb, iocb->ki_buf, - iocb->ki_left, iocb->ki_pos); + iocb->ki_left, iocb->ki_pos); - /* - * TBD: Even if iocb->ki_left = 0, could we need to - * wait for data to be sync'd ? Or can we assume - * that aio_fdsync/aio_fsync would be called explicitly - * as required. - */ if (ret > 0) { - iocb->ki_buf += ret; + iocb->ki_buf += iocb->ki_buf ? ret : 0; iocb->ki_left -= ret; ret = -EIOCBRETRY; @@ -1347,8 +1341,9 @@ static ssize_t aio_pwrite(struct kiocb * /* This means we must have transferred all that we could */ /* No need to retry anymore */ - if (ret == 0) + if ((ret == 0) || (iocb->ki_left == 0)) { ret = iocb->ki_nbytes - iocb->ki_left; + } return ret; } Index: linux.t/include/linux/aio.h =================================================================== --- linux.t.orig/include/linux/aio.h 2004-08-13 09:46:17.815654840 -0400 +++ linux.t/include/linux/aio.h 2004-08-13 10:42:08.031344640 -0400 @@ -27,21 +27,26 @@ struct kioctx; #define KIF_LOCKED 0 #define KIF_KICKED 1 #define KIF_CANCELLED 2 +#define KIF_SYNCED 3 #define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags) +#define kiocbTrySync(iocb) test_and_set_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbSetSynced(iocb) set_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbClearSynced(iocb) clear_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbIsLocked(iocb) test_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbIsSynced(iocb) test_bit(KIF_SYNCED, &(iocb)->ki_flags) struct kiocb { struct list_head ki_run_list; Index: linux.t/include/linux/writeback.h =================================================================== --- linux.t.orig/include/linux/writeback.h 2004-08-13 10:39:03.262433784 -0400 +++ linux.t/include/linux/writeback.h 2004-08-13 10:42:08.031344640 -0400 @@ -103,8 +103,10 @@ void page_writeback_init(void); void balance_dirty_pages_ratelimited(struct address_space *mapping); int pdflush_operation(void (*fn)(unsigned long), unsigned long arg0); int do_writepages(struct address_space *mapping, struct writeback_control *wbc); -int sync_page_range(struct inode *inode, struct address_space *mapping, +ssize_t sync_page_range(struct inode *inode, struct address_space *mapping, loff_t pos, size_t count); +ssize_t sync_page_range_nolock(struct inode *inode, struct address_space + *mapping, loff_t pos, size_t count); /* pdflush.c */ extern int nr_pdflush_threads; /* Global so it can be exported to sysctl Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:39:17.397284960 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:42:08.035344032 -0400 @@ -282,23 +282,75 @@ static inline ssize_t wait_on_page_write * We need to re-take i_sem during the generic_osync_inode list walk because * it is otherwise livelockable. */ -int sync_page_range(struct inode *inode, struct address_space *mapping, +ssize_t sync_page_range(struct inode *inode, struct address_space *mapping, loff_t pos, size_t count) { pgoff_t start = pos >> PAGE_CACHE_SHIFT; pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT; - int ret; + ssize_t ret = 0; if (mapping->backing_dev_info->memory_backed || !count) return 0; + if (in_aio()) { + /* Already issued writeouts for this iocb ? */ + if (kiocbTrySync(io_wait_to_kiocb(current->io_wait))) + goto do_wait; /* just need to check if done */ + } ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1); - if (ret == 0) { + + if (ret >= 0) { down(&inode->i_sem); ret = generic_osync_inode(inode, mapping, OSYNC_METADATA); up(&inode->i_sem); } - if (ret == 0) - ret = wait_on_page_writeback_range(mapping, start, end); +do_wait: + if (ret >= 0) { + ret = wait_on_page_writeback_range_wq(mapping, start, end, + current->io_wait); + if (ret > 0) { + ret <<= PAGE_CACHE_SHIFT; + if (ret > count) + ret = count; + } + } + return ret; +} + +/* + * It is really better to use sync_page_range, rather than call + * sync_page_range_nolock while holding i_sem, if you don't + * want to block parallel O_SYNC writes until the pages in this + * range are written out. + */ +ssize_t sync_page_range_nolock(struct inode *inode, struct address_space + *mapping, loff_t pos, size_t count) +{ + pgoff_t start = pos >> PAGE_CACHE_SHIFT; + pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT; + ssize_t ret = 0; + + if (mapping->backing_dev_info->memory_backed || !count) + return 0; + if (in_aio()) { + /* Already issued writeouts for this iocb ? */ + if (kiocbTrySync(io_wait_to_kiocb(current->io_wait))) + goto do_wait; /* just need to check if done */ + } + ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1); + + if (ret >= 0) { + ret = generic_osync_inode(inode, mapping, OSYNC_METADATA); + } +do_wait: + if (ret >= 0) { + ret = wait_on_page_writeback_range_wq(mapping, start, end, + current->io_wait); + if (ret > 0) { + ret <<= PAGE_CACHE_SHIFT; + if (ret > count) + ret = count; + } + } return ret; } /** @@ -309,7 +361,10 @@ int sync_page_range(struct inode *inode, */ int filemap_fdatawait(struct address_space *mapping) { - return wait_on_page_writeback_range(mapping, 0, -1); + int ret = wait_on_page_writeback_range(mapping, 0, -1); + if (ret > 0) + ret = 0; + return ret; } EXPORT_SYMBOL(filemap_fdatawait); @@ -1987,7 +2042,7 @@ EXPORT_SYMBOL(generic_write_checks); * okir@monad.swb.de */ ssize_t -generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov, +__generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t *ppos) { struct file *file = iocb->ki_filp; @@ -2168,7 +2223,7 @@ generic_file_aio_write_nolock(struct kio */ if (likely(status >= 0)) { if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) { - if (!a_ops->writepage || !is_sync_kiocb(iocb)) + if (!a_ops->writepage) status = generic_osync_inode(inode, mapping, OSYNC_METADATA|OSYNC_DATA); } @@ -2193,6 +2248,48 @@ out: EXPORT_SYMBOL(generic_file_aio_write_nolock); ssize_t +generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, loff_t *ppos) +{ + struct file *file = iocb->ki_filp; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; + ssize_t ret; + loff_t pos = *ppos; + + if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) { + /* nothing to transfer, may just need to sync data */ + ret = iov->iov_len; /* vector AIO not supported yet */ + goto osync; + } + + ret = __generic_file_aio_write_nolock(iocb, iov, nr_segs, ppos); + +osync: + if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { + ret = sync_page_range_nolock(inode, mapping, pos, ret); + if (ret >= 0) + *ppos = pos + ret; + } + return ret; +} + + +ssize_t +__generic_file_write_nolock(struct file *file, const struct iovec *iov, + unsigned long nr_segs, loff_t *ppos) +{ + struct kiocb kiocb; + ssize_t ret; + + init_sync_kiocb(&kiocb, file); + ret = __generic_file_aio_write_nolock(&kiocb, iov, nr_segs, ppos); + if (-EIOCBQUEUED == ret) + ret = wait_on_sync_kiocb(&kiocb); + return ret; +} + +ssize_t generic_file_write_nolock(struct file *file, const struct iovec *iov, unsigned long nr_segs, loff_t *ppos) { @@ -2218,19 +2315,22 @@ ssize_t generic_file_aio_write(struct ki struct iovec local_iov = { .iov_base = (void __user *)buf, .iov_len = count }; - BUG_ON(iocb->ki_pos != pos); + if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) { + /* nothing to transfer, may just need to sync data */ + ret = count; + goto osync; + } down(&inode->i_sem); - ret = generic_file_aio_write_nolock(iocb, &local_iov, 1, + ret = __generic_file_aio_write_nolock(iocb, &local_iov, 1, &iocb->ki_pos); up(&inode->i_sem); +osync: if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { - ssize_t err; - - err = sync_page_range(inode, mapping, pos, ret); - if (err < 0) - ret = err; + ret = sync_page_range(inode, mapping, pos, ret); + if (ret >= 0) + iocb->ki_pos = pos + ret; } return ret; } @@ -2246,7 +2346,7 @@ ssize_t generic_file_write(struct file * .iov_len = count }; down(&inode->i_sem); - ret = generic_file_write_nolock(file, &local_iov, 1, ppos); + ret = __generic_file_write_nolock(file, &local_iov, 1, ppos); up(&inode->i_sem); if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { @@ -2283,11 +2383,11 @@ ssize_t generic_file_writev(struct file ssize_t ret; down(&inode->i_sem); - ret = generic_file_write_nolock(file, iov, nr_segs, ppos); + ret = __generic_file_write_nolock(file, iov, nr_segs, ppos); up(&inode->i_sem); if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { - int err; + ssize_t err; err = sync_page_range(inode, mapping, *ppos - ret, ret); if (err < 0) From: Chris Mason We need aio poll for sles8 compatibility, so I whipped up a quick and dirty 2.6 aio poll patch. Eventually I'll probably add some simple socket support to aio-stress, but I'm hoping to validate the code a little now before putting it into our CVS. I've attached the patch, it's not quite as obvious as the pipe code, but not too bad. I'm not sure if I'm using struct kiocb->private the way it was intended, but I don't see any other code touching it, so it should be ok. On Mon 2004-02-23 at 14:05, Suparna Bhattacharya wrote: > I was wondering if a particular fop->poll routine, could possibly > invoke __pollwait for more than one wait queue (I don't know if such > a case even exists). That kind of a thing would work OK with the existing > poll logic, but not in our case, because we'd end up queueing the same > wait queue on two queues which would be a problem. Oh, I see what you mean. I looked at a few of the poll_wait callers, and it seems safe, but there are too many for a full audit right now. The attached patch fixes the page allocation problem and adds a check to make sure we don't abuse current->io_wait. An oops is better than random corruption at least. I ran it through my basic test and pipetest, the pipetest results are below. The pipetest epoll usage needs updating, so I can only compare against regular poll. ./pipetest --aio-poll 10000 1 5 using 10000 pipe pairs, 1 message threads, 5 generations, 12 bufsize Ok! Mode aio-poll: 5 passes in 0.000073 seconds passes_per_sec: 68493.15 coffee:/usr/src/aio # ./pipetest 10000 1 5 using 10000 pipe pairs, 1 message threads, 5 generations, 12 bufsize Ok! Mode poll: 5 passes in 0.083066 seconds passes_per_sec: 60.19 Here are some optimizations. aio-poll-3 avoids wake_up when it can use finish_wait instead, and adds a fast path to aio-poll for when data is already available. fs/aio.c | 17 +++++++ fs/select.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++- include/linux/aio.h | 1 include/linux/aio_abi.h | 2 4 files changed, 122 insertions(+), 2 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:42:08.029344944 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:42:14.724327152 -0400 @@ -1369,6 +1369,16 @@ static ssize_t aio_fsync(struct kiocb *i } /* + * Retry method for aio_poll (also used for first time submit) + * Responsible for updating iocb state as retries progress + */ +static ssize_t aio_poll(struct kiocb *iocb) +{ + unsigned events = (unsigned)(iocb->ki_buf); + return generic_aio_poll(iocb, events); +} + +/* * aio_setup_iocb: * Performs the initial checks and aio retry method * setup for the kiocb at the time of io submission. @@ -1419,6 +1429,13 @@ ssize_t aio_setup_iocb(struct kiocb *kio if (file->f_op->aio_fsync) kiocb->ki_retry = aio_fsync; break; + case IOCB_CMD_POLL: + ret = -EINVAL; + if (file->f_op->poll) { + memset(kiocb->private, 0, sizeof(kiocb->private)); + kiocb->ki_retry = aio_poll; + } + break; default: dprintk("EINVAL: io_submit: no operation provided\n"); ret = -EINVAL; Index: linux.t/fs/select.c =================================================================== --- linux.t.orig/fs/select.c 2004-01-09 01:59:05.000000000 -0500 +++ linux.t/fs/select.c 2004-08-13 10:42:14.725327000 -0400 @@ -21,6 +21,7 @@ #include /* for STICKY_TIMEOUTS */ #include #include +#include #include @@ -39,6 +40,12 @@ struct poll_table_page { struct poll_table_entry entries[0]; }; +struct aio_poll_table { + int init; + struct poll_wqueues wq; + struct poll_table_page table; +}; + #define POLL_TABLE_FULL(table) \ ((unsigned long)((table)->entry+1) > PAGE_SIZE + (unsigned long)(table)) @@ -109,12 +116,34 @@ void __pollwait(struct file *filp, wait_ /* Add a new entry */ { struct poll_table_entry * entry = table->entry; + wait_queue_t *wait; + wait_queue_t *aio_wait = current->io_wait; + + if (aio_wait) { + /* for aio, there can only be one wait_address. + * we might be adding it again via a retry call + * if so, just return. + * if not, bad things are happening + */ + if (table->entry != table->entries) { + if (table->entries[0].wait_address != wait_address) + BUG(); + return; + } + } + table->entry = entry+1; get_file(filp); entry->filp = filp; entry->wait_address = wait_address; init_waitqueue_entry(&entry->wait, current); - add_wait_queue(wait_address,&entry->wait); + + /* if we're in aioland, use current->io_wait */ + if (aio_wait) + wait = aio_wait; + else + wait = &entry->wait; + add_wait_queue(wait_address,wait); } } @@ -533,3 +562,76 @@ out_fds: poll_freewait(&table); return err; } + +static void aio_poll_freewait(struct aio_poll_table *ap, struct kiocb *iocb) +{ + struct poll_table_page * p = ap->wq.table; + if (p) { + struct poll_table_entry * entry = p->entry; + if (entry > p->entries) { + /* + * there is only one entry for aio polls + */ + entry = p->entries; + if (iocb) + finish_wait(entry->wait_address,&iocb->ki_wait); + else + wake_up(entry->wait_address); + fput(entry->filp); + } + } + ap->init = 0; +} + +static int +aio_poll_cancel(struct kiocb *iocb, struct io_event *evt) +{ + struct aio_poll_table *aio_table; + aio_table = (struct aio_poll_table *)iocb->private; + + evt->obj = (u64)(unsigned long)iocb-> ki_obj.user; + evt->data = iocb->ki_user_data; + evt->res = iocb->ki_nbytes - iocb->ki_left; + if (evt->res == 0) + evt->res = -EINTR; + evt->res2 = 0; + if (aio_table->init) + aio_poll_freewait(aio_table, NULL); + aio_put_req(iocb); + return 0; +} + +ssize_t generic_aio_poll(struct kiocb *iocb, unsigned events) +{ + struct aio_poll_table *aio_table; + unsigned mask; + struct file *file = iocb->ki_filp; + aio_table = (struct aio_poll_table *)iocb->private; + + /* fast path */ + mask = file->f_op->poll(file, NULL); + mask &= events | POLLERR | POLLHUP; + if (mask) + return mask; + + if ((sizeof(*aio_table) + sizeof(struct poll_table_entry)) > + sizeof(iocb->private)) + BUG(); + + if (!aio_table->init) { + aio_table->init = 1; + poll_initwait(&aio_table->wq); + aio_table->wq.table = &aio_table->table; + aio_table->table.next = NULL; + aio_table->table.entry = aio_table->table.entries; + } + iocb->ki_cancel = aio_poll_cancel; + + mask = file->f_op->poll(file, &aio_table->wq.pt); + mask &= events | POLLERR | POLLHUP; + if (mask) { + aio_poll_freewait(aio_table, iocb); + return mask; + } + return -EIOCBRETRY; +} Index: linux.t/include/linux/aio.h =================================================================== --- linux.t.orig/include/linux/aio.h 2004-08-13 10:42:08.031344640 -0400 +++ linux.t/include/linux/aio.h 2004-08-13 10:42:14.726326848 -0400 @@ -199,4 +199,5 @@ static inline struct kiocb *list_kiocb(s extern atomic_t aio_nr; extern unsigned aio_max_nr; +extern ssize_t generic_aio_poll(struct kiocb *, unsigned); #endif /* __LINUX__AIO_H */ Index: linux.t/include/linux/aio_abi.h =================================================================== --- linux.t.orig/include/linux/aio_abi.h 2004-01-09 01:59:09.000000000 -0500 +++ linux.t/include/linux/aio_abi.h 2004-08-13 10:42:14.727326696 -0400 @@ -38,8 +38,8 @@ enum { IOCB_CMD_FDSYNC = 3, /* These two are experimental. * IOCB_CMD_PREADX = 4, - * IOCB_CMD_POLL = 5, */ + IOCB_CMD_POLL = 5, IOCB_CMD_NOOP = 6, }; From: Chris Mason While testing fsaio here, I hit an oops in kick_iocb because iocb->mm was null. This was right as the program was exiting. With the patch below, I wasn't able to reproduce, it makes sure we flush the workqueue every time __put_ioctx gets called. aio.c | 6 +----- 1 files changed, 1 insertion(+), 5 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:42:14.724327152 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:42:18.739716720 -0400 @@ -368,6 +368,7 @@ void fastcall __put_ioctx(struct kioctx if (unlikely(ctx->reqs_active)) BUG(); + flush_workqueue(aio_wq); aio_free_ring(ctx); mmdrop(ctx->mm); ctx->mm = NULL; @@ -1213,11 +1214,6 @@ static void io_destroy(struct kioctx *io aio_cancel_all(ioctx); wait_for_all_aios(ioctx); - /* - * this is an overkill, but ensures we don't leave - * the ctx on the aio_wq - */ - flush_workqueue(aio_wq); put_ioctx(ioctx); /* once for the lookup */ } From: Chris Mason aio.c | 39 +++++++++++++++++++++++++++++++++++---- 1 files changed, 35 insertions(+), 4 deletions(-) Index: linux.t/fs/aio.c =================================================================== --- linux.t.orig/fs/aio.c 2004-08-13 10:42:18.739716720 -0400 +++ linux.t/fs/aio.c 2004-08-13 10:42:24.210884976 -0400 @@ -368,6 +368,7 @@ void fastcall __put_ioctx(struct kioctx if (unlikely(ctx->reqs_active)) BUG(); + cancel_delayed_work(&ctx->wq); flush_workqueue(aio_wq); aio_free_ring(ctx); mmdrop(ctx->mm); @@ -795,6 +796,22 @@ static int __aio_run_iocbs(struct kioctx return 0; } +static void aio_queue_work(struct kioctx * ctx) +{ + unsigned long timeout; + /* + * if someone is waiting, get the work started right + * away, otherwise, use a longer delay + */ + smp_mb(); + if (waitqueue_active(&ctx->wait)) + timeout = 1; + else + timeout = HZ/10; + queue_delayed_work(aio_wq, &ctx->wq, timeout); +} + + /* * aio_run_iocbs: * Process all pending retries queued on the ioctx @@ -811,8 +828,18 @@ static inline void aio_run_iocbs(struct requeue = __aio_run_iocbs(ctx); spin_unlock_irq(&ctx->ctx_lock); if (requeue) - queue_work(aio_wq, &ctx->wq); + aio_queue_work(ctx); +} +/* + * just like aio_run_iocbs, but keeps running them until + * the list stays empty + */ +static inline void aio_run_all_iocbs(struct kioctx *ctx) +{ + spin_lock_irq(&ctx->ctx_lock); + while( __aio_run_iocbs(ctx)); + spin_unlock_irq(&ctx->ctx_lock); } /* @@ -837,6 +864,9 @@ static void aio_kick_handler(void *data) unuse_mm(ctx->mm); spin_unlock_irq(&ctx->ctx_lock); set_fs(oldfs); + /* + * we're in a worker thread already, don't use queue_delayed_work, + */ if (requeue) queue_work(aio_wq, &ctx->wq); } @@ -859,7 +889,7 @@ void queue_kicked_iocb(struct kiocb *ioc run = __queue_kicked_iocb(iocb); spin_unlock_irqrestore(&ctx->ctx_lock, flags); if (run) { - queue_delayed_work(aio_wq, &ctx->wq, HZ/10); + aio_queue_work(ctx); aio_wakeups++; } } @@ -1127,7 +1157,7 @@ retry: /* racey check, but it gets redone */ if (!retry && unlikely(!list_empty(&ctx->run_list))) { retry = 1; - aio_run_iocbs(ctx); + aio_run_all_iocbs(ctx); goto retry; } @@ -1536,7 +1566,8 @@ int fastcall io_submit_one(struct kioctx spin_lock_irq(&ctx->ctx_lock); list_add_tail(&req->ki_run_list, &ctx->run_list); - __aio_run_iocbs(ctx); + /* drain the run list */ + while(__aio_run_iocbs(ctx)); spin_unlock_irq(&ctx->ctx_lock); aio_put_req(req); /* drop extra ref to req */ return 0; wait_on_page_writeback_range tried to return a count of the number of bytes it managed to wait on before hitting -EIOCBRETRY. The math was off though, leading to the possibility of telling the aio subsystem all waiting was finished before it really was. Since there is no easy way to turn a page index into the correct number of bytes, this patch just removes the optimization entirely. Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 10:42:08.035344032 -0400 +++ linux.t/mm/filemap.c 2004-08-13 10:53:37.522526048 -0400 @@ -216,7 +216,7 @@ static ssize_t wait_on_page_writeback_ra struct pagevec pvec; int nr_pages; int ret = 0, done = 0; - pgoff_t index, curr = start; + pgoff_t index; if (end < start) return 0; @@ -235,17 +235,10 @@ static ssize_t wait_on_page_writeback_ra if (page->index > end) { continue; } - lock_page(page); - if (unlikely(page->mapping != mapping)) { - unlock_page(page); + if (unlikely(page->mapping != mapping)) continue; - } - curr = page->index; - unlock_page(page); ret = wait_on_page_writeback_wq(page, wait); if (ret == -EIOCBRETRY) { - if (curr > start) - ret = curr - start; done = 1; break; } __lock_page_wq uses prepare_to_wait_exclusive, which causes problems for aio, since it changes current->io_wait to always be exclusive. Index: linux.t/mm/filemap.c =================================================================== --- linux.t.orig/mm/filemap.c 2004-08-13 14:52:11.000000000 -0400 +++ linux.t/mm/filemap.c 2004-08-13 20:34:24.850903920 -0400 @@ -579,7 +579,11 @@ int fastcall __lock_page_wq(struct page wait = &local_wait.wait; while (TestSetPageLocked(page)) { - prepare_to_wait_exclusive(wqh, wait, TASK_UNINTERRUPTIBLE); + if (wait == &local_wait.wait) + prepare_to_wait_exclusive(wqh, wait, + TASK_UNINTERRUPTIBLE); + else + prepare_to_wait(wqh, wait, TASK_UNINTERRUPTIBLE); if (PageLocked(page)) { sync_page(page); if (!is_sync_wait(wait)) {