[NET] TIPC: Fix whitespace errors.
[linux-2.6.git] / net / tipc / link.c
index 784b24b..71c2f2f 100644 (file)
@@ -1,8 +1,8 @@
 /*
  * net/tipc/link.c: TIPC link code
- * 
+ *
  * Copyright (c) 1996-2006, Ericsson AB
- * Copyright (c) 2004-2005, Wind River Systems
+ * Copyright (c) 2004-2006, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 #include "bcast.h"
 
 
-/* 
- * Limit for deferred reception queue: 
+/*
+ * Limit for deferred reception queue:
  */
 
 #define DEF_QUEUE_LIMIT 256u
 
-/* 
- * Link state events: 
+/*
+ * Link state events:
  */
 
 #define  STARTING_EVT    856384768     /* link processing trigger */
 #define  TRAFFIC_MSG_EVT 560815u       /* rx'd ??? */
 #define  TIMEOUT_EVT     560817u       /* link timer expired */
 
-/*   
- * The following two 'message types' is really just implementation 
- * data conveniently stored in the message header. 
+/*
+ * The following two 'message types' is really just implementation
+ * data conveniently stored in the message header.
  * They must not be considered part of the protocol
  */
 #define OPEN_MSG   0
 #define CLOSED_MSG 1
 
-/* 
+/*
  * State value stored in 'exp_msg_count'
  */
 
@@ -97,7 +97,7 @@ struct link_name {
 
 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
 
-/** 
+/**
  * struct link_event - link up/down event notification
  */
 
@@ -121,7 +121,7 @@ static int  link_send_sections_long(struct port *sender,
 static void link_check_defragm_bufs(struct link *l_ptr);
 static void link_state_event(struct link *l_ptr, u32 event);
 static void link_reset_statistics(struct link *l_ptr);
-static void link_print(struct link *l_ptr, struct print_buf *buf, 
+static void link_print(struct link *l_ptr, struct print_buf *buf,
                       const char *str);
 
 /*
@@ -132,17 +132,17 @@ static void link_print(struct link *l_ptr, struct print_buf *buf,
  * allow the output from multiple links to be intermixed.  For this reason
  * routines of the form "dbg_link_XXX()" have been created that will capture
  * debug info into a link's personal print buffer, which can then be dumped
- * into the TIPC system log (LOG) upon request.
+ * into the TIPC system log (TIPC_LOG) upon request.
  *
  * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
  * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
- * the dbg_link_XXX() routines simply send their output to the standard 
+ * the dbg_link_XXX() routines simply send their output to the standard
  * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
  * when there is only a single link in the system being debugged.
  *
  * Notes:
- * - When enabled, LINK_LOG_BUF_SIZE should be set to at least 1000 (bytes)
- * - "l_ptr" must be valid when using dbg_link_XXX() macros  
+ * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
+ * - "l_ptr" must be valid when using dbg_link_XXX() macros
  */
 
 #define LINK_LOG_BUF_SIZE 0
@@ -159,13 +159,13 @@ static void link_print(struct link *l_ptr, struct print_buf *buf,
 
 static void dbg_print_link(struct link *l_ptr, const char *str)
 {
-       if (DBG_OUTPUT)
+       if (DBG_OUTPUT != TIPC_NULL)
                link_print(l_ptr, DBG_OUTPUT, str);
 }
 
 static void dbg_print_buf_chain(struct sk_buff *root_buf)
 {
-       if (DBG_OUTPUT) {
+       if (DBG_OUTPUT != TIPC_NULL) {
                struct sk_buff *buf = root_buf;
 
                while (buf) {
@@ -222,18 +222,18 @@ static u32 link_max_pkt(struct link *l_ptr)
 static void link_init_max_pkt(struct link *l_ptr)
 {
        u32 max_pkt;
-       
+
        max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
        if (max_pkt > MAX_MSG_SIZE)
                max_pkt = MAX_MSG_SIZE;
 
-        l_ptr->max_pkt_target = max_pkt;
+       l_ptr->max_pkt_target = max_pkt;
        if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
                l_ptr->max_pkt = l_ptr->max_pkt_target;
-       else 
+       else
                l_ptr->max_pkt = MAX_PKT_DEFAULT;
 
-        l_ptr->max_pkt_probes = 0;
+       l_ptr->max_pkt_probes = 0;
 }
 
 static u32 link_next_sent(struct link *l_ptr)
@@ -269,7 +269,7 @@ int tipc_link_is_active(struct link *l_ptr)
  * link_name_validate - validate & (optionally) deconstruct link name
  * @name - ptr to link name string
  * @name_parts - ptr to area for link name components (or NULL if not needed)
- * 
+ *
  * Returns 1 if link name is valid, otherwise 0.
  */
 
@@ -317,8 +317,8 @@ static int link_name_validate(const char *name, struct link_name *name_parts)
                    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
            (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
            (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
-           (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || 
-           (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) || 
+           (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
+           (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
            (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
            (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
                return 0;
@@ -337,7 +337,7 @@ static int link_name_validate(const char *name, struct link_name *name_parts)
 /**
  * link_timeout - handle expiration of link timer
  * @l_ptr: pointer to link
- * 
+ *
  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
  * with tipc_link_delete().  (There is no risk that the node will be deleted by
  * another thread because tipc_link_delete() always cancels the link timer before
@@ -406,7 +406,7 @@ static void link_set_timer(struct link *l_ptr, u32 time)
  * @b_ptr: pointer to associated bearer
  * @peer: network address of node at other end of link
  * @media_addr: media address to use when sending messages over link
- * 
+ *
  * Returns pointer to link.
  */
 
@@ -417,18 +417,17 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
        struct tipc_msg *msg;
        char *if_name;
 
-       l_ptr = (struct link *)kmalloc(sizeof(*l_ptr), GFP_ATOMIC);
+       l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
        if (!l_ptr) {
-               warn("Memory squeeze; Failed to create link\n");
+               warn("Link creation failed, no memory\n");
                return NULL;
        }
-       memset(l_ptr, 0, sizeof(*l_ptr));
 
        l_ptr->addr = peer;
        if_name = strchr(b_ptr->publ.name, ':') + 1;
        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
-               tipc_node(tipc_own_addr), 
+               tipc_node(tipc_own_addr),
                if_name,
                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
                /* note: peer i/f is appended to link name by reset/activate */
@@ -469,7 +468,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
 
                if (!pb) {
                        kfree(l_ptr);
-                       warn("Memory squeeze; Failed to create link\n");
+                       warn("Link creation failed, no memory for print buffer\n");
                        return NULL;
                }
                tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
@@ -479,17 +478,17 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
 
        dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
            l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
-       
+
        return l_ptr;
 }
 
-/** 
+/**
  * tipc_link_delete - delete a link
  * @l_ptr: pointer to link
- * 
+ *
  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
  * This routine must not grab the node lock until after link timer cancellation
- * to avoid a potential deadlock situation.  
+ * to avoid a potential deadlock situation.
  */
 
 void tipc_link_delete(struct link *l_ptr)
@@ -502,7 +501,7 @@ void tipc_link_delete(struct link *l_ptr)
        dbg("tipc_link_delete()\n");
 
        k_cancel_timer(&l_ptr->timer);
-       
+
        tipc_node_lock(l_ptr->owner);
        tipc_link_reset(l_ptr);
        tipc_node_detach_link(l_ptr->owner, l_ptr);
@@ -522,12 +521,12 @@ void tipc_link_start(struct link *l_ptr)
 }
 
 /**
- * link_schedule_port - schedule port for deferred sending 
+ * link_schedule_port - schedule port for deferred sending
  * @l_ptr: pointer to link
  * @origport: reference to sending port
  * @sz: amount of data to be sent
- * 
- * Schedules port for renewed sending of messages after link congestion 
+ *
+ * Schedules port for renewed sending of messages after link congestion
  * has abated.
  */
 
@@ -568,13 +567,12 @@ void tipc_link_wakeup_ports(struct link *l_ptr, int all)
                return;
        if (link_congested(l_ptr))
                goto exit;
-       list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, 
+       list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
                                 wait_list) {
                if (win <= 0)
                        break;
                list_del_init(&p_ptr->wait_list);
                p_ptr->congested_link = NULL;
-               assert(p_ptr->wakeup);
                spin_lock_bh(p_ptr->publ.lock);
                p_ptr->publ.congested = 0;
                p_ptr->wakeup(&p_ptr->publ);
@@ -586,7 +584,7 @@ exit:
        spin_unlock_bh(&tipc_port_list_lock);
 }
 
-/** 
+/**
  * link_release_outqueue - purge link's outbound message queue
  * @l_ptr: pointer to link
  */
@@ -623,7 +621,7 @@ void tipc_link_reset_fragments(struct link *l_ptr)
        l_ptr->defragm_buf = NULL;
 }
 
-/** 
+/**
  * tipc_link_stop - purge all inbound and outbound messages associated with link
  * @l_ptr: pointer to link
  */
@@ -667,7 +665,7 @@ static void link_send_event(void (*fcn)(u32 a, char *n, int up),
                            struct link *l_ptr, int up)
 {
        struct link_event *ev;
-       
+
        ev = kmalloc(sizeof(*ev), GFP_ATOMIC);
        if (!ev) {
                warn("Link event allocation failure\n");
@@ -691,15 +689,16 @@ void tipc_link_reset(struct link *l_ptr)
        struct sk_buff *buf;
        u32 prev_state = l_ptr->state;
        u32 checkpoint = l_ptr->next_in_no;
-       
+       int was_active_link = tipc_link_is_active(l_ptr);
+
        msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1);
 
-        /* Link is down, accept any session: */
+       /* Link is down, accept any session: */
        l_ptr->peer_session = 0;
 
-        /* Prepare for max packet size negotiation */
+       /* Prepare for max packet size negotiation */
        link_init_max_pkt(l_ptr);
-       
+
        l_ptr->state = RESET_UNKNOWN;
        dbg_link_state("Resetting Link\n");
 
@@ -712,7 +711,7 @@ void tipc_link_reset(struct link *l_ptr)
        tipc_printf(TIPC_CONS, "\nReset link <%s>\n", l_ptr->name);
        dbg_link_dump();
 #endif
-       if (tipc_node_has_active_links(l_ptr->owner) &&
+       if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
            l_ptr->owner->permit_changeover) {
                l_ptr->reset_checkpoint = checkpoint;
                l_ptr->exp_msg_count = START_CHANGEOVER;
@@ -755,7 +754,7 @@ void tipc_link_reset(struct link *l_ptr)
 
 static void link_activate(struct link *l_ptr)
 {
-       l_ptr->next_in_no = 1;
+       l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
        tipc_node_link_up(l_ptr->owner, l_ptr);
        tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
        link_send_event(tipc_cfg_link_event, l_ptr, 1);
@@ -771,7 +770,7 @@ static void link_activate(struct link *l_ptr)
 
 static void link_state_event(struct link *l_ptr, unsigned event)
 {
-       struct link *other; 
+       struct link *other;
        u32 cont_intv = l_ptr->continuity_interval;
 
        if (!l_ptr->started && (event != STARTING_EVT))
@@ -800,11 +799,11 @@ static void link_state_event(struct link *l_ptr, unsigned event)
                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
                                l_ptr->checkpoint = l_ptr->next_in_no;
                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
-                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 
+                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                                 0, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
-                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 
+                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                                 1, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                }
@@ -820,6 +819,8 @@ static void link_state_event(struct link *l_ptr, unsigned event)
                        break;
                case RESET_MSG:
                        dbg_link("RES -> RR\n");
+                       info("Resetting link <%s>, requested by peer\n",
+                            l_ptr->name);
                        tipc_link_reset(l_ptr);
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
@@ -844,6 +845,8 @@ static void link_state_event(struct link *l_ptr, unsigned event)
                        break;
                case RESET_MSG:
                        dbg_link("RES -> RR\n");
+                       info("Resetting link <%s>, requested by peer "
+                            "while probing\n", l_ptr->name);
                        tipc_link_reset(l_ptr);
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
@@ -868,13 +871,15 @@ static void link_state_event(struct link *l_ptr, unsigned event)
                                dbg_link("Probing %u/%u,timer = %u ms)\n",
                                         l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
                                         cont_intv / 4);
-                               tipc_link_send_proto_msg(l_ptr, STATE_MSG, 
+                               tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                         1, 0, 0, 0, 0);
                                l_ptr->fsm_msg_cnt++;
                                link_set_timer(l_ptr, cont_intv / 4);
                        } else {        /* Link has failed */
                                dbg_link("-> RU (%u probes unanswered)\n",
                                         l_ptr->fsm_msg_cnt);
+                               warn("Resetting link <%s>, peer not responding\n",
+                                    l_ptr->name);
                                tipc_link_reset(l_ptr);
                                l_ptr->state = RESET_UNKNOWN;
                                l_ptr->fsm_msg_cnt = 0;
@@ -972,27 +977,30 @@ static void link_state_event(struct link *l_ptr, unsigned event)
 
 /*
  * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one. 
+ * the tail of an existing one.
  */
 
 static int link_bundle_buf(struct link *l_ptr,
-                          struct sk_buff *bundler, 
+                          struct sk_buff *bundler,
                           struct sk_buff *buf)
 {
        struct tipc_msg *bundler_msg = buf_msg(bundler);
        struct tipc_msg *msg = buf_msg(buf);
        u32 size = msg_size(msg);
-       u32 to_pos = align(msg_size(bundler_msg));
-       u32 rest = link_max_pkt(l_ptr) - to_pos;
+       u32 bundle_size = msg_size(bundler_msg);
+       u32 to_pos = align(bundle_size);
+       u32 pad = to_pos - bundle_size;
 
        if (msg_user(bundler_msg) != MSG_BUNDLER)
                return 0;
        if (msg_type(bundler_msg) != OPEN_MSG)
                return 0;
-       if (rest < align(size))
+       if (skb_tailroom(bundler) < (pad + size))
+               return 0;
+       if (link_max_pkt(l_ptr) < (to_pos + size))
                return 0;
 
-       skb_put(bundler, (to_pos - msg_size(bundler_msg)) + size);
+       skb_put(bundler, pad + size);
        memcpy(bundler->data + to_pos, buf->data, size);
        msg_set_size(bundler_msg, to_pos + size);
        msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
@@ -1022,8 +1030,8 @@ static void link_add_to_outqueue(struct link *l_ptr,
        l_ptr->out_queue_size++;
 }
 
-/* 
- * tipc_link_send_buf() is the 'full path' for messages, called from 
+/*
+ * tipc_link_send_buf() is the 'full path' for messages, called from
  * inside TIPC when the 'fast path' in tipc_send_buf
  * has failed, and from link_send()
  */
@@ -1050,7 +1058,7 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
                msg_dbg(msg, "TIPC: Congestion, throwing away\n");
                buf_discard(buf);
                if (imp > CONN_MANAGER) {
-                       warn("Resetting <%s>, send queue full", l_ptr->name);
+                       warn("Resetting link <%s>, send queue full", l_ptr->name);
                        tipc_link_reset(l_ptr);
                }
                return dsz;
@@ -1066,7 +1074,7 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
        if (queue_size > l_ptr->stats.max_queue_sz)
                l_ptr->stats.max_queue_sz = queue_size;
 
-       if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 
+       if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
                   !link_congested(l_ptr))) {
                link_add_to_outqueue(l_ptr, buf, msg);
 
@@ -1086,7 +1094,7 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
 
                /* Try adding message to an existing bundle */
 
-               if (l_ptr->next_out && 
+               if (l_ptr->next_out &&
                    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
                        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
                        return dsz;
@@ -1101,7 +1109,7 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
                        if (bundler) {
                                msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
                                         TIPC_OK, INT_H_SIZE, l_ptr->addr);
-                               memcpy(bundler->data, (unchar *)&bundler_hdr, 
+                               memcpy(bundler->data, (unchar *)&bundler_hdr,
                                       INT_H_SIZE);
                                skb_trim(bundler, INT_H_SIZE);
                                link_bundle_buf(l_ptr, bundler, buf);
@@ -1118,8 +1126,8 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
        return dsz;
 }
 
-/* 
- * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has 
+/*
+ * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
  * not been selected yet, and the the owner node is not locked
  * Called by TIPC internal users, e.g. the name distributor
  */
@@ -1135,9 +1143,13 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
        if (n_ptr) {
                tipc_node_lock(n_ptr);
                l_ptr = n_ptr->active_links[selector & 1];
-               dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
                if (l_ptr) {
+                       dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
                        res = tipc_link_send_buf(l_ptr, buf);
+               } else {
+                       dbg("Attempt to send msg to unreachable node:\n");
+                       msg_dbg(buf_msg(buf),">>>");
+                       buf_discard(buf);
                }
                tipc_node_unlock(n_ptr);
        } else {
@@ -1149,8 +1161,8 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
        return res;
 }
 
-/* 
- * link_send_buf_fast: Entry for data messages where the 
+/*
+ * link_send_buf_fast: Entry for data messages where the
  * destination link is known and the header is complete,
  * inclusive total message length. Very time critical.
  * Link is locked. Returns user data length.
@@ -1185,8 +1197,8 @@ static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
        return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
 }
 
-/* 
- * tipc_send_buf_fast: Entry for data messages where the 
+/*
+ * tipc_send_buf_fast: Entry for data messages where the
  * destination node is known and the header is complete,
  * inclusive total message length.
  * Returns user data length.
@@ -1224,15 +1236,15 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
 }
 
 
-/* 
- * tipc_link_send_sections_fast: Entry for messages where the 
+/*
+ * tipc_link_send_sections_fast: Entry for messages where the
  * destination processor is known and the header is complete,
- * except for total message length. 
+ * except for total message length.
  * Returns user data length or errno.
  */
-int tipc_link_send_sections_fast(struct port *sender, 
+int tipc_link_send_sections_fast(struct port *sender,
                                 struct iovec const *msg_sect,
-                                const u32 num_sect, 
+                                const u32 num_sect,
                                 u32 destaddr)
 {
        struct tipc_msg *hdr = &sender->publ.phdr;
@@ -1242,8 +1254,6 @@ int tipc_link_send_sections_fast(struct port *sender,
        int res;
        u32 selector = msg_origport(hdr) & 1;
 
-       assert(destaddr != tipc_own_addr);
-
 again:
        /*
         * Try building message using port's max_pkt hint.
@@ -1277,14 +1287,14 @@ exit:
 
                        /* Exit if link (or bearer) is congested */
 
-                       if (link_congested(l_ptr) || 
+                       if (link_congested(l_ptr) ||
                            !list_empty(&l_ptr->b_ptr->cong_links)) {
                                res = link_schedule_port(l_ptr,
                                                         sender->publ.ref, res);
                                goto exit;
                        }
 
-                       /* 
+                       /*
                         * Message size exceeds max_pkt hint; update hint,
                         * then re-try fast path or fragment the message
                         */
@@ -1314,10 +1324,10 @@ exit:
        return res;
 }
 
-/* 
- * link_send_sections_long(): Entry for long messages where the 
+/*
+ * link_send_sections_long(): Entry for long messages where the
  * destination node is known and the header is complete,
- * inclusive total message length. 
+ * inclusive total message length.
  * Link and bearer congestion status have been checked to be ok,
  * and are ignored if they change.
  *
@@ -1347,9 +1357,9 @@ static int link_send_sections_long(struct port *sender,
 
 again:
        fragm_no = 1;
-       max_pkt = sender->max_pkt - INT_H_SIZE;  
+       max_pkt = sender->max_pkt - INT_H_SIZE;
                /* leave room for tunnel header in case of link changeover */
-       fragm_sz = max_pkt - INT_H_SIZE; 
+       fragm_sz = max_pkt - INT_H_SIZE;
                /* leave room for fragmentation header in each fragment */
        rest = dsz;
        fragm_crs = 0;
@@ -1430,7 +1440,7 @@ error:
                        if (!buf)
                                goto error;
 
-                       buf->next = NULL;                                
+                       buf->next = NULL;
                        prev->next = buf;
                        memcpy(buf->data, (unchar *)&fragm_hdr, INT_H_SIZE);
                        fragm_crs = INT_H_SIZE;
@@ -1440,7 +1450,7 @@ error:
        }
        while (rest > 0);
 
-       /* 
+       /*
         * Now we have a buffer chain. Select a link and check
         * that packet size is still OK
         */
@@ -1496,7 +1506,7 @@ reject:
        return dsz;
 }
 
-/* 
+/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 u32 tipc_link_push_packet(struct link *l_ptr)
@@ -1509,7 +1519,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)
        /* consider that buffers may have been released in meantime */
 
        if (r_q_size && buf) {
-               u32 last = lesser(mod(r_q_head + r_q_size), 
+               u32 last = lesser(mod(r_q_head + r_q_size),
                                  link_last_sent(l_ptr));
                u32 first = msg_seqno(buf_msg(buf));
 
@@ -1525,7 +1535,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)
 
        if (r_q_size && buf && !skb_cloned(buf)) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-               msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 
+               msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
                        msg_dbg(buf_msg(buf), ">DEF-RETR>");
                        l_ptr->retransm_queue_head = mod(++r_q_head);
@@ -1544,7 +1554,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)
        buf = l_ptr->proto_msg_queue;
        if (buf) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-               msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); 
+               msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
                        msg_dbg(buf_msg(buf), ">DEF-PROT>");
                        l_ptr->unacked_window = 0;
@@ -1568,7 +1578,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)
 
                if (mod(next - first) < l_ptr->queue_limit[0]) {
                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-                       msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
+                       msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
                        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
                                if (msg_user(msg) == MSG_BUNDLER)
                                        msg_set_type(msg, CLOSED_MSG);
@@ -1604,40 +1614,122 @@ void tipc_link_push_queue(struct link *l_ptr)
                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
 }
 
-void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf, 
+static void link_reset_all(unsigned long addr)
+{
+       struct node *n_ptr;
+       char addr_string[16];
+       u32 i;
+
+       read_lock_bh(&tipc_net_lock);
+       n_ptr = tipc_node_find((u32)addr);
+       if (!n_ptr) {
+               read_unlock_bh(&tipc_net_lock);
+               return; /* node no longer exists */
+       }
+
+       tipc_node_lock(n_ptr);
+
+       warn("Resetting all links to %s\n",
+            addr_string_fill(addr_string, n_ptr->addr));
+
+       for (i = 0; i < MAX_BEARERS; i++) {
+               if (n_ptr->links[i]) {
+                       link_print(n_ptr->links[i], TIPC_OUTPUT,
+                                  "Resetting link\n");
+                       tipc_link_reset(n_ptr->links[i]);
+               }
+       }
+
+       tipc_node_unlock(n_ptr);
+       read_unlock_bh(&tipc_net_lock);
+}
+
+static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
+{
+       struct tipc_msg *msg = buf_msg(buf);
+
+       warn("Retransmission failure on link <%s>\n", l_ptr->name);
+       tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>");
+
+       if (l_ptr->addr) {
+
+               /* Handle failure on standard link */
+
+               link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
+               tipc_link_reset(l_ptr);
+
+       } else {
+
+               /* Handle failure on broadcast link */
+
+               struct node *n_ptr;
+               char addr_string[16];
+
+               tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
+               tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
+                                    (unsigned long) TIPC_SKB_CB(buf)->handle);
+
+               n_ptr = l_ptr->owner->next;
+               tipc_node_lock(n_ptr);
+
+               addr_string_fill(addr_string, n_ptr->addr);
+               tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
+               tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
+               tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
+               tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
+               tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
+               tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
+               tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+
+               tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
+
+               tipc_node_unlock(n_ptr);
+
+               l_ptr->stale_count = 0;
+       }
+}
+
+void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
                          u32 retransmits)
 {
        struct tipc_msg *msg;
 
+       if (!buf)
+               return;
+
+       msg = buf_msg(buf);
+
        dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
 
-       if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && buf && !skb_cloned(buf)) {
-               msg_dbg(buf_msg(buf), ">NO_RETR->BCONG>");
-               dbg_print_link(l_ptr, "   ");
-               l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
-               l_ptr->retransm_queue_size = retransmits;
-               return;
+       if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+               if (!skb_cloned(buf)) {
+                       msg_dbg(msg, ">NO_RETR->BCONG>");
+                       dbg_print_link(l_ptr, "   ");
+                       l_ptr->retransm_queue_head = msg_seqno(msg);
+                       l_ptr->retransm_queue_size = retransmits;
+                       return;
+               } else {
+                       /* Don't retransmit if driver already has the buffer */
+               }
+       } else {
+               /* Detect repeated retransmit failures on uncongested bearer */
+
+               if (l_ptr->last_retransmitted == msg_seqno(msg)) {
+                       if (++l_ptr->stale_count > 100) {
+                               link_retransmit_failure(l_ptr, buf);
+                               return;
+                       }
+               } else {
+                       l_ptr->last_retransmitted = msg_seqno(msg);
+                       l_ptr->stale_count = 1;
+               }
        }
+
        while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) {
                msg = buf_msg(buf);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-               msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
+               msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-                        /* Catch if retransmissions fail repeatedly: */
-                        if (l_ptr->last_retransmitted == msg_seqno(msg)) {
-                                if (++l_ptr->stale_count > 100) {
-                                        tipc_msg_print(TIPC_CONS, buf_msg(buf), ">RETR>");
-                                        info("...Retransmitted %u times\n",
-                                            l_ptr->stale_count);
-                                        link_print(l_ptr, TIPC_CONS, "Resetting Link\n");
-                                        tipc_link_reset(l_ptr);
-                                        break;
-                                }
-                        } else {
-                                l_ptr->stale_count = 0;
-                        }
-                        l_ptr->last_retransmitted = msg_seqno(msg);
-
                        msg_dbg(buf_msg(buf), ">RETR>");
                        buf = buf->next;
                        retransmits--;
@@ -1650,10 +1742,11 @@ void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
                        return;
                }
        }
+
        l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
 }
 
-/* 
+/*
  * link_recv_non_seq: Receive packets which are outside
  *                    the link sequence flow
  */
@@ -1668,11 +1761,11 @@ static void link_recv_non_seq(struct sk_buff *buf)
                tipc_bclink_recv_pkt(buf);
 }
 
-/** 
+/**
  * link_insert_deferred_queue - insert deferred messages back into receive chain
  */
 
-static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, 
+static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
                                                  struct sk_buff *buf)
 {
        u32 seq_no;
@@ -1720,6 +1813,11 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
                        link_recv_non_seq(buf);
                        continue;
                }
+
+               if (unlikely(!msg_short(msg) &&
+                            (msg_destnode(msg) != tipc_own_addr)))
+                       goto cont;
+
                n_ptr = tipc_node_find(msg_prevnode(msg));
                if (unlikely(!n_ptr))
                        goto cont;
@@ -1730,8 +1828,8 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
                        tipc_node_unlock(n_ptr);
                        goto cont;
                }
-               /* 
-                * Release acked messages 
+               /*
+                * Release acked messages
                 */
                if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
                        if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
@@ -1739,7 +1837,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
                }
 
                crs = l_ptr->first_out;
-               while ((crs != l_ptr->next_out) && 
+               while ((crs != l_ptr->next_out) &&
                       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
                        struct sk_buff *next = crs->next;
 
@@ -1777,7 +1875,7 @@ deliver:
                                        switch (msg_user(msg)) {
                                        case MSG_BUNDLER:
                                                l_ptr->stats.recv_bundles++;
-                                               l_ptr->stats.recv_bundled += 
+                                               l_ptr->stats.recv_bundled +=
                                                        msg_msgcnt(msg);
                                                tipc_node_unlock(n_ptr);
                                                tipc_link_recv_bundle(buf);
@@ -1796,7 +1894,7 @@ deliver:
                                                continue;
                                        case MSG_FRAGMENTER:
                                                l_ptr->stats.recv_fragments++;
-                                               if (tipc_link_recv_fragment(&l_ptr->defragm_buf, 
+                                               if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
                                                                            &buf, &msg)) {
                                                        l_ptr->stats.recv_fragmented++;
                                                        goto deliver;
@@ -1807,7 +1905,7 @@ deliver:
                                                if (link_recv_changeover_msg(&l_ptr, &buf)) {
                                                        msg = buf_msg(buf);
                                                        seq_no = msg_seqno(msg);
-                                                       TIPC_SKB_CB(buf)->handle 
+                                                       TIPC_SKB_CB(buf)->handle
                                                                = b_ptr;
                                                        if (type == ORIGINAL_MSG)
                                                                goto deliver;
@@ -1850,8 +1948,8 @@ cont:
        read_unlock_bh(&tipc_net_lock);
 }
 
-/* 
- * link_defer_buf(): Sort a received out-of-sequence packet 
+/*
+ * link_defer_buf(): Sort a received out-of-sequence packet
  *                   into the deferred reception queue.
  * Returns the increase of the queue length,i.e. 0 or 1
  */
@@ -1888,7 +1986,7 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
                        if (prev)
                                prev->next = buf;
                        else
-                               *head = buf;   
+                               *head = buf;
                        return 1;
                }
                if (seq_no == msg_seqno(msg)) {
@@ -1905,11 +2003,11 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
        return 0;
 }
 
-/** 
+/**
  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
  */
 
-static void link_handle_out_of_seq_msg(struct link *l_ptr, 
+static void link_handle_out_of_seq_msg(struct link *l_ptr,
                                       struct sk_buff *buf)
 {
        u32 seq_no = msg_seqno(buf_msg(buf));
@@ -1919,14 +2017,14 @@ static void link_handle_out_of_seq_msg(struct link *l_ptr,
                return;
        }
 
-       dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n", 
+       dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
            seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
 
        /* Record OOS packet arrival (force mismatch on next timeout) */
 
        l_ptr->checkpoint--;
 
-       /* 
+       /*
         * Discard packet if a duplicate; otherwise add it to deferred queue
         * and notify peer of gap as per protocol specification
         */
@@ -1955,13 +2053,13 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
 {
        struct sk_buff *buf = NULL;
        struct tipc_msg *msg = l_ptr->pmsg;
-        u32 msg_size = sizeof(l_ptr->proto_msg);
+       u32 msg_size = sizeof(l_ptr->proto_msg);
 
        if (link_blocked(l_ptr))
                return;
        msg_set_type(msg, msg_typ);
        msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
-       msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 
+       msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
        msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
        if (msg_typ == STATE_MSG) {
@@ -1984,23 +2082,23 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
                msg_set_max_pkt(msg, ack_mtu);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
                msg_set_probe(msg, probe_msg != 0);
-               if (probe_msg) { 
+               if (probe_msg) {
                        u32 mtu = l_ptr->max_pkt;
 
-                        if ((mtu < l_ptr->max_pkt_target) &&
+                       if ((mtu < l_ptr->max_pkt_target) &&
                            link_working_working(l_ptr) &&
                            l_ptr->fsm_msg_cnt) {
                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
-                                if (l_ptr->max_pkt_probes == 10) {
-                                        l_ptr->max_pkt_target = (msg_size - 4);
-                                        l_ptr->max_pkt_probes = 0;
+                               if (l_ptr->max_pkt_probes == 10) {
+                                       l_ptr->max_pkt_target = (msg_size - 4);
+                                       l_ptr->max_pkt_probes = 0;
                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
-                                }
+                               }
                                l_ptr->max_pkt_probes++;
-                        }
+                       }
 
                        l_ptr->stats.sent_probes++;
-                }
+               }
                l_ptr->stats.sent_states++;
        } else {                /* RESET_MSG or ACTIVATE_MSG */
                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
@@ -2046,7 +2144,7 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
                return;
 
        memcpy(buf->data, (unchar *)msg, sizeof(l_ptr->proto_msg));
-        msg_set_size(buf_msg(buf), msg_size);
+       msg_set_size(buf_msg(buf), msg_size);
 
        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
                l_ptr->unacked_window = 0;
@@ -2062,15 +2160,15 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
 
 /*
  * Receive protocol message :
- * Note that network plane id propagates through the network, and may 
- * change at any time. The node with lowest address rules    
+ * Note that network plane id propagates through the network, and may
+ * change at any time. The node with lowest address rules
  */
 
 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
 {
        u32 rec_gap = 0;
        u32 max_pkt_info;
-        u32 max_pkt_ack;
+       u32 max_pkt_ack;
        u32 msg_tol;
        struct tipc_msg *msg = buf_msg(buf);
 
@@ -2090,12 +2188,12 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
        l_ptr->owner->permit_changeover = msg_redundant_link(msg);
 
        switch (msg_type(msg)) {
-       
+
        case RESET_MSG:
                if (!link_working_unknown(l_ptr) && l_ptr->peer_session) {
                        if (msg_session(msg) == l_ptr->peer_session) {
                                dbg("Duplicate RESET: %u<->%u\n",
-                                   msg_session(msg), l_ptr->peer_session);                                     
+                                   msg_session(msg), l_ptr->peer_session);
                                break; /* duplicate: ignore */
                        }
                }
@@ -2113,13 +2211,13 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
                        l_ptr->priority = msg_linkprio(msg);
 
                max_pkt_info = msg_max_pkt(msg);
-                if (max_pkt_info) {
+               if (max_pkt_info) {
                        if (max_pkt_info < l_ptr->max_pkt_target)
                                l_ptr->max_pkt_target = max_pkt_info;
                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
                                l_ptr->max_pkt = l_ptr->max_pkt_target;
                } else {
-                        l_ptr->max_pkt = l_ptr->max_pkt_target;
+                       l_ptr->max_pkt = l_ptr->max_pkt_target;
                }
                l_ptr->owner->bclink.supported = (max_pkt_info != 0);
 
@@ -2137,10 +2235,10 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
 
                if ((msg_tol = msg_link_tolerance(msg)))
                        link_set_supervision_props(l_ptr, msg_tol);
-               
-               if (msg_linkprio(msg) && 
+
+               if (msg_linkprio(msg) &&
                    (msg_linkprio(msg) != l_ptr->priority)) {
-                       warn("Changing prio <%s>: %u->%u\n",
+                       warn("Resetting link <%s>, priority change %u->%u\n",
                             l_ptr->name, l_ptr->priority, msg_linkprio(msg));
                        l_ptr->priority = msg_linkprio(msg);
                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
@@ -2152,25 +2250,25 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
                        break;
 
                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
-                       rec_gap = mod(msg_next_sent(msg) - 
+                       rec_gap = mod(msg_next_sent(msg) -
                                      mod(l_ptr->next_in_no));
                }
 
                max_pkt_ack = msg_max_pkt(msg);
-                if (max_pkt_ack > l_ptr->max_pkt) {
-                        dbg("Link <%s> updated MTU %u -> %u\n",
-                            l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
-                        l_ptr->max_pkt = max_pkt_ack;
-                        l_ptr->max_pkt_probes = 0;
-                }
+               if (max_pkt_ack > l_ptr->max_pkt) {
+                       dbg("Link <%s> updated MTU %u -> %u\n",
+                           l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
+                       l_ptr->max_pkt = max_pkt_ack;
+                       l_ptr->max_pkt_probes = 0;
+               }
 
                max_pkt_ack = 0;
-                if (msg_probe(msg)) {
+               if (msg_probe(msg)) {
                        l_ptr->stats.recv_probes++;
-                        if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
-                                max_pkt_ack = msg_size(msg);
-                        }
-                }
+                       if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
+                               max_pkt_ack = msg_size(msg);
+                       }
+               }
 
                /* Protocol message before retransmits, reduce loss risk */
 
@@ -2196,11 +2294,11 @@ exit:
 
 
 /*
- * tipc_link_tunnel(): Send one message via a link belonging to 
+ * tipc_link_tunnel(): Send one message via a link belonging to
  * another bearer. Owner node is locked.
  */
-void tipc_link_tunnel(struct link *l_ptr, 
-                     struct tipc_msg *tunnel_hdr, 
+void tipc_link_tunnel(struct link *l_ptr,
+                     struct tipc_msg *tunnel_hdr,
                      struct tipc_msg  *msg,
                      u32 selector)
 {
@@ -2209,17 +2307,22 @@ void tipc_link_tunnel(struct link *l_ptr,
        u32 length = msg_size(msg);
 
        tunnel = l_ptr->owner->active_links[selector & 1];
-       if (!tipc_link_is_up(tunnel))
+       if (!tipc_link_is_up(tunnel)) {
+               warn("Link changeover error, "
+                    "tunnel link no longer available\n");
                return;
+       }
        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
        buf = buf_acquire(length + INT_H_SIZE);
-       if (!buf)
+       if (!buf) {
+               warn("Link changeover error, "
+                    "unable to send tunnel msg\n");
                return;
+       }
        memcpy(buf->data, (unchar *)tunnel_hdr, INT_H_SIZE);
        memcpy(buf->data + INT_H_SIZE, (unchar *)msg, length);
        dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
        msg_dbg(buf_msg(buf), ">SEND>");
-       assert(tunnel);
        tipc_link_send_buf(tunnel, buf);
 }
 
@@ -2235,23 +2338,27 @@ void tipc_link_changeover(struct link *l_ptr)
        u32 msgcount = l_ptr->out_queue_size;
        struct sk_buff *crs = l_ptr->first_out;
        struct link *tunnel = l_ptr->owner->active_links[0];
-       int split_bundles = tipc_node_has_redundant_links(l_ptr->owner);
        struct tipc_msg tunnel_hdr;
+       int split_bundles;
 
        if (!tunnel)
                return;
 
-       if (!l_ptr->owner->permit_changeover)
+       if (!l_ptr->owner->permit_changeover) {
+               warn("Link changeover error, "
+                    "peer did not permit changeover\n");
                return;
+       }
 
        msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                 ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
        msg_set_msgcnt(&tunnel_hdr, msgcount);
+       dbg("Link changeover requires %u tunnel messages\n", msgcount);
+
        if (!l_ptr->first_out) {
                struct sk_buff *buf;
 
-               assert(!msgcount);
                buf = buf_acquire(INT_H_SIZE);
                if (buf) {
                        memcpy(buf->data, (unchar *)&tunnel_hdr, INT_H_SIZE);
@@ -2261,10 +2368,15 @@ void tipc_link_changeover(struct link *l_ptr)
                        msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
                        tipc_link_send_buf(tunnel, buf);
                } else {
-                       warn("Memory squeeze; link changeover failed\n");
+                       warn("Link changeover error, "
+                            "unable to send changeover msg\n");
                }
                return;
        }
+
+       split_bundles = (l_ptr->owner->active_links[0] !=
+                        l_ptr->owner->active_links[1]);
+
        while (crs) {
                struct tipc_msg *msg = buf_msg(crs);
 
@@ -2306,11 +2418,12 @@ void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
                if (msg_user(msg) == MSG_BUNDLER)
                        msg_set_type(msg, CLOSED_MSG);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
-               msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
+               msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
                outbuf = buf_acquire(length + INT_H_SIZE);
                if (outbuf == NULL) {
-                       warn("Memory squeeze; buffer duplication failed\n");
+                       warn("Link changeover error, "
+                            "unable to send duplicate msg\n");
                        return;
                }
                memcpy(outbuf->data, (unchar *)&tunnel_hdr, INT_H_SIZE);
@@ -2332,7 +2445,7 @@ void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
  * @skb: encapsulating message buffer
  * @from_pos: offset to extract from
  *
- * Returns a new message buffer containing an embedded message.  The 
+ * Returns a new message buffer containing an embedded message.  The
  * encapsulating message itself is left unchanged.
  */
 
@@ -2348,7 +2461,7 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
        return eb;
 }
 
-/* 
+/*
  *  link_recv_changeover_msg(): Receive tunneled packet sent
  *  via other link. Node is locked. Return extracted buffer.
  */
@@ -2364,11 +2477,15 @@ static int link_recv_changeover_msg(struct link **l_ptr,
        u32 msg_count = msg_msgcnt(tunnel_msg);
 
        dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
-       assert(dest_link != *l_ptr);
        if (!dest_link) {
                msg_dbg(tunnel_msg, "NOLINK/<REC<");
                goto exit;
        }
+       if (dest_link == *l_ptr) {
+               err("Unexpected changeover message on link <%s>\n",
+                   (*l_ptr)->name);
+               goto exit;
+       }
        dbg("%c<-%c:", dest_link->b_ptr->net_plane,
            (*l_ptr)->b_ptr->net_plane);
        *l_ptr = dest_link;
@@ -2381,7 +2498,7 @@ static int link_recv_changeover_msg(struct link **l_ptr,
                }
                *buf = buf_extract(tunnel_buf,INT_H_SIZE);
                if (*buf == NULL) {
-                       warn("Memory squeeze; failed to extract msg\n");
+                       warn("Link changeover error, duplicate msg dropped\n");
                        goto exit;
                }
                msg_dbg(tunnel_msg, "TNL<REC<");
@@ -2393,13 +2510,17 @@ static int link_recv_changeover_msg(struct link **l_ptr,
 
        if (tipc_link_is_up(dest_link)) {
                msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
+               info("Resetting link <%s>, changeover initiated by peer\n",
+                    dest_link->name);
                tipc_link_reset(dest_link);
                dest_link->exp_msg_count = msg_count;
+               dbg("Expecting %u tunnelled messages\n", msg_count);
                if (!msg_count)
                        goto exit;
        } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
                msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
                dest_link->exp_msg_count = msg_count;
+               dbg("Expecting %u tunnelled messages\n", msg_count);
                if (!msg_count)
                        goto exit;
        }
@@ -2407,6 +2528,8 @@ static int link_recv_changeover_msg(struct link **l_ptr,
        /* Receive original message */
 
        if (dest_link->exp_msg_count == 0) {
+               warn("Link switchover error, "
+                    "got too many tunnelled messages\n");
                msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
                dbg_print_link(dest_link, "LINK:");
                goto exit;
@@ -2422,7 +2545,7 @@ static int link_recv_changeover_msg(struct link **l_ptr,
                        buf_discard(tunnel_buf);
                        return 1;
                } else {
-                       warn("Memory squeeze; dropped incoming msg\n");
+                       warn("Link changeover error, original msg dropped\n");
                }
        }
 exit:
@@ -2444,13 +2567,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
        while (msgcount--) {
                obuf = buf_extract(buf, pos);
                if (obuf == NULL) {
-                       char addr_string[16];
-
-                       warn("Buffer allocation failure;\n");
-                       warn("  incoming message(s) from %s lost\n",
-                            addr_string_fill(addr_string, 
-                                             msg_orignode(buf_msg(buf))));
-                       return;
+                       warn("Link unable to unbundle message(s)\n");
+                       break;
                };
                pos += align(msg_size(buf_msg(obuf)));
                msg_dbg(buf_msg(obuf), "     /");
@@ -2464,9 +2582,9 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
  */
 
 
-/* 
+/*
  * tipc_link_send_long_buf: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length. 
+ * The buffer is complete, inclusive total message length.
  * Returns user data length.
  */
 int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
@@ -2508,7 +2626,7 @@ int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
                }
                fragm = buf_acquire(fragm_sz + INT_H_SIZE);
                if (fragm == NULL) {
-                       warn("Memory squeeze; failed to fragment msg\n");
+                       warn("Link unable to fragment message\n");
                        dsz = -ENOMEM;
                        goto exit;
                }
@@ -2532,9 +2650,9 @@ exit:
        return dsz;
 }
 
-/* 
- * A pending message being re-assembled must store certain values 
- * to handle subsequent fragments correctly. The following functions 
+/*
+ * A pending message being re-assembled must store certain values
+ * to handle subsequent fragments correctly. The following functions
  * help storing these values in unused, available fields in the
  * pending message. This makes dynamic memory allocation unecessary.
  */
@@ -2574,11 +2692,11 @@ static void incr_timer_cnt(struct sk_buff *buf)
        msg_incr_reroute_cnt(buf_msg(buf));
 }
 
-/* 
- * tipc_link_recv_fragment(): Called with node lock on. Returns 
+/*
+ * tipc_link_recv_fragment(): Called with node lock on. Returns
  * the reassembled buffer if message is complete.
  */
-int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 
+int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
                            struct tipc_msg **m)
 {
        struct sk_buff *prev = NULL;
@@ -2619,11 +2737,11 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 
                        /*  Prepare buffer for subsequent fragments. */
 
-                       set_long_msg_seqno(pbuf, long_msg_seq_no); 
-                       set_fragm_size(pbuf,fragm_sz); 
-                       set_expected_frags(pbuf,exp_fragm_cnt - 1); 
+                       set_long_msg_seqno(pbuf, long_msg_seq_no);
+                       set_fragm_size(pbuf,fragm_sz);
+                       set_expected_frags(pbuf,exp_fragm_cnt - 1);
                } else {
-                       warn("Memory squeeze; got no defragmenting buffer\n");
+                       warn("Link unable to reassemble fragmented message\n");
                }
                buf_discard(fbuf);
                return 0;
@@ -2647,7 +2765,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
                        *m = buf_msg(pbuf);
                        return 1;
                }
-               set_expected_frags(pbuf,exp_frags);     
+               set_expected_frags(pbuf,exp_frags);
                return 0;
        }
        dbg(" Discarding orphan fragment %x\n",fbuf);
@@ -2731,10 +2849,10 @@ void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
  * link_find_link - locate link by name
  * @name - ptr to link name string
  * @node - ptr to area to be filled with ptr to associated node
- * 
+ *
  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
  * this also prevents link deletion.
- * 
+ *
  * Returns pointer to link (or 0 if invalid link name).
  */
 
@@ -2742,7 +2860,7 @@ static struct link *link_find_link(const char *name, struct node **node)
 {
        struct link_name link_name_parts;
        struct bearer *b_ptr;
-       struct link *l_ptr; 
+       struct link *l_ptr;
 
        if (!link_name_validate(name, &link_name_parts))
                return NULL;
@@ -2751,7 +2869,7 @@ static struct link *link_find_link(const char *name, struct node **node)
        if (!b_ptr)
                return NULL;
 
-       *node = tipc_node_find(link_name_parts.addr_peer); 
+       *node = tipc_node_find(link_name_parts.addr_peer);
        if (!*node)
                return NULL;
 
@@ -2762,14 +2880,14 @@ static struct link *link_find_link(const char *name, struct node **node)
        return l_ptr;
 }
 
-struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, 
+struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
                                     u16 cmd)
 {
        struct tipc_link_config *args;
-        u32 new_value;
+       u32 new_value;
        struct link *l_ptr;
        struct node *node;
-        int res;
+       int res;
 
        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -2781,40 +2899,40 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space
                if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
                    (tipc_bclink_set_queue_limits(new_value) == 0))
                        return tipc_cfg_reply_none();
-               return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
+               return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
                                                   " (cannot change setting on broadcast link)");
        }
 
        read_lock_bh(&tipc_net_lock);
-       l_ptr = link_find_link(args->name, &node); 
+       l_ptr = link_find_link(args->name, &node);
        if (!l_ptr) {
                read_unlock_bh(&tipc_net_lock);
-               return tipc_cfg_reply_error_string("link not found");
+               return tipc_cfg_reply_error_string("link not found");
        }
 
        tipc_node_lock(node);
        res = -EINVAL;
        switch (cmd) {
-       case TIPC_CMD_SET_LINK_TOL: 
-               if ((new_value >= TIPC_MIN_LINK_TOL) && 
+       case TIPC_CMD_SET_LINK_TOL:
+               if ((new_value >= TIPC_MIN_LINK_TOL) &&
                    (new_value <= TIPC_MAX_LINK_TOL)) {
                        link_set_supervision_props(l_ptr, new_value);
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 
+                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                 0, 0, new_value, 0, 0);
                        res = TIPC_OK;
                }
                break;
-       case TIPC_CMD_SET_LINK_PRI: 
+       case TIPC_CMD_SET_LINK_PRI:
                if ((new_value >= TIPC_MIN_LINK_PRI) &&
                    (new_value <= TIPC_MAX_LINK_PRI)) {
                        l_ptr->priority = new_value;
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 
+                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                 0, 0, 0, new_value, 0);
                        res = TIPC_OK;
                }
                break;
-       case TIPC_CMD_SET_LINK_WINDOW: 
-               if ((new_value >= TIPC_MIN_LINK_WIN) && 
+       case TIPC_CMD_SET_LINK_WINDOW:
+               if ((new_value >= TIPC_MIN_LINK_WIN) &&
                    (new_value <= TIPC_MAX_LINK_WIN)) {
                        tipc_link_set_queue_limits(l_ptr, new_value);
                        res = TIPC_OK;
@@ -2825,7 +2943,7 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space
 
        read_unlock_bh(&tipc_net_lock);
        if (res)
-               return tipc_cfg_reply_error_string("cannot change link setting");
+               return tipc_cfg_reply_error_string("cannot change link setting");
 
        return tipc_cfg_reply_none();
 }
@@ -2845,7 +2963,7 @@ static void link_reset_statistics(struct link *l_ptr)
 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
 {
        char *link_name;
-       struct link *l_ptr; 
+       struct link *l_ptr;
        struct node *node;
 
        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
@@ -2859,7 +2977,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
        }
 
        read_lock_bh(&tipc_net_lock);
-       l_ptr = link_find_link(link_name, &node); 
+       l_ptr = link_find_link(link_name, &node);
        if (!l_ptr) {
                read_unlock_bh(&tipc_net_lock);
                return tipc_cfg_reply_error_string("link not found");
@@ -2886,14 +3004,14 @@ static u32 percent(u32 count, u32 total)
  * @name: link name
  * @buf: print buffer area
  * @buf_size: size of print buffer area
- * 
+ *
  * Returns length of print buffer data string (or 0 if error)
  */
 
 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
 {
        struct print_buf pb;
-       struct link *l_ptr; 
+       struct link *l_ptr;
        struct node *node;
        char *status;
        u32 profile_total = 0;
@@ -2904,7 +3022,7 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
        tipc_printbuf_init(&pb, buf, buf_size);
 
        read_lock_bh(&tipc_net_lock);
-       l_ptr = link_find_link(name, &node); 
+       l_ptr = link_find_link(name, &node);
        if (!l_ptr) {
                read_unlock_bh(&tipc_net_lock);
                return 0;
@@ -2918,28 +3036,28 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
        else
                status = "DEFUNCT";
        tipc_printf(&pb, "Link <%s>\n"
-                        "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
-                        "  Window:%u packets\n", 
-                   l_ptr->name, status, link_max_pkt(l_ptr), 
+                        "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
+                        "  Window:%u packets\n",
+                   l_ptr->name, status, link_max_pkt(l_ptr),
                    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
-       tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n", 
+       tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
                    l_ptr->next_in_no - l_ptr->stats.recv_info,
                    l_ptr->stats.recv_fragments,
                    l_ptr->stats.recv_fragmented,
                    l_ptr->stats.recv_bundles,
                    l_ptr->stats.recv_bundled);
-       tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n", 
+       tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
                    l_ptr->next_out_no - l_ptr->stats.sent_info,
                    l_ptr->stats.sent_fragments,
-                   l_ptr->stats.sent_fragmented, 
+                   l_ptr->stats.sent_fragmented,
                    l_ptr->stats.sent_bundles,
                    l_ptr->stats.sent_bundled);
        profile_total = l_ptr->stats.msg_length_counts;
        if (!profile_total)
                profile_total = 1;
        tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
-                        "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
-                        "-16354:%u%% -32768:%u%% -66000:%u%%\n",
+                        "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
+                        "-16354:%u%% -32768:%u%% -66000:%u%%\n",
                    l_ptr->stats.msg_length_counts,
                    l_ptr->stats.msg_lengths_total / profile_total,
                    percent(l_ptr->stats.msg_length_profile[0], profile_total),
@@ -2949,21 +3067,21 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
                    percent(l_ptr->stats.msg_length_profile[4], profile_total),
                    percent(l_ptr->stats.msg_length_profile[5], profile_total),
                    percent(l_ptr->stats.msg_length_profile[6], profile_total));
-       tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n", 
+       tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
                    l_ptr->stats.recv_states,
                    l_ptr->stats.recv_probes,
                    l_ptr->stats.recv_nacks,
-                   l_ptr->stats.deferred_recv, 
+                   l_ptr->stats.deferred_recv,
                    l_ptr->stats.duplicates);
-       tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n", 
-                   l_ptr->stats.sent_states, 
-                   l_ptr->stats.sent_probes, 
-                   l_ptr->stats.sent_nacks, 
-                   l_ptr->stats.sent_acks, 
+       tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
+                   l_ptr->stats.sent_states,
+                   l_ptr->stats.sent_probes,
+                   l_ptr->stats.sent_nacks,
+                   l_ptr->stats.sent_acks,
                    l_ptr->stats.retransmitted);
        tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
                    l_ptr->stats.bearer_congs,
-                   l_ptr->stats.link_congs, 
+                   l_ptr->stats.link_congs,
                    l_ptr->stats.max_queue_sz,
                    l_ptr->stats.queue_sz_counts
                    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
@@ -2995,7 +3113,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
                                  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
        if (!str_len) {
                buf_discard(buf);
-               return tipc_cfg_reply_error_string("link not found");
+               return tipc_cfg_reply_error_string("link not found");
        }
 
        skb_put(buf, TLV_SPACE(str_len));
@@ -3046,7 +3164,7 @@ int link_control(const char *name, u32 op, u32 val)
  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
  * @dest: network address of destination node
  * @selector: used to select from set of active links
- * 
+ *
  * If no active link can be found, uses default maximum packet size.
  */
 
@@ -3055,11 +3173,11 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
        struct node *n_ptr;
        struct link *l_ptr;
        u32 res = MAX_PKT_DEFAULT;
-       
+
        if (dest == tipc_own_addr)
                return MAX_MSG_SIZE;
 
-       read_lock_bh(&tipc_net_lock);        
+       read_lock_bh(&tipc_net_lock);
        n_ptr = tipc_node_select(dest, selector);
        if (n_ptr) {
                tipc_node_lock(n_ptr);
@@ -3068,7 +3186,7 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
                        res = link_max_pkt(l_ptr);
                tipc_node_unlock(n_ptr);
        }
-       read_unlock_bh(&tipc_net_lock);       
+       read_unlock_bh(&tipc_net_lock);
        return res;
 }
 
@@ -3126,8 +3244,8 @@ static void link_print(struct link *l_ptr, struct print_buf *buf,
                tipc_printf(buf, "%u]",
                            msg_seqno(buf_msg
                                      (l_ptr->last_out)), l_ptr->out_queue_size);
-               if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) - 
-                        msg_seqno(buf_msg(l_ptr->first_out))) 
+               if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
+                        msg_seqno(buf_msg(l_ptr->first_out)))
                     != (l_ptr->out_queue_size - 1))
                    || (l_ptr->last_out->next != 0)) {
                        tipc_printf(buf, "\nSend queue inconsistency\n");