RDS: use locking on the connection hash list
[linux-2.6.git] / net / rds / connection.c
index b420a20..87df15b 100644 (file)
  */
 #include <linux/kernel.h>
 #include <linux/list.h>
+#include <linux/slab.h>
 #include <net/inet_hashtables.h>
 
 #include "rds.h"
 #include "loop.h"
-#include "rdma.h"
 
 #define RDS_CONNECTION_HASH_BITS 12
 #define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -62,18 +62,6 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
 } while (0)
 
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
-       int ret = 0;
-
-       if (!mutex_trylock(&conn->c_send_lock))
-               ret = 1;
-       else
-               mutex_unlock(&conn->c_send_lock);
-
-       return ret;
-}
-
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
                                              __be32 laddr, __be32 faddr,
                                              struct rds_transport *trans)
@@ -133,10 +121,8 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 
        spin_lock_irqsave(&rds_conn_lock, flags);
        conn = rds_conn_lookup(head, laddr, faddr, trans);
-       if (conn
-        && conn->c_loopback
-        && conn->c_trans != &rds_loop_transport
-        && !is_outgoing) {
+       if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
+           !is_outgoing) {
                /* This is a looped back IB connection, and we're
                 * called by the code handling the incoming connect.
                 * We need a second connection object into which we
@@ -149,7 +135,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                goto out;
 
        conn = kmem_cache_zalloc(rds_conn_slab, gfp);
-       if (conn == NULL) {
+       if (!conn) {
                conn = ERR_PTR(-ENOMEM);
                goto out;
        }
@@ -160,7 +146,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        spin_lock_init(&conn->c_lock);
        conn->c_next_tx_seq = 1;
 
-       mutex_init(&conn->c_send_lock);
+       spin_lock_init(&conn->c_send_lock);
+       atomic_set(&conn->c_send_generation, 1);
+       atomic_set(&conn->c_senders, 0);
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -255,13 +243,76 @@ struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
 {
        return __rds_conn_create(laddr, faddr, trans, gfp, 0);
 }
+EXPORT_SYMBOL_GPL(rds_conn_create);
 
 struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
                                       struct rds_transport *trans, gfp_t gfp)
 {
        return __rds_conn_create(laddr, faddr, trans, gfp, 1);
 }
+EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
 
+/* Quiesce senders, tear down the transport, reconnect if still hashed. */
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+       /* shut it down unless it's down already */
+       if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+               /*
+                * Quiesce the connection mgmt handlers before we start tearing
+                * things down. We don't hold the mutex for the entire
+                * duration of the shutdown operation, else we may be
+                * deadlocking with the CM handler. Instead, the CM event
+                * handler is supposed to check for state DISCONNECTING
+                */
+               mutex_lock(&conn->c_cm_lock);
+               if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+                && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+                       rds_conn_error(conn, "shutdown called in state %d\n",
+                                       atomic_read(&conn->c_state));
+                       mutex_unlock(&conn->c_cm_lock);
+                       return;
+               }
+               mutex_unlock(&conn->c_cm_lock);
+
+               /* verify everybody's out of rds_send_xmit() */
+               spin_lock_irq(&conn->c_send_lock);
+               spin_unlock_irq(&conn->c_send_lock);
+               /* sleep a full jiffy per poll instead of spinning on c_senders */
+               while (atomic_read(&conn->c_senders)) {
+                       schedule_timeout_uninterruptible(1);
+                       spin_lock_irq(&conn->c_send_lock);
+                       spin_unlock_irq(&conn->c_send_lock);
+               }
+
+               conn->c_trans->conn_shutdown(conn);
+               rds_conn_reset(conn);
+
+               if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+                       /* This can happen - eg when we're in the middle of tearing
+                        * down the connection, and someone unloads the rds module.
+                        * Quite reproducible with loopback connections.
+                        * Mostly harmless.
+                        */
+                       rds_conn_error(conn,
+                               "%s: failed to transition to state DOWN, "
+                               "current state is %d\n",
+                               __func__, atomic_read(&conn->c_state));
+                       return;
+               }
+       }
+
+       /* Then reconnect if it's still live.
+        * The passive side of an IB loopback connection is never added
+        * to the conn hash, so we never trigger a reconnect on this
+        * conn - the reconnect is always triggered by the active peer. */
+       cancel_delayed_work_sync(&conn->c_conn_w);
+       if (!hlist_unhashed(&conn->c_hash_node))
+               rds_queue_reconnect(conn);
+}
+
+/*
+ * Stop and free a connection.
+ */
 void rds_conn_destroy(struct rds_connection *conn)
 {
        struct rds_message *rm, *rtmp;
@@ -270,7 +321,10 @@ void rds_conn_destroy(struct rds_connection *conn)
                 "%pI4\n", conn, &conn->c_laddr,
                 &conn->c_faddr);
 
+       /* Ensure conn will not be scheduled for reconnect */
+       spin_lock_irq(&rds_conn_lock);
        hlist_del_init(&conn->c_hash_node);
+       spin_unlock_irq(&rds_conn_lock);
 
        /* wait for the rds thread to shut it down */
        atomic_set(&conn->c_state, RDS_CONN_ERROR);
@@ -303,6 +357,7 @@ void rds_conn_destroy(struct rds_connection *conn)
 
        rds_conn_count--;
 }
+EXPORT_SYMBOL_GPL(rds_conn_destroy);
 
 static void rds_conn_message_info(struct socket *sock, unsigned int len,
                                  struct rds_info_iterator *iter,
@@ -406,6 +461,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 
        spin_unlock_irqrestore(&rds_conn_lock, flags);
 }
+EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
 
 static int rds_conn_info_visitor(struct rds_connection *conn,
                                  void *buffer)
@@ -421,7 +477,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
        cinfo->flags = 0;
 
        rds_conn_info_set(cinfo->flags,
-                         rds_conn_is_sending(conn), SENDING);
+                         spin_is_locked(&conn->c_send_lock), SENDING);
        /* XXX Future: return the state rather than these funky bits */
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -446,7 +502,7 @@ int __init rds_conn_init(void)
        rds_conn_slab = kmem_cache_create("rds_connection",
                                          sizeof(struct rds_connection),
                                          0, 0, NULL);
-       if (rds_conn_slab == NULL)
+       if (!rds_conn_slab)
                return -ENOMEM;
 
        rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -481,6 +537,7 @@ void rds_conn_drop(struct rds_connection *conn)
        atomic_set(&conn->c_state, RDS_CONN_ERROR);
        queue_work(rds_wq, &conn->c_down_w);
 }
+EXPORT_SYMBOL_GPL(rds_conn_drop);
 
 /*
  * An error occurred on the connection