target: use a workqueue for I/O completions
Christoph Hellwig [Mon, 17 Oct 2011 17:56:53 +0000 (13:56 -0400)]
Instead of abusing the target processing thread to offload I/O completion
in the backends to user context, add a new workqueue.  This means
completions can be processed as fast as available CPU time allows,
including in parallel with other completions and, more importantly, with
I/O submission or QUEUE FULL retries.  This should give much better
performance, especially on loaded systems.
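
For reference, the pattern this introduces looks roughly like the sketch
below.  This is a minimal, self-contained illustration only; the demo_*
names are placeholders, not identifiers from the target code:

    #include <linux/init.h>
    #include <linux/kernel.h>
    #include <linux/workqueue.h>

    struct demo_cmd {
            struct work_struct work;
            int status;
    };

    static struct workqueue_struct *demo_wq;

    static void demo_complete_work(struct work_struct *work)
    {
            /* Runs in process context; may sleep, and runs in parallel
             * with other completions and with I/O submission. */
            struct demo_cmd *cmd = container_of(work, struct demo_cmd, work);

            pr_info("completing command, status %d\n", cmd->status);
    }

    /* Called from the backend's (soft)irq completion path. */
    static void demo_complete(struct demo_cmd *cmd)
    {
            INIT_WORK(&cmd->work, demo_complete_work);
            queue_work(demo_wq, &cmd->work);
    }

    static int __init demo_init(void)
    {
            /* WQ_MEM_RECLAIM guarantees forward progress under memory
             * pressure, which an I/O completion path needs. */
            demo_wq = alloc_workqueue("demo_completion", WQ_MEM_RECLAIM, 0);
            return demo_wq ? 0 : -ENOMEM;
    }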

As a side effect, we can merge all the completion states into a single
one.

On the downside this change complicates LUN reset handling a bit by
requiring us to cancel the work item only for those states that have it
initialized.  The alternative would be to either always initialize the
work item to a dummy handler, or to always use the same handler and
switch on the state.  The long-term solution is a flag indicating that
the command has an initialized work item, but that only becomes useful
once we have more users.
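
To make the flag idea concrete, such a scheme could look roughly like the
sketch below.  CMD_T_WORK_INITIALIZED, the work_flags field, and the
demo_* helpers are hypothetical names used purely for illustration;
nothing in the tree defines them at this point:

    /* Hypothetical flag; not defined by this patch. */
    #define CMD_T_WORK_INITIALIZED  (1 << 0)

    static void demo_queue_completion(struct se_cmd *cmd,
                                      work_func_t handler)
    {
            INIT_WORK(&cmd->work, handler);
            cmd->work_flags |= CMD_T_WORK_INITIALIZED; /* hypothetical field */
            queue_work(target_completion_wq, &cmd->work);
    }

    static void demo_reset_cancel(struct se_cmd *cmd)
    {
            /* Correct for any t_state: only cancel a work item that
             * was actually initialized. */
            if (cmd->work_flags & CMD_T_WORK_INITIALIZED)
                    cancel_work_sync(&cmd->work);
    }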

Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Nicholas Bellinger <nab@linux-iscsi.org>

drivers/target/target_core_tmr.c
drivers/target/target_core_transport.c
include/target/target_core_base.h

diff --git a/drivers/target/target_core_tmr.c b/drivers/target/target_core_tmr.c
index 532ce31..570b144 100644
@@ -255,6 +255,16 @@ static void core_tmr_drain_task_list(
                        atomic_read(&cmd->t_transport_stop),
                        atomic_read(&cmd->t_transport_sent));
 
+               /*
+                * If the command may be queued onto a workqueue, cancel it now.
+                *
+                * This is equivalent to removal from the execute queue in the
+                * loop above, but we do it down here given that
+                * cancel_work_sync may block.
+                */
+               if (cmd->t_state == TRANSPORT_COMPLETE)
+                       cancel_work_sync(&cmd->work);
+
                spin_lock_irqsave(&cmd->t_state_lock, flags);
                target_stop_task(task, &flags);
 
diff --git a/drivers/target/target_core_transport.c b/drivers/target/target_core_transport.c
index 87beae6..774ff00 100644
@@ -58,6 +58,7 @@
 
 static int sub_api_initialized;
 
+static struct workqueue_struct *target_completion_wq;
 static struct kmem_cache *se_cmd_cache;
 static struct kmem_cache *se_sess_cache;
 struct kmem_cache *se_tmr_req_cache;
@@ -84,6 +85,8 @@ static int transport_generic_get_mem(struct se_cmd *cmd);
 static void transport_put_cmd(struct se_cmd *cmd);
 static void transport_remove_cmd_from_queue(struct se_cmd *cmd);
 static int transport_set_sense_codes(struct se_cmd *cmd, u8 asc, u8 ascq);
+static void transport_generic_request_failure(struct se_cmd *, int, int);
+static void target_complete_ok_work(struct work_struct *work);
 
 int init_se_kmem_caches(void)
 {
@@ -99,7 +102,7 @@ int init_se_kmem_caches(void)
        if (!se_tmr_req_cache) {
                pr_err("kmem_cache_create() for struct se_tmr_req"
                                " failed\n");
-               goto out;
+               goto out_free_cmd_cache;
        }
        se_sess_cache = kmem_cache_create("se_sess_cache",
                        sizeof(struct se_session), __alignof__(struct se_session),
@@ -107,14 +110,14 @@ int init_se_kmem_caches(void)
        if (!se_sess_cache) {
                pr_err("kmem_cache_create() for struct se_session"
                                " failed\n");
-               goto out;
+               goto out_free_tmr_req_cache;
        }
        se_ua_cache = kmem_cache_create("se_ua_cache",
                        sizeof(struct se_ua), __alignof__(struct se_ua),
                        0, NULL);
        if (!se_ua_cache) {
                pr_err("kmem_cache_create() for struct se_ua failed\n");
-               goto out;
+               goto out_free_sess_cache;
        }
        t10_pr_reg_cache = kmem_cache_create("t10_pr_reg_cache",
                        sizeof(struct t10_pr_registration),
@@ -122,7 +125,7 @@ int init_se_kmem_caches(void)
        if (!t10_pr_reg_cache) {
                pr_err("kmem_cache_create() for struct t10_pr_registration"
                                " failed\n");
-               goto out;
+               goto out_free_ua_cache;
        }
        t10_alua_lu_gp_cache = kmem_cache_create("t10_alua_lu_gp_cache",
                        sizeof(struct t10_alua_lu_gp), __alignof__(struct t10_alua_lu_gp),
@@ -130,7 +133,7 @@ int init_se_kmem_caches(void)
        if (!t10_alua_lu_gp_cache) {
                pr_err("kmem_cache_create() for t10_alua_lu_gp_cache"
                                " failed\n");
-               goto out;
+               goto out_free_pr_reg_cache;
        }
        t10_alua_lu_gp_mem_cache = kmem_cache_create("t10_alua_lu_gp_mem_cache",
                        sizeof(struct t10_alua_lu_gp_member),
@@ -138,7 +141,7 @@ int init_se_kmem_caches(void)
        if (!t10_alua_lu_gp_mem_cache) {
                pr_err("kmem_cache_create() for t10_alua_lu_gp_mem_"
                                "cache failed\n");
-               goto out;
+               goto out_free_lu_gp_cache;
        }
        t10_alua_tg_pt_gp_cache = kmem_cache_create("t10_alua_tg_pt_gp_cache",
                        sizeof(struct t10_alua_tg_pt_gp),
@@ -146,7 +149,7 @@ int init_se_kmem_caches(void)
        if (!t10_alua_tg_pt_gp_cache) {
                pr_err("kmem_cache_create() for t10_alua_tg_pt_gp_"
                                "cache failed\n");
-               goto out;
+               goto out_free_lu_gp_mem_cache;
        }
        t10_alua_tg_pt_gp_mem_cache = kmem_cache_create(
                        "t10_alua_tg_pt_gp_mem_cache",
@@ -156,34 +159,41 @@ int init_se_kmem_caches(void)
        if (!t10_alua_tg_pt_gp_mem_cache) {
                pr_err("kmem_cache_create() for t10_alua_tg_pt_gp_"
                                "mem_t failed\n");
-               goto out;
+               goto out_free_tg_pt_gp_cache;
        }
 
+       target_completion_wq = alloc_workqueue("target_completion",
+                                              WQ_MEM_RECLAIM, 0);
+       if (!target_completion_wq)
+               goto out_free_tg_pt_gp_mem_cache;
+
        return 0;
+
+out_free_tg_pt_gp_mem_cache:
+       kmem_cache_destroy(t10_alua_tg_pt_gp_mem_cache);
+out_free_tg_pt_gp_cache:
+       kmem_cache_destroy(t10_alua_tg_pt_gp_cache);
+out_free_lu_gp_mem_cache:
+       kmem_cache_destroy(t10_alua_lu_gp_mem_cache);
+out_free_lu_gp_cache:
+       kmem_cache_destroy(t10_alua_lu_gp_cache);
+out_free_pr_reg_cache:
+       kmem_cache_destroy(t10_pr_reg_cache);
+out_free_ua_cache:
+       kmem_cache_destroy(se_ua_cache);
+out_free_sess_cache:
+       kmem_cache_destroy(se_sess_cache);
+out_free_tmr_req_cache:
+       kmem_cache_destroy(se_tmr_req_cache);
+out_free_cmd_cache:
+       kmem_cache_destroy(se_cmd_cache);
 out:
-       if (se_cmd_cache)
-               kmem_cache_destroy(se_cmd_cache);
-       if (se_tmr_req_cache)
-               kmem_cache_destroy(se_tmr_req_cache);
-       if (se_sess_cache)
-               kmem_cache_destroy(se_sess_cache);
-       if (se_ua_cache)
-               kmem_cache_destroy(se_ua_cache);
-       if (t10_pr_reg_cache)
-               kmem_cache_destroy(t10_pr_reg_cache);
-       if (t10_alua_lu_gp_cache)
-               kmem_cache_destroy(t10_alua_lu_gp_cache);
-       if (t10_alua_lu_gp_mem_cache)
-               kmem_cache_destroy(t10_alua_lu_gp_mem_cache);
-       if (t10_alua_tg_pt_gp_cache)
-               kmem_cache_destroy(t10_alua_tg_pt_gp_cache);
-       if (t10_alua_tg_pt_gp_mem_cache)
-               kmem_cache_destroy(t10_alua_tg_pt_gp_mem_cache);
        return -ENOMEM;
 }
 
 void release_se_kmem_caches(void)
 {
+       destroy_workqueue(target_completion_wq);
        kmem_cache_destroy(se_cmd_cache);
        kmem_cache_destroy(se_tmr_req_cache);
        kmem_cache_destroy(se_sess_cache);
@@ -689,6 +699,33 @@ void transport_complete_sync_cache(struct se_cmd *cmd, int good)
 }
 EXPORT_SYMBOL(transport_complete_sync_cache);
 
+static void target_complete_timeout_work(struct work_struct *work)
+{
+       struct se_cmd *cmd = container_of(work, struct se_cmd, work);
+       unsigned long flags;
+
+       /*
+        * Reset cmd->t_se_count so that the final call to
+        * transport_put_cmd() can free the command's memory resources.
+        */
+       spin_lock_irqsave(&cmd->t_state_lock, flags);
+       if (atomic_read(&cmd->t_transport_timeout) > 1) {
+               int tmp = (atomic_read(&cmd->t_transport_timeout) - 1);
+
+               atomic_sub(tmp, &cmd->t_se_count);
+       }
+       spin_unlock_irqrestore(&cmd->t_state_lock, flags);
+
+       transport_put_cmd(cmd);
+}
+
+static void target_complete_failure_work(struct work_struct *work)
+{
+       struct se_cmd *cmd = container_of(work, struct se_cmd, work);
+
+       transport_generic_request_failure(cmd, 1, 1);
+}
+
 /*     transport_complete_task():
  *
  *     Called from interrupt and non interrupt context depending
@@ -698,7 +735,6 @@ void transport_complete_task(struct se_task *task, int success)
 {
        struct se_cmd *cmd = task->task_se_cmd;
        struct se_device *dev = cmd->se_dev;
-       int t_state;
        unsigned long flags;
 #if 0
        pr_debug("task: %p CDB: 0x%02x obj_ptr: %p\n", task,
@@ -749,17 +785,12 @@ void transport_complete_task(struct se_task *task, int success)
         * the processing thread.
         */
        if (task->task_flags & TF_TIMEOUT) {
-               if (!atomic_dec_and_test(
-                               &cmd->t_task_cdbs_timeout_left)) {
-                       spin_unlock_irqrestore(&cmd->t_state_lock,
-                               flags);
+               if (!atomic_dec_and_test(&cmd->t_task_cdbs_timeout_left)) {
+                       spin_unlock_irqrestore(&cmd->t_state_lock, flags);
                        return;
                }
-               t_state = TRANSPORT_COMPLETE_TIMEOUT;
-               spin_unlock_irqrestore(&cmd->t_state_lock, flags);
-
-               transport_add_cmd_to_queue(cmd, t_state, false);
-               return;
+               INIT_WORK(&cmd->work, target_complete_timeout_work);
+               goto out_queue;
        }
        atomic_dec(&cmd->t_task_cdbs_timeout_left);
 
@@ -769,28 +800,29 @@ void transport_complete_task(struct se_task *task, int success)
         * device queue depending upon int success.
         */
        if (!atomic_dec_and_test(&cmd->t_task_cdbs_left)) {
-               if (!success)
-                       cmd->t_tasks_failed = 1;
-
                spin_unlock_irqrestore(&cmd->t_state_lock, flags);
                return;
        }
 
        if (!success || cmd->t_tasks_failed) {
-               t_state = TRANSPORT_COMPLETE_FAILURE;
                if (!task->task_error_status) {
                        task->task_error_status =
                                PYX_TRANSPORT_UNKNOWN_SAM_OPCODE;
                        cmd->transport_error_status =
                                PYX_TRANSPORT_UNKNOWN_SAM_OPCODE;
                }
+               INIT_WORK(&cmd->work, target_complete_failure_work);
        } else {
                atomic_set(&cmd->t_transport_complete, 1);
-               t_state = TRANSPORT_COMPLETE_OK;
+               INIT_WORK(&cmd->work, target_complete_ok_work);
        }
+
+out_queue:
+       cmd->t_state = TRANSPORT_COMPLETE;
+       atomic_set(&cmd->t_transport_active, 1);
        spin_unlock_irqrestore(&cmd->t_state_lock, flags);
 
-       transport_add_cmd_to_queue(cmd, t_state, false);
+       queue_work(target_completion_wq, &cmd->work);
 }
 EXPORT_SYMBOL(transport_complete_task);
 
@@ -1642,8 +1674,6 @@ int transport_generic_allocate_tasks(
 }
 EXPORT_SYMBOL(transport_generic_allocate_tasks);
 
-static void transport_generic_request_failure(struct se_cmd *, int, int);
-
 /*
  * Used by fabric module frontends to queue tasks directly.
 * May only be used from process context
@@ -1985,25 +2015,6 @@ static void transport_direct_request_timeout(struct se_cmd *cmd)
        spin_unlock_irqrestore(&cmd->t_state_lock, flags);
 }
 
-static void transport_generic_request_timeout(struct se_cmd *cmd)
-{
-       unsigned long flags;
-
-       /*
-        * Reset cmd->t_se_count to allow transport_put_cmd()
-        * to allow last call to free memory resources.
-        */
-       spin_lock_irqsave(&cmd->t_state_lock, flags);
-       if (atomic_read(&cmd->t_transport_timeout) > 1) {
-               int tmp = (atomic_read(&cmd->t_transport_timeout) - 1);
-
-               atomic_sub(tmp, &cmd->t_se_count);
-       }
-       spin_unlock_irqrestore(&cmd->t_state_lock, flags);
-
-       transport_put_cmd(cmd);
-}
-
 static inline u32 transport_lba_21(unsigned char *cdb)
 {
        return ((cdb[1] & 0x1f) << 16) | (cdb[2] << 8) | cdb[3];
@@ -2094,10 +2105,12 @@ static void transport_task_timeout_handler(unsigned long data)
        pr_debug("transport task: %p cmd: %p timeout ZERO t_task_cdbs_left\n",
                        task, cmd);
 
-       cmd->t_state = TRANSPORT_COMPLETE_FAILURE;
+       INIT_WORK(&cmd->work, target_complete_failure_work);
+       cmd->t_state = TRANSPORT_COMPLETE;
+       atomic_set(&cmd->t_transport_active, 1);
        spin_unlock_irqrestore(&cmd->t_state_lock, flags);
 
-       transport_add_cmd_to_queue(cmd, TRANSPORT_COMPLETE_FAILURE, false);
+       queue_work(target_completion_wq, &cmd->work);
 }
 
 static void transport_start_task_timer(struct se_task *task)
@@ -2879,7 +2892,7 @@ static int transport_generic_cmd_sequencer(
                if (passthrough)
                        break;
                /*
-                * Setup BIDI XOR callback to be run during transport_generic_complete_ok()
+                * Setup BIDI XOR callback to be run after I/O completion.
                 */
                cmd->transport_complete_callback = &transport_xor_callback;
                cmd->t_tasks_fua = (cdb[1] & 0x8);
@@ -2913,8 +2926,8 @@ static int transport_generic_cmd_sequencer(
                                break;
 
                        /*
-                        * Setup BIDI XOR callback to be run during
-                        * transport_generic_complete_ok()
+                        * Setup BIDI XOR callback to be run after I/O
+                        * completion.
                         */
                        cmd->transport_complete_callback = &transport_xor_callback;
                        cmd->t_tasks_fua = (cdb[10] & 0x8);
@@ -3311,8 +3324,7 @@ out_invalid_cdb_field:
 }
 
 /*
- * Called from transport_generic_complete_ok() and
- * transport_generic_request_failure() to determine which dormant/delayed
+ * Called from I/O completion to determine which dormant/delayed
  * and ordered cmds need to have their tasks added to the execution queue.
  */
 static void transport_complete_task_attr(struct se_cmd *cmd)
@@ -3433,9 +3445,11 @@ static void transport_handle_queue_full(
        schedule_work(&cmd->se_dev->qf_work_queue);
 }
 
-static void transport_generic_complete_ok(struct se_cmd *cmd)
+static void target_complete_ok_work(struct work_struct *work)
 {
+       struct se_cmd *cmd = container_of(work, struct se_cmd, work);
        int reason = 0, ret;
+
        /*
         * Check if we need to move delayed/dormant tasks from cmds on the
         * delayed execution list after a HEAD_OF_QUEUE or ORDERED Task
@@ -4778,21 +4792,12 @@ get_cmd:
                case TRANSPORT_PROCESS_WRITE:
                        transport_generic_process_write(cmd);
                        break;
-               case TRANSPORT_COMPLETE_OK:
-                       transport_generic_complete_ok(cmd);
-                       break;
                case TRANSPORT_FREE_CMD_INTR:
                        transport_generic_free_cmd(cmd, 0);
                        break;
                case TRANSPORT_PROCESS_TMR:
                        transport_generic_do_tmr(cmd);
                        break;
-               case TRANSPORT_COMPLETE_FAILURE:
-                       transport_generic_request_failure(cmd, 1, 1);
-                       break;
-               case TRANSPORT_COMPLETE_TIMEOUT:
-                       transport_generic_request_timeout(cmd);
-                       break;
                case TRANSPORT_COMPLETE_QF_WP:
                        transport_write_pending_qf(cmd);
                        break;
diff --git a/include/target/target_core_base.h b/include/target/target_core_base.h
index 07104bf..8e2c83d 100644
@@ -86,9 +86,7 @@ enum transport_state_table {
        TRANSPORT_WRITE_PENDING = 3,
        TRANSPORT_PROCESS_WRITE = 4,
        TRANSPORT_PROCESSING    = 5,
-       TRANSPORT_COMPLETE_OK   = 6,
-       TRANSPORT_COMPLETE_FAILURE = 7,
-       TRANSPORT_COMPLETE_TIMEOUT = 8,
+       TRANSPORT_COMPLETE      = 6,
        TRANSPORT_PROCESS_TMR   = 9,
        TRANSPORT_ISTATE_PROCESSING = 11,
        TRANSPORT_NEW_CMD_MAP   = 16,
@@ -492,6 +490,8 @@ struct se_cmd {
        struct completion       transport_lun_stop_comp;
        struct scatterlist      *t_tasks_sg_chained;
 
+       struct work_struct      work;
+
        /*
         * Used for pre-registered fabric SGL passthrough WRITE and READ
         * with the special SCF_PASSTHROUGH_CONTIG_TO_SG case for TCM_Loop