Merge commit 'v3.4.4' into android-tegra-nv-3.4
[linux-2.6.git] / fs / ocfs2 / cluster / tcp.c
index 0f60cc0..044e7b5 100644 (file)
@@ -58,6 +58,8 @@
 #include <linux/slab.h>
 #include <linux/idr.h>
 #include <linux/kref.h>
+#include <linux/net.h>
+#include <linux/export.h>
 #include <net/tcp.h>
 
 #include <asm/uaccess.h>
 
 #include "tcp_internal.h"
 
-/* 
- * The linux network stack isn't sparse endian clean.. It has macros like
- * ntohs() which perform the endian checks and structs like sockaddr_in
- * which aren't annotated.  So __force is found here to get the build
- * clean.  When they emerge from the dark ages and annotate the code
- * we can remove these.
- */
-
-#define SC_NODEF_FMT "node %s (num %u) at %u.%u.%u.%u:%u"
+#define SC_NODEF_FMT "node %s (num %u) at %pI4:%u"
 #define SC_NODEF_ARGS(sc) sc->sc_node->nd_name, sc->sc_node->nd_num,   \
-                         NIPQUAD(sc->sc_node->nd_ipv4_address),        \
+                         &sc->sc_node->nd_ipv4_address,                \
                          ntohs(sc->sc_node->nd_ipv4_port)
 
 /*
            ##args);                                                    \
 } while (0)
 
-static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
+static DEFINE_RWLOCK(o2net_handler_lock);
 static struct rb_root o2net_handler_tree = RB_ROOT;
 
 static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
@@ -140,13 +134,148 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
                 [O2NET_ERR_DIED]       = -EHOSTDOWN,};
 
 /* can't quite avoid *all* internal declarations :/ */
-static void o2net_sc_connect_completed(void *arg);
-static void o2net_rx_until_empty(void *arg);
-static void o2net_shutdown_sc(void *arg);
+static void o2net_sc_connect_completed(struct work_struct *work);
+static void o2net_rx_until_empty(struct work_struct *work);
+static void o2net_shutdown_sc(struct work_struct *work);
 static void o2net_listen_data_ready(struct sock *sk, int bytes);
-static void o2net_sc_send_keep_req(void *arg);
+static void o2net_sc_send_keep_req(struct work_struct *work);
 static void o2net_idle_timer(unsigned long data);
 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
+
+#ifdef CONFIG_DEBUG_FS
+/*
+ * Send-tracking and timestamp helpers.  With CONFIG_DEBUG_FS set they store
+ * values into struct o2net_send_tracking / struct o2net_sock_container;
+ * otherwise each helper is defined further down as an empty macro.
+ * All timestamps are taken with ktime_get().
+ */
+
+/* Prepare @nst for a new send: empty list linkage plus the sending task,
+ * message type, message key and target node number. */
+static void o2net_init_nst(struct o2net_send_tracking *nst, u32 msgtype,
+                          u32 msgkey, struct task_struct *task, u8 node)
+{
+       INIT_LIST_HEAD(&nst->st_net_debug_item);
+       nst->st_task = task;
+       nst->st_msg_type = msgtype;
+       nst->st_msg_key = msgkey;
+       nst->st_node = node;
+}
+
+/* Stamp st_sock_time; the send path calls this right before it waits for a
+ * usable sock container. */
+static inline void o2net_set_nst_sock_time(struct o2net_send_tracking *nst)
+{
+       nst->st_sock_time = ktime_get();
+}
+
+/* Stamp st_send_time; the send path calls this just before transmitting. */
+static inline void o2net_set_nst_send_time(struct o2net_send_tracking *nst)
+{
+       nst->st_send_time = ktime_get();
+}
+
+/* Stamp st_status_time; the send path calls this before waiting for the
+ * remote handler's status. */
+static inline void o2net_set_nst_status_time(struct o2net_send_tracking *nst)
+{
+       nst->st_status_time = ktime_get();
+}
+
+/* Remember which sock container carries the tracked send. */
+static inline void o2net_set_nst_sock_container(struct o2net_send_tracking *nst,
+                                               struct o2net_sock_container *sc)
+{
+       nst->st_sc = sc;
+}
+
+/* Remember the message id assigned to the tracked send. */
+static inline void o2net_set_nst_msg_id(struct o2net_send_tracking *nst,
+                                       u32 msg_id)
+{
+       nst->st_id = msg_id;
+}
+
+/* Stamp sc_tv_timer. */
+static inline void o2net_set_sock_timer(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_timer = ktime_get();
+}
+
+/* Stamp sc_tv_data_ready; called from the socket data_ready callback. */
+static inline void o2net_set_data_ready_time(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_data_ready = ktime_get();
+}
+
+/* Stamp sc_tv_advance_start; called on entry to o2net_advance_rx(). */
+static inline void o2net_set_advance_start_time(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_advance_start = ktime_get();
+}
+
+/* Stamp sc_tv_advance_stop; called on exit from o2net_advance_rx(). */
+static inline void o2net_set_advance_stop_time(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_advance_stop = ktime_get();
+}
+
+/* Stamp sc_tv_func_start; called just before a message handler runs. */
+static inline void o2net_set_func_start_time(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_func_start = ktime_get();
+}
+
+/* Stamp sc_tv_func_stop; called right after a message handler returns. */
+static inline void o2net_set_func_stop_time(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_func_stop = ktime_get();
+}
+
+#else  /* CONFIG_DEBUG_FS */
+/* No-op stubs: each call expands to nothing, so the arguments are never
+ * evaluated.  Keep this list in step with the functions above. */
+# define o2net_init_nst(a, b, c, d, e)
+# define o2net_set_nst_sock_time(a)
+# define o2net_set_nst_send_time(a)
+# define o2net_set_nst_status_time(a)
+# define o2net_set_nst_sock_container(a, b)
+# define o2net_set_nst_msg_id(a, b)
+# define o2net_set_sock_timer(a)
+# define o2net_set_data_ready_time(a)
+# define o2net_set_advance_start_time(a)
+# define o2net_set_advance_stop_time(a)
+# define o2net_set_func_start_time(a)
+# define o2net_set_func_stop_time(a)
+#endif /* CONFIG_DEBUG_FS */
+
+#ifdef CONFIG_OCFS2_FS_STATS
+/* Duration of the most recent handler call: sc_tv_func_stop minus
+ * sc_tv_func_start. */
+static ktime_t o2net_get_func_run_time(struct o2net_sock_container *sc)
+{
+       return ktime_sub(sc->sc_tv_func_stop, sc->sc_tv_func_start);
+}
+
+/*
+ * Fold one finished send into @sc's running totals and bump sc_send_count:
+ *   sc_tv_status_total  += now            - st_status_time
+ *   sc_tv_send_total    += st_status_time - st_send_time
+ *   sc_tv_acquiry_total += st_send_time   - st_sock_time
+ *
+ * NOTE(review): the helpers that write the st_*_time fields are real
+ * functions only under CONFIG_DEBUG_FS -- confirm that this option cannot
+ * be enabled without it.
+ */
+static void o2net_update_send_stats(struct o2net_send_tracking *nst,
+                                   struct o2net_sock_container *sc)
+{
+       sc->sc_tv_status_total = ktime_add(sc->sc_tv_status_total,
+                                          ktime_sub(ktime_get(),
+                                                    nst->st_status_time));
+       sc->sc_tv_send_total = ktime_add(sc->sc_tv_send_total,
+                                        ktime_sub(nst->st_status_time,
+                                                  nst->st_send_time));
+       sc->sc_tv_acquiry_total = ktime_add(sc->sc_tv_acquiry_total,
+                                           ktime_sub(nst->st_send_time,
+                                                     nst->st_sock_time));
+       sc->sc_send_count++;
+}
+
+/* Add the last handler run time to sc_tv_process_total and bump
+ * sc_recv_count. */
+static void o2net_update_recv_stats(struct o2net_sock_container *sc)
+{
+       sc->sc_tv_process_total = ktime_add(sc->sc_tv_process_total,
+                                           o2net_get_func_run_time(sc));
+       sc->sc_recv_count++;
+}
+
+#else
+
+/* Statistics disabled: both updaters expand to nothing. */
+# define o2net_update_send_stats(a, b)
+
+# define o2net_update_recv_stats(sc)
+
+#endif /* CONFIG_OCFS2_FS_STATS */
+
+/* Reconnect delay in milliseconds, read from o2nm_single_cluster on each
+ * call rather than from a compile-time constant. */
+static inline int o2net_reconnect_delay(void)
+{
+       return o2nm_single_cluster->cl_reconnect_delay_ms;
+}
+
+/* Keepalive delay in milliseconds, read from o2nm_single_cluster on each
+ * call. */
+static inline int o2net_keepalive_delay(void)
+{
+       return o2nm_single_cluster->cl_keepalive_delay_ms;
+}
+
+/* Idle timeout in milliseconds, read from o2nm_single_cluster on each
+ * call. */
+static inline int o2net_idle_timeout(void)
+{
+       return o2nm_single_cluster->cl_idle_timeout_ms;
+}
 
 static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
 {
@@ -239,14 +368,12 @@ out:
 
 static void o2net_complete_nodes_nsw(struct o2net_node *nn)
 {
-       struct list_head *iter, *tmp;
+       struct o2net_status_wait *nsw, *tmp;
        unsigned int num_kills = 0;
-       struct o2net_status_wait *nsw;
 
        assert_spin_locked(&nn->nn_lock);
 
-       list_for_each_safe(iter, tmp, &nn->nn_status_list) {
-               nsw = list_entry(iter, struct o2net_status_wait, ns_node_item);
+       list_for_each_entry_safe(nsw, tmp, &nn->nn_status_list, ns_node_item) {
                o2net_complete_nsw_locked(nn, nsw, O2NET_ERR_DIED, 0);
                num_kills++;
        }
@@ -271,6 +398,8 @@ static void sc_kref_release(struct kref *kref)
 {
        struct o2net_sock_container *sc = container_of(kref,
                                        struct o2net_sock_container, sc_kref);
+       BUG_ON(timer_pending(&sc->sc_idle_timeout));
+
        sclog(sc, "releasing\n");
 
        if (sc->sc_sock) {
@@ -278,9 +407,11 @@ static void sc_kref_release(struct kref *kref)
                sc->sc_sock = NULL;
        }
 
+       o2nm_undepend_item(&sc->sc_node->nd_item);
        o2nm_node_put(sc->sc_node);
        sc->sc_node = NULL;
 
+       o2net_debug_del_sc(sc);
        kfree(sc);
 }
 
@@ -298,9 +429,10 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 {
        struct o2net_sock_container *sc, *ret = NULL;
        struct page *page = NULL;
+       int status = 0;
 
        page = alloc_page(GFP_NOFS);
-       sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
+       sc = kzalloc(sizeof(*sc), GFP_NOFS);
        if (sc == NULL || page == NULL)
                goto out;
 
@@ -308,10 +440,17 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
        o2nm_node_get(node);
        sc->sc_node = node;
 
-       INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc);
-       INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc);
-       INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc);
-       INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc);
+       /* pin the node item of the remote node */
+       status = o2nm_depend_item(&node->nd_item);
+       if (status) {
+               mlog_errno(status);
+               o2nm_node_put(node);
+               goto out;
+       }
+       INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed);
+       INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty);
+       INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
+       INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
 
        init_timer(&sc->sc_idle_timeout);
        sc->sc_idle_timeout.function = o2net_idle_timer;
@@ -321,6 +460,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 
        ret = sc;
        sc->sc_page = page;
+       o2net_debug_add_sc(sc);
        sc = NULL;
        page = NULL;
 
@@ -342,7 +482,7 @@ static void o2net_sc_queue_work(struct o2net_sock_container *sc,
                sc_put(sc);
 }
 static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
-                                       struct work_struct *work,
+                                       struct delayed_work *work,
                                        int delay)
 {
        sc_get(sc);
@@ -350,12 +490,19 @@ static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
                sc_put(sc);
 }
 static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
-                                        struct work_struct *work)
+                                        struct delayed_work *work)
 {
        if (cancel_delayed_work(work))
                sc_put(sc);
 }
 
+/* Count of peers that currently have a sock container attached; adjusted in
+ * o2net_set_nn_state() when a node's sc goes from unset to set or back. */
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
+
+/* Return a snapshot of the connected-peer counter. */
+int o2net_num_connected_peers(void)
+{
+       return atomic_read(&o2net_connected_peers);
+}
+
 static void o2net_set_nn_state(struct o2net_node *nn,
                               struct o2net_sock_container *sc,
                               unsigned valid, int err)
@@ -366,14 +513,17 @@ static void o2net_set_nn_state(struct o2net_node *nn,
 
        assert_spin_locked(&nn->nn_lock);
 
+       if (old_sc && !sc)
+               atomic_dec(&o2net_connected_peers);
+       else if (!old_sc && sc)
+               atomic_inc(&o2net_connected_peers);
+
        /* the node num comparison and single connect/accept path should stop
         * an non-null sc from being overwritten with another */
        BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
        mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
        mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
 
-       /* we won't reconnect after our valid conn goes away for
-        * this hb iteration.. here so it shows up in the logs */
        if (was_valid && !valid && err == 0)
                err = -ENOTCONN;
 
@@ -396,23 +546,18 @@ static void o2net_set_nn_state(struct o2net_node *nn,
        }
 
        if (was_valid && !valid) {
-               mlog(ML_NOTICE, "no longer connected to " SC_NODEF_FMT "\n",
-                    SC_NODEF_ARGS(old_sc));
+               printk(KERN_NOTICE "o2net: No longer connected to "
+                      SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
                o2net_complete_nodes_nsw(nn);
        }
 
        if (!was_valid && valid) {
                o2quo_conn_up(o2net_num_from_nn(nn));
-               /* this is a bit of a hack.  we only try reconnecting
-                * when heartbeating starts until we get a connection.
-                * if that connection then dies we don't try reconnecting.
-                * the only way to start connecting again is to down
-                * heartbeat and bring it back up. */
                cancel_delayed_work(&nn->nn_connect_expired);
-               mlog(ML_NOTICE, "%s " SC_NODEF_FMT "\n", 
-                    o2nm_this_node() > sc->sc_node->nd_num ?
-                       "connected to" : "accepted connection from",
-                    SC_NODEF_ARGS(sc));
+               printk(KERN_NOTICE "o2net: %s " SC_NODEF_FMT "\n",
+                      o2nm_this_node() > sc->sc_node->nd_num ?
+                      "Connected to" : "Accepted connection from",
+                      SC_NODEF_ARGS(sc));
        }
 
        /* trigger the connecting worker func as long as we're not valid,
@@ -421,15 +566,27 @@ static void o2net_set_nn_state(struct o2net_node *nn,
         * the work queue actually being up. */
        if (!valid && o2net_wq) {
                unsigned long delay;
-               /* delay if we're withing a RECONNECT_DELAY of the
+               /* delay if we're within a RECONNECT_DELAY of the
                 * last attempt */
                delay = (nn->nn_last_connect_attempt +
-                        msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+                        msecs_to_jiffies(o2net_reconnect_delay()))
                        - jiffies;
-               if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+               if (delay > msecs_to_jiffies(o2net_reconnect_delay()))
                        delay = 0;
                mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
                queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
+
+               /*
+                * Delay the expired work after idle timeout.
+                *
+                * We might have lots of failed connection attempts that run
+                * through here but we only cancel the connect_expired work when
+                * a connection attempt succeeds.  So only the first enqueue of
+                * the connect_expired work will do anything.  The rest will see
+                * that it's already queued and do nothing.
+                */
+               delay += msecs_to_jiffies(o2net_idle_timeout());
+               queue_delayed_work(o2net_wq, &nn->nn_connect_expired, delay);
        }
 
        /* keep track of the nn's sc ref for the caller */
@@ -450,7 +607,7 @@ static void o2net_data_ready(struct sock *sk, int bytes)
        if (sk->sk_user_data) {
                struct o2net_sock_container *sc = sk->sk_user_data;
                sclog(sc, "data_ready hit\n");
-               do_gettimeofday(&sc->sc_tv_data_ready);
+               o2net_set_data_ready_time(sc);
                o2net_sc_queue_work(sc, &sc->sc_rx_work);
                ready = sc->sc_data_ready;
        } else {
@@ -487,6 +644,9 @@ static void o2net_state_change(struct sock *sk)
                        o2net_sc_queue_work(sc, &sc->sc_connect_work);
                        break;
                default:
+                       printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT
+                             " shutdown, state %d\n",
+                             SC_NODEF_ARGS(sc), sk->sk_state);
                        o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
                        break;
        }
@@ -520,6 +680,8 @@ static void o2net_register_callbacks(struct sock *sk,
        sk->sk_data_ready = o2net_data_ready;
        sk->sk_state_change = o2net_state_change;
 
+       mutex_init(&sc->sc_send_lock);
+
        write_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -564,9 +726,11 @@ static void o2net_ensure_shutdown(struct o2net_node *nn,
  * ourselves as state_change couldn't get the nn_lock and call set_nn_state
  * itself.
  */
-static void o2net_shutdown_sc(void *arg)
+static void o2net_shutdown_sc(struct work_struct *work)
 {
-       struct o2net_sock_container *sc = arg;
+       struct o2net_sock_container *sc =
+               container_of(work, struct o2net_sock_container,
+                            sc_shutdown_work);
        struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 
        sclog(sc, "shutting down\n");
@@ -578,8 +742,7 @@ static void o2net_shutdown_sc(void *arg)
                del_timer_sync(&sc->sc_idle_timeout);
                o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
                sc_put(sc);
-               sc->sc_sock->ops->shutdown(sc->sc_sock,
-                                          RCV_SHUTDOWN|SEND_SHUTDOWN);
+               kernel_sock_shutdown(sc->sc_sock, SHUT_RDWR);
        }
 
        /* not fatal so failed connects before the other guy has our
@@ -650,6 +813,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh)
  * be given to the handler if their payload is longer than the max. */
 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
                           o2net_msg_handler_func *func, void *data,
+                          o2net_post_msg_handler_func *post_func,
                           struct list_head *unreg_list)
 {
        struct o2net_msg_handler *nmh = NULL;
@@ -676,7 +840,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
                goto out;
        }
 
-               nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS);
+               nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS);
        if (nmh == NULL) {
                ret = -ENOMEM;
                goto out;
@@ -684,6 +848,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 
        nmh->nh_func = func;
        nmh->nh_func_data = data;
+       nmh->nh_post_func = post_func;
        nmh->nh_msg_type = msg_type;
        nmh->nh_max_len = max_len;
        nmh->nh_key = key;
@@ -722,13 +887,10 @@ EXPORT_SYMBOL_GPL(o2net_register_handler);
 
 void o2net_unregister_handler_list(struct list_head *list)
 {
-       struct list_head *pos, *n;
-       struct o2net_msg_handler *nmh;
+       struct o2net_msg_handler *nmh, *n;
 
        write_lock(&o2net_handler_lock);
-       list_for_each_safe(pos, n, list) {
-               nmh = list_entry(pos, struct o2net_msg_handler,
-                                nh_unregister_item);
+       list_for_each_entry_safe(nmh, n, list, nh_unregister_item) {
                mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n",
                     nmh->nh_func, nmh->nh_msg_type, nmh->nh_key);
                rb_erase(&nmh->nh_node, &o2net_handler_tree);
@@ -817,15 +979,25 @@ static void o2net_sendpage(struct o2net_sock_container *sc,
        struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
        ssize_t ret;
 
-
-       ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
-                                        virt_to_page(kmalloced_virt),
-                                        (long)kmalloced_virt & ~PAGE_MASK,
-                                        size, MSG_DONTWAIT);
-       if (ret != size) {
-               mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 
+       while (1) {
+               mutex_lock(&sc->sc_send_lock);
+               ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
+                                                virt_to_page(kmalloced_virt),
+                                                (long)kmalloced_virt & ~PAGE_MASK,
+                                                size, MSG_DONTWAIT);
+               mutex_unlock(&sc->sc_send_lock);
+               if (ret == size)
+                       break;
+               if (ret == (ssize_t)-EAGAIN) {
+                       mlog(0, "sendpage of size %zu to " SC_NODEF_FMT
+                            " returned EAGAIN\n", size, SC_NODEF_ARGS(sc));
+                       cond_resched();
+                       continue;
+               }
+               mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
                     " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
                o2net_ensure_shutdown(nn, sc, 0);
+               break;
        }
 }
 
@@ -863,10 +1035,29 @@ static int o2net_tx_can_proceed(struct o2net_node *nn,
        return ret;
 }
 
+/*
+ * Get a map of all nodes to which this node is currently connected.
+ *
+ * @map:   bitmap to fill, one bit per node number; it is zeroed first.
+ * @bytes: size of @map in bytes.  BUGs if it cannot hold O2NM_MAX_NODES
+ *         bits.
+ */
+void o2net_fill_node_map(unsigned long *map, unsigned bytes)
+{
+       struct o2net_sock_container *sc;
+       int node, ret;
+
+       BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
+
+       memset(map, 0, bytes);
+       for (node = 0; node < O2NM_MAX_NODES; ++node) {
+               /*
+                * o2net_tx_can_proceed() is used as a wait condition in the
+                * send path, so it can report "not ready".  Only trust sc
+                * and ret when it reports ready; otherwise they may be
+                * uninitialized or left over from the previous node, which
+                * would mark a wrong bit and drop a stale sc reference.
+                */
+               if (!o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret))
+                       continue;
+               if (!ret) {
+                       set_bit(node, map);
+                       /* drop the reference taken on our behalf */
+                       sc_put(sc);
+               }
+       }
+}
+EXPORT_SYMBOL_GPL(o2net_fill_node_map);
+
 int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
                           size_t caller_veclen, u8 target_node, int *status)
 {
-       int ret, error = 0;
+       int ret = 0;
        struct o2net_msg *msg = NULL;
        size_t veclen, caller_bytes = 0;
        struct kvec *vec = NULL;
@@ -875,6 +1066,9 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
        struct o2net_status_wait nsw = {
                .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
        };
+       struct o2net_send_tracking nst;
+
+       o2net_init_nst(&nst, msg_type, key, current, target_node);
 
        if (o2net_wq == NULL) {
                mlog(0, "attempt to tx without o2netd running\n");
@@ -900,13 +1094,16 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
                goto out;
        }
 
-       ret = wait_event_interruptible(nn->nn_sc_wq,
-                                      o2net_tx_can_proceed(nn, &sc, &error));
-       if (!ret && error)
-               ret = error;
+       o2net_debug_add_nst(&nst);
+
+       o2net_set_nst_sock_time(&nst);
+
+       wait_event(nn->nn_sc_wq, o2net_tx_can_proceed(nn, &sc, &ret));
        if (ret)
                goto out;
 
+       o2net_set_nst_sock_container(&nst, sc);
+
        veclen = caller_veclen + 1;
        vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC);
        if (vec == NULL) {
@@ -933,11 +1130,16 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
                goto out;
 
        msg->msg_num = cpu_to_be32(nsw.ns_id);
+       o2net_set_nst_msg_id(&nst, nsw.ns_id);
+
+       o2net_set_nst_send_time(&nst);
 
        /* finally, convert the message header to network byte-order
         * and send */
+       mutex_lock(&sc->sc_send_lock);
        ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
                                 sizeof(struct o2net_msg) + caller_bytes);
+       mutex_unlock(&sc->sc_send_lock);
        msglog(msg, "sending returned %d\n", ret);
        if (ret < 0) {
                mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
@@ -945,8 +1147,11 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
        }
 
        /* wait on other node's handler */
+       o2net_set_nst_status_time(&nst);
        wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw));
 
+       o2net_update_send_stats(&nst, sc);
+
        /* Note that we avoid overwriting the callers status return
         * variable if a system error was reported on the other
         * side. Callers beware. */
@@ -957,6 +1162,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
        mlog(0, "woken, returning system status %d, user status %d\n",
             ret, nsw.ns_status);
 out:
+       o2net_debug_del_nst(&nst); /* must be before dropping sc and node */
        if (sc)
                sc_put(sc);
        if (vec)
@@ -1011,6 +1217,7 @@ static int o2net_process_message(struct o2net_sock_container *sc,
        int ret = 0, handler_status;
        enum  o2net_system_error syserr;
        struct o2net_msg_handler *nmh = NULL;
+       void *ret_data = NULL;
 
        msglog(hdr, "processing message\n");
 
@@ -1058,22 +1265,33 @@ static int o2net_process_message(struct o2net_sock_container *sc,
        if (syserr != O2NET_ERR_NONE)
                goto out_respond;
 
-       do_gettimeofday(&sc->sc_tv_func_start);
+       o2net_set_func_start_time(sc);
        sc->sc_msg_key = be32_to_cpu(hdr->key);
        sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
        handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
                                             be16_to_cpu(hdr->data_len),
-                                       nmh->nh_func_data);
-       do_gettimeofday(&sc->sc_tv_func_stop);
+                                       nmh->nh_func_data, &ret_data);
+       o2net_set_func_stop_time(sc);
+
+       o2net_update_recv_stats(sc);
 
 out_respond:
        /* this destroys the hdr, so don't use it after this */
+       mutex_lock(&sc->sc_send_lock);
        ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
                                      handler_status);
+       mutex_unlock(&sc->sc_send_lock);
        hdr = NULL;
        mlog(0, "sending handler status %d, syserr %d returned %d\n",
             handler_status, syserr, ret);
 
+       if (nmh) {
+               BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
+               if (nmh->nh_post_func)
+                       (nmh->nh_post_func)(handler_status, nmh->nh_func_data,
+                                           ret_data);
+       }
+
 out:
        if (nmh)
                o2net_handler_put(nmh);
@@ -1086,24 +1304,63 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
        struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 
        if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) {
-               mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol "
-                    "version %llu but %llu is required, disconnecting\n",
-                    SC_NODEF_ARGS(sc),
-                    (unsigned long long)be64_to_cpu(hand->protocol_version),
-                    O2NET_PROTOCOL_VERSION);
+               printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " Advertised net "
+                      "protocol version %llu but %llu is required. "
+                      "Disconnecting.\n", SC_NODEF_ARGS(sc),
+                      (unsigned long long)be64_to_cpu(hand->protocol_version),
+                      O2NET_PROTOCOL_VERSION);
 
                /* don't bother reconnecting if its the wrong version. */
                o2net_ensure_shutdown(nn, sc, -ENOTCONN);
                return -1;
        }
 
+       /*
+        * Ensure timeouts are consistent with other nodes, otherwise
+        * we can end up with one node thinking that the other must be down
+        * when it isn't. This can ultimately cause corruption.
+        */
+       if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
+                               o2net_idle_timeout()) {
+               printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a network "
+                      "idle timeout of %u ms, but we use %u ms locally. "
+                      "Disconnecting.\n", SC_NODEF_ARGS(sc),
+                      be32_to_cpu(hand->o2net_idle_timeout_ms),
+                      o2net_idle_timeout());
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
+                       o2net_keepalive_delay()) {
+               printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a keepalive "
+                      "delay of %u ms, but we use %u ms locally. "
+                      "Disconnecting.\n", SC_NODEF_ARGS(sc),
+                      be32_to_cpu(hand->o2net_keepalive_delay_ms),
+                      o2net_keepalive_delay());
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
+                       O2HB_MAX_WRITE_TIMEOUT_MS) {
+               printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a heartbeat "
+                      "timeout of %u ms, but we use %u ms locally. "
+                      "Disconnecting.\n", SC_NODEF_ARGS(sc),
+                      be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
+                      O2HB_MAX_WRITE_TIMEOUT_MS);
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
        sc->sc_handshake_ok = 1;
 
        spin_lock(&nn->nn_lock);
        /* set valid and queue the idle timers only if it hasn't been
         * shut down already */
        if (nn->nn_sc == sc) {
-               o2net_sc_postpone_idle(sc);
+               o2net_sc_reset_idle_timer(sc);
+               atomic_set(&nn->nn_timeout, 0);
                o2net_set_nn_state(nn, sc, 1, 0);
        }
        spin_unlock(&nn->nn_lock);
@@ -1127,7 +1384,24 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
        size_t datalen;
 
        sclog(sc, "receiving\n");
-       do_gettimeofday(&sc->sc_tv_advance_start);
+       o2net_set_advance_start_time(sc);
+
+       if (unlikely(sc->sc_handshake_ok == 0)) {
+               if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
+                       data = page_address(sc->sc_page) + sc->sc_page_off;
+                       datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
+                       ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+                       if (ret > 0)
+                               sc->sc_page_off += ret;
+               }
+
+               if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
+                       o2net_check_handshake(sc);
+                       if (unlikely(sc->sc_handshake_ok == 0))
+                               ret = -EPROTO;
+               }
+               goto out;
+       }
 
        /* do we need more header? */
        if (sc->sc_page_off < sizeof(struct o2net_msg)) {
@@ -1136,15 +1410,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
                ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
                if (ret > 0) {
                        sc->sc_page_off += ret;
-
-                       /* this working relies on the handshake being
-                        * smaller than the normal message header */
-                       if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
-                           !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
-                               ret = -EPROTO;
-                               goto out;
-                       }
-
                        /* only swab incoming here.. we can
                         * only get here once as we cross from
                         * being under to over */
@@ -1194,16 +1459,17 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
 
 out:
        sclog(sc, "ret = %d\n", ret);
-       do_gettimeofday(&sc->sc_tv_advance_stop);
+       o2net_set_advance_stop_time(sc);
        return ret;
 }
 
 /* this work func is triggerd by data ready.  it reads until it can read no
  * more.  it interprets 0, eof, as fatal.  if data_ready hits while we're doing
  * our work the work struct will be marked and we'll be called again. */
-static void o2net_rx_until_empty(void *arg)
+static void o2net_rx_until_empty(struct work_struct *work)
 {
-       struct o2net_sock_container *sc = arg;
+       struct o2net_sock_container *sc =
+               container_of(work, struct o2net_sock_container, sc_rx_work);
        int ret;
 
        do {
@@ -1245,26 +1511,42 @@ static int o2net_set_nodelay(struct socket *sock)
        return ret;
 }
 
+/* load the current timeout settings into the shared handshake buffer */
+static void o2net_initialize_handshake(void)
+{
+       struct o2net_handshake *hs = o2net_hand;
+
+       hs->o2hb_heartbeat_timeout_ms = cpu_to_be32(O2HB_MAX_WRITE_TIMEOUT_MS);
+       hs->o2net_idle_timeout_ms = cpu_to_be32(o2net_idle_timeout());
+       hs->o2net_keepalive_delay_ms = cpu_to_be32(o2net_keepalive_delay());
+       hs->o2net_reconnect_delay_ms = cpu_to_be32(o2net_reconnect_delay());
+}
+
 /* ------------------------------------------------------------ */
 
 /* called when a connect completes and after a sock is accepted.  the
  * rx path will see the response and mark the sc valid */
-static void o2net_sc_connect_completed(void *arg)
+static void o2net_sc_connect_completed(struct work_struct *work)
 {
-       struct o2net_sock_container *sc = arg;
+       struct o2net_sock_container *sc =
+               container_of(work, struct o2net_sock_container,
+                            sc_connect_work);
 
        mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
               (unsigned long long)O2NET_PROTOCOL_VERSION,
              (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
 
+       o2net_initialize_handshake();   /* refresh timeouts before send */
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
        sc_put(sc);
 }
 
 /* this is called as a work_struct func. */
-static void o2net_sc_send_keep_req(void *arg)
+static void o2net_sc_send_keep_req(struct work_struct *work)
 {
-       struct o2net_sock_container *sc = arg;
+       struct o2net_sock_container *sc =
+               container_of(work, struct o2net_sock_container,
+                            sc_keepalive_work.work);
 
        o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
        sc_put(sc);
@@ -1276,37 +1558,42 @@ static void o2net_sc_send_keep_req(void *arg)
 static void o2net_idle_timer(unsigned long data)
 {
        struct o2net_sock_container *sc = (struct o2net_sock_container *)data;
-       struct timeval now;
-
-       do_gettimeofday(&now);
-
-       mlog(ML_NOTICE, "connection to " SC_NODEF_FMT " has been idle for 10 "
-            "seconds, shutting it down.\n", SC_NODEF_ARGS(sc));
-       mlog(ML_NOTICE, "here are some times that might help debug the "
-            "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
-            "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
-            sc->sc_tv_timer.tv_sec, (long) sc->sc_tv_timer.tv_usec, 
-            now.tv_sec, (long) now.tv_usec,
-            sc->sc_tv_data_ready.tv_sec, (long) sc->sc_tv_data_ready.tv_usec,
-            sc->sc_tv_advance_start.tv_sec,
-            (long) sc->sc_tv_advance_start.tv_usec,
-            sc->sc_tv_advance_stop.tv_sec,
-            (long) sc->sc_tv_advance_stop.tv_usec,
-            sc->sc_msg_key, sc->sc_msg_type,
-            sc->sc_tv_func_start.tv_sec, (long) sc->sc_tv_func_start.tv_usec,
-            sc->sc_tv_func_stop.tv_sec, (long) sc->sc_tv_func_stop.tv_usec);
+       struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+#ifdef CONFIG_DEBUG_FS
+       unsigned long msecs = ktime_to_ms(ktime_get()) -
+               ktime_to_ms(sc->sc_tv_timer);
+#else
+       unsigned long msecs = o2net_idle_timeout();
+#endif
+
+       printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been "
+              "idle for %lu.%lu secs, shutting it down.\n", SC_NODEF_ARGS(sc),
+              msecs / 1000, msecs % 1000);
+
+       /*
+        * Initialize the nn_timeout so that the next connection attempt
+        * will continue in o2net_start_connect.
+        */
+       atomic_set(&nn->nn_timeout, 1);
 
        o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 }
 
-static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
 {
        o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
        o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
-                                   O2NET_KEEPALIVE_DELAY_SECS * HZ);
-       do_gettimeofday(&sc->sc_tv_timer);
+                     msecs_to_jiffies(o2net_keepalive_delay()));
+       o2net_set_sock_timer(sc);
        mod_timer(&sc->sc_idle_timeout,
-                 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));
+              jiffies + msecs_to_jiffies(o2net_idle_timeout()));
+}
+
+static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+{
+       /* extend the timer only if it is already armed; never arm it here */
+       if (timer_pending(&sc->sc_idle_timeout))
+               o2net_sc_reset_idle_timer(sc);
 }
 
 /* this work func is kicked whenever a path sets the nn state which doesn't
@@ -1314,14 +1601,16 @@ static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
  * having a connect attempt fail, etc. This centralizes the logic which decides
  * if a connect attempt should be made or if we should give up and all future
  * transmit attempts should fail */
-static void o2net_start_connect(void *arg)
+static void o2net_start_connect(struct work_struct *work)
 {
-       struct o2net_node *nn = arg;
+       struct o2net_node *nn =
+               container_of(work, struct o2net_node, nn_connect_work.work);
        struct o2net_sock_container *sc = NULL;
        struct o2nm_node *node = NULL, *mynode = NULL;
        struct socket *sock = NULL;
        struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
-       int ret = 0;
+       int ret = 0, stop;
+       unsigned int timeout;
 
        /* if we're greater we initiate tx, otherwise we accept */
        if (o2nm_this_node() <= o2net_num_from_nn(nn))
@@ -1341,11 +1630,19 @@ static void o2net_start_connect(void *arg)
        }
 
        spin_lock(&nn->nn_lock);
-       /* see if we already have one pending or have given up */
-       if (nn->nn_sc || nn->nn_persistent_error)
-               arg = NULL;
+       /*
+        * see if we already have one pending or have given up.
+        * For nn_timeout, it is set when we close the connection
+        * because of the idle time out. So it means that we have
+        * at least connected to that node successfully once,
+        * now try to connect to it again.
+        */
+       timeout = atomic_read(&nn->nn_timeout);
+       stop = (nn->nn_sc ||
+               (nn->nn_persistent_error &&
+               (nn->nn_persistent_error != -ENOTCONN || timeout == 0)));
        spin_unlock(&nn->nn_lock);
-       if (arg == NULL) /* *shrug*, needed some indicator */
+       if (stop)
                goto out;
 
        nn->nn_last_connect_attempt = jiffies;
@@ -1367,14 +1664,14 @@ static void o2net_start_connect(void *arg)
        sock->sk->sk_allocation = GFP_ATOMIC;
 
        myaddr.sin_family = AF_INET;
-       myaddr.sin_addr.s_addr = (__force u32)mynode->nd_ipv4_address;
-       myaddr.sin_port = (__force u16)htons(0); /* any port */
+       myaddr.sin_addr.s_addr = mynode->nd_ipv4_address;
+       myaddr.sin_port = htons(0); /* any port */
 
        ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
                              sizeof(myaddr));
        if (ret) {
-               mlog(ML_ERROR, "bind failed with %d at address %u.%u.%u.%u\n",
-                    ret, NIPQUAD(mynode->nd_ipv4_address));
+               mlog(ML_ERROR, "bind failed with %d at address %pI4\n",
+                    ret, &mynode->nd_ipv4_address);
                goto out;
        }
 
@@ -1392,8 +1689,8 @@ static void o2net_start_connect(void *arg)
        spin_unlock(&nn->nn_lock);
 
        remoteaddr.sin_family = AF_INET;
-       remoteaddr.sin_addr.s_addr = (__force u32)node->nd_ipv4_address;
-       remoteaddr.sin_port = (__force u16)node->nd_ipv4_port;
+       remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
+       remoteaddr.sin_port = node->nd_ipv4_port;
 
        ret = sc->sc_sock->ops->connect(sc->sc_sock,
                                        (struct sockaddr *)&remoteaddr,
@@ -1404,8 +1701,8 @@ static void o2net_start_connect(void *arg)
 
 out:
        if (ret) {
-               mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed "
-                    "with errno %d\n", SC_NODEF_ARGS(sc), ret);
+               printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT
+                      " failed with errno %d\n", SC_NODEF_ARGS(sc), ret);
                /* 0 err so that another will be queued and attempted
                 * from set_nn_state */
                if (sc)
@@ -1421,24 +1718,28 @@ out:
        return;
 }
 
-static void o2net_connect_expired(void *arg)
+static void o2net_connect_expired(struct work_struct *work)
 {
-       struct o2net_node *nn = arg;
+       struct o2net_node *nn =
+               container_of(work, struct o2net_node, nn_connect_expired.work);
 
        spin_lock(&nn->nn_lock);
        if (!nn->nn_sc_valid) {
-               mlog(ML_ERROR, "no connection established with node %u after "
-                    "%u seconds, giving up and returning errors.\n",
-                    o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);
+               /* timeout is in ms; zero-pad the fraction (10050 -> 10.050) */
+               printk(KERN_NOTICE "o2net: No connection established with "
+                      "node %u after %u.%03u seconds, giving up.\n",
+                      o2net_num_from_nn(nn), o2net_idle_timeout() / 1000,
+                      o2net_idle_timeout() % 1000);
 
                o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
        }
        spin_unlock(&nn->nn_lock);
 }
 
-static void o2net_still_up(void *arg)
+static void o2net_still_up(struct work_struct *work)
 {
-       struct o2net_node *nn = arg;
+       struct o2net_node *nn = container_of(work, struct o2net_node,
+                                            nn_still_up.work);
 
        o2quo_hb_still_up(o2net_num_from_nn(nn));
 }
@@ -1451,6 +1752,7 @@ void o2net_disconnect_node(struct o2nm_node *node)
 
        /* don't reconnect until it's heartbeating again */
        spin_lock(&nn->nn_lock);
+       atomic_set(&nn->nn_timeout, 0);
        o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
        spin_unlock(&nn->nn_lock);
 
@@ -1467,8 +1769,13 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 {
        o2quo_hb_down(node_num);
 
+       if (!node)
+               return;
+
        if (node_num != o2nm_this_node())
                o2net_disconnect_node(node);
+
+       BUG_ON(atomic_read(&o2net_connected_peers) < 0);
 }
 
 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1478,22 +1785,19 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
        o2quo_hb_up(node_num);
 
+       BUG_ON(!node);
+
        /* ensure an immediate connect attempt */
        nn->nn_last_connect_attempt = jiffies -
-               (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
+               (msecs_to_jiffies(o2net_reconnect_delay()) + 1);
 
        if (node_num != o2nm_this_node()) {
-               /* heartbeat doesn't work unless a local node number is
-                * configured and doing so brings up the o2net_wq, so we can
-                * use it.. */
-               queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
-                                  O2NET_IDLE_TIMEOUT_SECS * HZ);
-
                /* believe it or not, accept and node hearbeating testing
                 * can succeed for this node before we got here.. so
                 * only use set_nn_state to clear the persistent error
                 * if that hasn't already happened */
                spin_lock(&nn->nn_lock);
+               atomic_set(&nn->nn_timeout, 0);
                if (nn->nn_persistent_error)
                        o2net_set_nn_state(nn, NULL, 0, 0);
                spin_unlock(&nn->nn_lock);
@@ -1502,17 +1806,8 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
 void o2net_unregister_hb_callbacks(void)
 {
-       int ret;
-
-       ret = o2hb_unregister_callback(&o2net_hb_up);
-       if (ret < 0)
-               mlog(ML_ERROR, "Status return %d unregistering heartbeat up "
-                    "callback!\n", ret);
-
-       ret = o2hb_unregister_callback(&o2net_hb_down);
-       if (ret < 0)
-               mlog(ML_ERROR, "Status return %d unregistering heartbeat down "
-                    "callback!\n", ret);
+       o2hb_unregister_callback(NULL, &o2net_hb_up);   /* also run when registration fails */
+       o2hb_unregister_callback(NULL, &o2net_hb_down);
 }
 
 int o2net_register_hb_callbacks(void)
@@ -1524,9 +1819,9 @@ int o2net_register_hb_callbacks(void)
        o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB,
                            o2net_hb_node_up_cb, NULL, O2NET_HB_PRI);
 
-       ret = o2hb_register_callback(&o2net_hb_up);
+       ret = o2hb_register_callback(NULL, &o2net_hb_up);
        if (ret == 0)
-               ret = o2hb_register_callback(&o2net_hb_down);
+               ret = o2hb_register_callback(NULL, &o2net_hb_down);
 
        if (ret)
                o2net_unregister_hb_callbacks();
@@ -1542,6 +1837,7 @@ static int o2net_accept_one(struct socket *sock)
        struct sockaddr_in sin;
        struct socket *new_sock = NULL;
        struct o2nm_node *node = NULL;
+       struct o2nm_node *local_node = NULL;
        struct o2net_sock_container *sc = NULL;
        struct o2net_node *nn;
 
@@ -1571,20 +1867,23 @@ static int o2net_accept_one(struct socket *sock)
        if (ret < 0)
                goto out;
 
-       node = o2nm_get_node_by_ip((__force __be32)sin.sin_addr.s_addr);
+       node = o2nm_get_node_by_ip(sin.sin_addr.s_addr);
        if (node == NULL) {
-               mlog(ML_NOTICE, "attempt to connect from unknown node at "
-                    "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
-                    ntohs((__force __be16)sin.sin_port));
+               printk(KERN_NOTICE "o2net: Attempt to connect from unknown "
+                      "node at %pI4:%d\n", &sin.sin_addr.s_addr,
+                      ntohs(sin.sin_port));
                ret = -EINVAL;
                goto out;
        }
 
-       if (o2nm_this_node() > node->nd_num) {
-               mlog(ML_NOTICE, "unexpected connect attempted from a lower "
-                    "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n",
-                    node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
-                    ntohs((__force __be16)sin.sin_port), node->nd_num);
+       if (o2nm_this_node() >= node->nd_num) {
+               local_node = o2nm_get_node_by_num(o2nm_this_node());
+               printk(KERN_NOTICE "o2net: Unexpected connect attempt seen "
+                      "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
+                      "%pI4:%d)\n", local_node->nd_name, local_node->nd_num,
+                      &(local_node->nd_ipv4_address),
+                      ntohs(local_node->nd_ipv4_port), node->nd_name,
+                      node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port));
                ret = -EINVAL;
                goto out;
        }
@@ -1593,9 +1892,9 @@ static int o2net_accept_one(struct socket *sock)
         * and tries to connect before we see their heartbeat */
        if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {
                mlog(ML_CONN, "attempt to connect from node '%s' at "
-                    "%u.%u.%u.%u:%d but it isn't heartbeating\n",
-                    node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
-                    ntohs((__force __be16)sin.sin_port));
+                    "%pI4:%d but it isn't heartbeating\n",
+                    node->nd_name, &sin.sin_addr.s_addr,
+                    ntohs(sin.sin_port));
                ret = -EINVAL;
                goto out;
        }
@@ -1609,10 +1908,10 @@ static int o2net_accept_one(struct socket *sock)
                ret = 0;
        spin_unlock(&nn->nn_lock);
        if (ret) {
-               mlog(ML_NOTICE, "attempt to connect from node '%s' at "
-                    "%u.%u.%u.%u:%d but it already has an open connection\n",
-                    node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
-                    ntohs((__force __be16)sin.sin_port));
+               printk(KERN_NOTICE "o2net: Attempt to connect from node '%s' "
+                      "at %pI4:%d but it already has an open connection\n",
+                      node->nd_name, &sin.sin_addr.s_addr,
+                      ntohs(sin.sin_port));
                goto out;
        }
 
@@ -1626,12 +1925,14 @@ static int o2net_accept_one(struct socket *sock)
        new_sock = NULL;
 
        spin_lock(&nn->nn_lock);
+       atomic_set(&nn->nn_timeout, 0);
        o2net_set_nn_state(nn, sc, 0, 0);
        spin_unlock(&nn->nn_lock);
 
        o2net_register_callbacks(sc->sc_sock->sk, sc);
        o2net_sc_queue_work(sc, &sc->sc_rx_work);
 
+       o2net_initialize_handshake();
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
 
 out:
@@ -1639,14 +1940,16 @@ out:
                sock_release(new_sock);
        if (node)
                o2nm_node_put(node);
+       if (local_node)
+               o2nm_node_put(local_node);
        if (sc)
                sc_put(sc);
        return ret;
 }
 
-static void o2net_accept_many(void *arg)
+static void o2net_accept_many(struct work_struct *work)
 {
-       struct socket *sock = arg;
+       struct socket *sock = o2net_listen_sock;        /* work is unused */
        while (o2net_accept_one(sock) == 0)
                cond_resched();
 }
@@ -1676,19 +1979,19 @@ out:
        ready(sk, bytes);
 }
 
-static int o2net_open_listening_sock(__be16 port)
+static int o2net_open_listening_sock(__be32 addr, __be16 port)
 {
        struct socket *sock = NULL;
        int ret;
        struct sockaddr_in sin = {
                .sin_family = PF_INET,
-               .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },
-               .sin_port = (__force u16)port,
+               .sin_addr = { .s_addr = addr },
+               .sin_port = port,
        };
 
        ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (ret < 0) {
-               mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret);
+               printk(KERN_ERR "o2net: Error %d while creating socket\n", ret);
                goto out;
        }
 
@@ -1700,21 +2003,20 @@ static int o2net_open_listening_sock(__be16 port)
        write_unlock_bh(&sock->sk->sk_callback_lock);
 
        o2net_listen_sock = sock;
-       INIT_WORK(&o2net_listen_work, o2net_accept_many, sock);
+       INIT_WORK(&o2net_listen_work, o2net_accept_many);
 
        sock->sk->sk_reuse = 1;
        ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
        if (ret < 0) {
-               mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",
-                    ntohs(port), ret);
+               printk(KERN_ERR "o2net: Error %d while binding socket at "
+                      "%pI4:%u\n", ret, &addr, ntohs(port)); 
                goto out;
        }
 
        ret = sock->ops->listen(sock, 64);
-       if (ret < 0) {
-               mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",
-                    ntohs(port), ret);
-       }
+       if (ret < 0)
+               printk(KERN_ERR "o2net: Error %d while listening on %pI4:%u\n",
+                      ret, &addr, ntohs(port));
 
 out:
        if (ret) {
@@ -1746,7 +2048,8 @@ int o2net_start_listening(struct o2nm_node *node)
                return -ENOMEM; /* ? */
        }
 
-       ret = o2net_open_listening_sock(node->nd_ipv4_port);
+       ret = o2net_open_listening_sock(node->nd_ipv4_address,
+                                       node->nd_ipv4_port);
        if (ret) {
                destroy_workqueue(o2net_wq);
                o2net_wq = NULL;
@@ -1799,9 +2102,12 @@ int o2net_init(void)
 
        o2quo_init();
 
-       o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);
-       o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
-       o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
+       if (o2net_debugfs_init())
+               return -ENOMEM;
+
+       o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
+       o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
+       o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
        if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
                kfree(o2net_hand);
                kfree(o2net_keep_req);
@@ -1818,10 +2124,12 @@ int o2net_init(void)
        for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
                struct o2net_node *nn = o2net_nn_from_num(i);
 
+               atomic_set(&nn->nn_timeout, 0);
                spin_lock_init(&nn->nn_lock);
-               INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
-               INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);
-               INIT_WORK(&nn->nn_still_up, o2net_still_up, nn);
+               INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
+               INIT_DELAYED_WORK(&nn->nn_connect_expired,
+                                 o2net_connect_expired);
+               INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
                /* until we see hb from a node we'll return einval */
                nn->nn_persistent_error = -ENOTCONN;
                init_waitqueue_head(&nn->nn_sc_wq);
@@ -1838,4 +2146,5 @@ void o2net_exit(void)
        kfree(o2net_hand);
        kfree(o2net_keep_req);
        kfree(o2net_keep_resp);
+       o2net_debugfs_exit();
 }