cf414cf05e722800ebb32975586717c8b417f5c6
[linux-2.6.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, Ericsson AB
5  * Copyright (c) 2004-2007, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43
44
45 /*
46  * Out-of-range value for link session numbers
47  */
48
49 #define INVALID_SESSION 0x10000
50
51 /*
52  * Link state events:
53  */
54
55 #define  STARTING_EVT    856384768      /* link processing trigger */
56 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
57 #define  TIMEOUT_EVT     560817u        /* link timer expired */
58
59 /*
60  * The following two 'message types' is really just implementation
61  * data conveniently stored in the message header.
62  * They must not be considered part of the protocol
63  */
64 #define OPEN_MSG   0
65 #define CLOSED_MSG 1
66
67 /*
68  * State value stored in 'exp_msg_count'
69  */
70
71 #define START_CHANGEOVER 100000u
72
73 /**
74  * struct link_name - deconstructed link name
75  * @addr_local: network address of node at this end
76  * @if_local: name of interface at this end
77  * @addr_peer: network address of node at far end
78  * @if_peer: name of interface at far end
79  */
80
81 struct link_name {
82         u32 addr_local;
83         char if_local[TIPC_MAX_IF_NAME];
84         u32 addr_peer;
85         char if_peer[TIPC_MAX_IF_NAME];
86 };
87
88 static void link_handle_out_of_seq_msg(struct link *l_ptr,
89                                        struct sk_buff *buf);
90 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
91 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93 static int  link_send_sections_long(struct port *sender,
94                                     struct iovec const *msg_sect,
95                                     u32 num_sect, u32 destnode);
96 static void link_check_defragm_bufs(struct link *l_ptr);
97 static void link_state_event(struct link *l_ptr, u32 event);
98 static void link_reset_statistics(struct link *l_ptr);
99 static void link_print(struct link *l_ptr, struct print_buf *buf,
100                        const char *str);
101 static void link_start(struct link *l_ptr);
102 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);
103
104
105 /*
106  * Debugging code used by link routines only
107  *
108  * When debugging link problems on a system that has multiple links,
109  * the standard TIPC debugging routines may not be useful since they
110  * allow the output from multiple links to be intermixed.  For this reason
111  * routines of the form "dbg_link_XXX()" have been created that will capture
112  * debug info into a link's personal print buffer, which can then be dumped
113  * into the TIPC system log (TIPC_LOG) upon request.
114  *
115  * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
116  * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
117  * the dbg_link_XXX() routines simply send their output to the standard
118  * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
119  * when there is only a single link in the system being debugged.
120  *
121  * Notes:
122  * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
123  * - "l_ptr" must be valid when using dbg_link_XXX() macros
124  */
125
126 #define LINK_LOG_BUF_SIZE 0
127
128 #define dbg_link(fmt, arg...) \
129         do { \
130                 if (LINK_LOG_BUF_SIZE) \
131                         tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
132         } while (0)
133 #define dbg_link_msg(msg, txt) \
134         do { \
135                 if (LINK_LOG_BUF_SIZE) \
136                         tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
137         } while (0)
138 #define dbg_link_state(txt) \
139         do { \
140                 if (LINK_LOG_BUF_SIZE) \
141                         link_print(l_ptr, &l_ptr->print_buf, txt); \
142         } while (0)
143 #define dbg_link_dump() do { \
144         if (LINK_LOG_BUF_SIZE) { \
145                 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
146                 tipc_printbuf_move(LOG, &l_ptr->print_buf); \
147         } \
148 } while (0)
149
150 static void dbg_print_link(struct link *l_ptr, const char *str)
151 {
152         if (DBG_OUTPUT != TIPC_NULL)
153                 link_print(l_ptr, DBG_OUTPUT, str);
154 }
155
156 static void dbg_print_buf_chain(struct sk_buff *root_buf)
157 {
158         if (DBG_OUTPUT != TIPC_NULL) {
159                 struct sk_buff *buf = root_buf;
160
161                 while (buf) {
162                         msg_dbg(buf_msg(buf), "In chain: ");
163                         buf = buf->next;
164                 }
165         }
166 }
167
168 /*
169  *  Simple link routines
170  */
171
172 static unsigned int align(unsigned int i)
173 {
174         return (i + 3) & ~3u;
175 }
176
177 static void link_init_max_pkt(struct link *l_ptr)
178 {
179         u32 max_pkt;
180
181         max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
182         if (max_pkt > MAX_MSG_SIZE)
183                 max_pkt = MAX_MSG_SIZE;
184
185         l_ptr->max_pkt_target = max_pkt;
186         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
187                 l_ptr->max_pkt = l_ptr->max_pkt_target;
188         else
189                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
190
191         l_ptr->max_pkt_probes = 0;
192 }
193
194 static u32 link_next_sent(struct link *l_ptr)
195 {
196         if (l_ptr->next_out)
197                 return msg_seqno(buf_msg(l_ptr->next_out));
198         return mod(l_ptr->next_out_no);
199 }
200
201 static u32 link_last_sent(struct link *l_ptr)
202 {
203         return mod(link_next_sent(l_ptr) - 1);
204 }
205
206 /*
207  *  Simple non-static link routines (i.e. referenced outside this file)
208  */
209
210 int tipc_link_is_up(struct link *l_ptr)
211 {
212         if (!l_ptr)
213                 return 0;
214         return link_working_working(l_ptr) || link_working_unknown(l_ptr);
215 }
216
217 int tipc_link_is_active(struct link *l_ptr)
218 {
219         return  (l_ptr->owner->active_links[0] == l_ptr) ||
220                 (l_ptr->owner->active_links[1] == l_ptr);
221 }
222
223 /**
224  * link_name_validate - validate & (optionally) deconstruct link name
225  * @name - ptr to link name string
226  * @name_parts - ptr to area for link name components (or NULL if not needed)
227  *
228  * Returns 1 if link name is valid, otherwise 0.
229  */
230
231 static int link_name_validate(const char *name, struct link_name *name_parts)
232 {
233         char name_copy[TIPC_MAX_LINK_NAME];
234         char *addr_local;
235         char *if_local;
236         char *addr_peer;
237         char *if_peer;
238         char dummy;
239         u32 z_local, c_local, n_local;
240         u32 z_peer, c_peer, n_peer;
241         u32 if_local_len;
242         u32 if_peer_len;
243
244         /* copy link name & ensure length is OK */
245
246         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
247         /* need above in case non-Posix strncpy() doesn't pad with nulls */
248         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
249         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
250                 return 0;
251
252         /* ensure all component parts of link name are present */
253
254         addr_local = name_copy;
255         if ((if_local = strchr(addr_local, ':')) == NULL)
256                 return 0;
257         *(if_local++) = 0;
258         if ((addr_peer = strchr(if_local, '-')) == NULL)
259                 return 0;
260         *(addr_peer++) = 0;
261         if_local_len = addr_peer - if_local;
262         if ((if_peer = strchr(addr_peer, ':')) == NULL)
263                 return 0;
264         *(if_peer++) = 0;
265         if_peer_len = strlen(if_peer) + 1;
266
267         /* validate component parts of link name */
268
269         if ((sscanf(addr_local, "%u.%u.%u%c",
270                     &z_local, &c_local, &n_local, &dummy) != 3) ||
271             (sscanf(addr_peer, "%u.%u.%u%c",
272                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
273             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
274             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
275             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
276             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
277             (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
278             (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
279                 return 0;
280
281         /* return link name components, if necessary */
282
283         if (name_parts) {
284                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
285                 strcpy(name_parts->if_local, if_local);
286                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
287                 strcpy(name_parts->if_peer, if_peer);
288         }
289         return 1;
290 }
291
292 /**
293  * link_timeout - handle expiration of link timer
294  * @l_ptr: pointer to link
295  *
296  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
297  * with tipc_link_delete().  (There is no risk that the node will be deleted by
298  * another thread because tipc_link_delete() always cancels the link timer before
299  * tipc_node_delete() is called.)
300  */
301
302 static void link_timeout(struct link *l_ptr)
303 {
304         tipc_node_lock(l_ptr->owner);
305
306         /* update counters used in statistical profiling of send traffic */
307
308         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
309         l_ptr->stats.queue_sz_counts++;
310
311         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
312                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
313
314         if (l_ptr->first_out) {
315                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
316                 u32 length = msg_size(msg);
317
318                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
319                     (msg_type(msg) == FIRST_FRAGMENT)) {
320                         length = msg_size(msg_get_wrapped(msg));
321                 }
322                 if (length) {
323                         l_ptr->stats.msg_lengths_total += length;
324                         l_ptr->stats.msg_length_counts++;
325                         if (length <= 64)
326                                 l_ptr->stats.msg_length_profile[0]++;
327                         else if (length <= 256)
328                                 l_ptr->stats.msg_length_profile[1]++;
329                         else if (length <= 1024)
330                                 l_ptr->stats.msg_length_profile[2]++;
331                         else if (length <= 4096)
332                                 l_ptr->stats.msg_length_profile[3]++;
333                         else if (length <= 16384)
334                                 l_ptr->stats.msg_length_profile[4]++;
335                         else if (length <= 32768)
336                                 l_ptr->stats.msg_length_profile[5]++;
337                         else
338                                 l_ptr->stats.msg_length_profile[6]++;
339                 }
340         }
341
342         /* do all other link processing performed on a periodic basis */
343
344         link_check_defragm_bufs(l_ptr);
345
346         link_state_event(l_ptr, TIMEOUT_EVT);
347
348         if (l_ptr->next_out)
349                 tipc_link_push_queue(l_ptr);
350
351         tipc_node_unlock(l_ptr->owner);
352 }
353
354 static void link_set_timer(struct link *l_ptr, u32 time)
355 {
356         k_start_timer(&l_ptr->timer, time);
357 }
358
359 /**
360  * tipc_link_create - create a new link
361  * @b_ptr: pointer to associated bearer
362  * @peer: network address of node at other end of link
363  * @media_addr: media address to use when sending messages over link
364  *
365  * Returns pointer to link.
366  */
367
368 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
369                               const struct tipc_media_addr *media_addr)
370 {
371         struct link *l_ptr;
372         struct tipc_msg *msg;
373         char *if_name;
374
375         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
376         if (!l_ptr) {
377                 warn("Link creation failed, no memory\n");
378                 return NULL;
379         }
380
381         if (LINK_LOG_BUF_SIZE) {
382                 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
383
384                 if (!pb) {
385                         kfree(l_ptr);
386                         warn("Link creation failed, no memory for print buffer\n");
387                         return NULL;
388                 }
389                 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
390         }
391
392         l_ptr->addr = peer;
393         if_name = strchr(b_ptr->publ.name, ':') + 1;
394         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
395                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
396                 tipc_node(tipc_own_addr),
397                 if_name,
398                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
399                 /* note: peer i/f is appended to link name by reset/activate */
400         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
401         l_ptr->checkpoint = 1;
402         l_ptr->b_ptr = b_ptr;
403         link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
404         l_ptr->state = RESET_UNKNOWN;
405
406         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
407         msg = l_ptr->pmsg;
408         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
409         msg_set_size(msg, sizeof(l_ptr->proto_msg));
410         msg_set_session(msg, (tipc_random & 0xffff));
411         msg_set_bearer_id(msg, b_ptr->identity);
412         strcpy((char *)msg_data(msg), if_name);
413
414         l_ptr->priority = b_ptr->priority;
415         tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
416
417         link_init_max_pkt(l_ptr);
418
419         l_ptr->next_out_no = 1;
420         INIT_LIST_HEAD(&l_ptr->waiting_ports);
421
422         link_reset_statistics(l_ptr);
423
424         l_ptr->owner = tipc_node_attach_link(l_ptr);
425         if (!l_ptr->owner) {
426                 if (LINK_LOG_BUF_SIZE)
427                         kfree(l_ptr->print_buf.buf);
428                 kfree(l_ptr);
429                 return NULL;
430         }
431
432         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
433         list_add_tail(&l_ptr->link_list, &b_ptr->links);
434         tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
435
436         dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
437             l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
438
439         return l_ptr;
440 }
441
442 /**
443  * tipc_link_delete - delete a link
444  * @l_ptr: pointer to link
445  *
446  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
447  * This routine must not grab the node lock until after link timer cancellation
448  * to avoid a potential deadlock situation.
449  */
450
451 void tipc_link_delete(struct link *l_ptr)
452 {
453         if (!l_ptr) {
454                 err("Attempt to delete non-existent link\n");
455                 return;
456         }
457
458         dbg("tipc_link_delete()\n");
459
460         k_cancel_timer(&l_ptr->timer);
461
462         tipc_node_lock(l_ptr->owner);
463         tipc_link_reset(l_ptr);
464         tipc_node_detach_link(l_ptr->owner, l_ptr);
465         tipc_link_stop(l_ptr);
466         list_del_init(&l_ptr->link_list);
467         if (LINK_LOG_BUF_SIZE)
468                 kfree(l_ptr->print_buf.buf);
469         tipc_node_unlock(l_ptr->owner);
470         k_term_timer(&l_ptr->timer);
471         kfree(l_ptr);
472 }
473
474 static void link_start(struct link *l_ptr)
475 {
476         dbg("link_start %x\n", l_ptr);
477         link_state_event(l_ptr, STARTING_EVT);
478 }
479
480 /**
481  * link_schedule_port - schedule port for deferred sending
482  * @l_ptr: pointer to link
483  * @origport: reference to sending port
484  * @sz: amount of data to be sent
485  *
486  * Schedules port for renewed sending of messages after link congestion
487  * has abated.
488  */
489
490 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
491 {
492         struct port *p_ptr;
493
494         spin_lock_bh(&tipc_port_list_lock);
495         p_ptr = tipc_port_lock(origport);
496         if (p_ptr) {
497                 if (!p_ptr->wakeup)
498                         goto exit;
499                 if (!list_empty(&p_ptr->wait_list))
500                         goto exit;
501                 p_ptr->publ.congested = 1;
502                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
503                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
504                 l_ptr->stats.link_congs++;
505 exit:
506                 tipc_port_unlock(p_ptr);
507         }
508         spin_unlock_bh(&tipc_port_list_lock);
509         return -ELINKCONG;
510 }
511
512 void tipc_link_wakeup_ports(struct link *l_ptr, int all)
513 {
514         struct port *p_ptr;
515         struct port *temp_p_ptr;
516         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
517
518         if (all)
519                 win = 100000;
520         if (win <= 0)
521                 return;
522         if (!spin_trylock_bh(&tipc_port_list_lock))
523                 return;
524         if (link_congested(l_ptr))
525                 goto exit;
526         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
527                                  wait_list) {
528                 if (win <= 0)
529                         break;
530                 list_del_init(&p_ptr->wait_list);
531                 spin_lock_bh(p_ptr->publ.lock);
532                 p_ptr->publ.congested = 0;
533                 p_ptr->wakeup(&p_ptr->publ);
534                 win -= p_ptr->waiting_pkts;
535                 spin_unlock_bh(p_ptr->publ.lock);
536         }
537
538 exit:
539         spin_unlock_bh(&tipc_port_list_lock);
540 }
541
542 /**
543  * link_release_outqueue - purge link's outbound message queue
544  * @l_ptr: pointer to link
545  */
546
547 static void link_release_outqueue(struct link *l_ptr)
548 {
549         struct sk_buff *buf = l_ptr->first_out;
550         struct sk_buff *next;
551
552         while (buf) {
553                 next = buf->next;
554                 buf_discard(buf);
555                 buf = next;
556         }
557         l_ptr->first_out = NULL;
558         l_ptr->out_queue_size = 0;
559 }
560
561 /**
562  * tipc_link_reset_fragments - purge link's inbound message fragments queue
563  * @l_ptr: pointer to link
564  */
565
566 void tipc_link_reset_fragments(struct link *l_ptr)
567 {
568         struct sk_buff *buf = l_ptr->defragm_buf;
569         struct sk_buff *next;
570
571         while (buf) {
572                 next = buf->next;
573                 buf_discard(buf);
574                 buf = next;
575         }
576         l_ptr->defragm_buf = NULL;
577 }
578
579 /**
580  * tipc_link_stop - purge all inbound and outbound messages associated with link
581  * @l_ptr: pointer to link
582  */
583
584 void tipc_link_stop(struct link *l_ptr)
585 {
586         struct sk_buff *buf;
587         struct sk_buff *next;
588
589         buf = l_ptr->oldest_deferred_in;
590         while (buf) {
591                 next = buf->next;
592                 buf_discard(buf);
593                 buf = next;
594         }
595
596         buf = l_ptr->first_out;
597         while (buf) {
598                 next = buf->next;
599                 buf_discard(buf);
600                 buf = next;
601         }
602
603         tipc_link_reset_fragments(l_ptr);
604
605         buf_discard(l_ptr->proto_msg_queue);
606         l_ptr->proto_msg_queue = NULL;
607 }
608
609 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
610 #define link_send_event(fcn, l_ptr, up) do { } while (0)
611
612 void tipc_link_reset(struct link *l_ptr)
613 {
614         struct sk_buff *buf;
615         u32 prev_state = l_ptr->state;
616         u32 checkpoint = l_ptr->next_in_no;
617         int was_active_link = tipc_link_is_active(l_ptr);
618
619         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
620
621         /* Link is down, accept any session */
622         l_ptr->peer_session = INVALID_SESSION;
623
624         /* Prepare for max packet size negotiation */
625         link_init_max_pkt(l_ptr);
626
627         l_ptr->state = RESET_UNKNOWN;
628         dbg_link_state("Resetting Link\n");
629
630         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
631                 return;
632
633         tipc_node_link_down(l_ptr->owner, l_ptr);
634         tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
635
636         if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
637             l_ptr->owner->permit_changeover) {
638                 l_ptr->reset_checkpoint = checkpoint;
639                 l_ptr->exp_msg_count = START_CHANGEOVER;
640         }
641
642         /* Clean up all queues: */
643
644         link_release_outqueue(l_ptr);
645         buf_discard(l_ptr->proto_msg_queue);
646         l_ptr->proto_msg_queue = NULL;
647         buf = l_ptr->oldest_deferred_in;
648         while (buf) {
649                 struct sk_buff *next = buf->next;
650                 buf_discard(buf);
651                 buf = next;
652         }
653         if (!list_empty(&l_ptr->waiting_ports))
654                 tipc_link_wakeup_ports(l_ptr, 1);
655
656         l_ptr->retransm_queue_head = 0;
657         l_ptr->retransm_queue_size = 0;
658         l_ptr->last_out = NULL;
659         l_ptr->first_out = NULL;
660         l_ptr->next_out = NULL;
661         l_ptr->unacked_window = 0;
662         l_ptr->checkpoint = 1;
663         l_ptr->next_out_no = 1;
664         l_ptr->deferred_inqueue_sz = 0;
665         l_ptr->oldest_deferred_in = NULL;
666         l_ptr->newest_deferred_in = NULL;
667         l_ptr->fsm_msg_cnt = 0;
668         l_ptr->stale_count = 0;
669         link_reset_statistics(l_ptr);
670
671         link_send_event(tipc_cfg_link_event, l_ptr, 0);
672         if (!in_own_cluster(l_ptr->addr))
673                 link_send_event(tipc_disc_link_event, l_ptr, 0);
674 }
675
676
677 static void link_activate(struct link *l_ptr)
678 {
679         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
680         tipc_node_link_up(l_ptr->owner, l_ptr);
681         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
682         link_send_event(tipc_cfg_link_event, l_ptr, 1);
683         if (!in_own_cluster(l_ptr->addr))
684                 link_send_event(tipc_disc_link_event, l_ptr, 1);
685 }
686
687 /**
688  * link_state_event - link finite state machine
689  * @l_ptr: pointer to link
690  * @event: state machine event to process
691  */
692
693 static void link_state_event(struct link *l_ptr, unsigned event)
694 {
695         struct link *other;
696         u32 cont_intv = l_ptr->continuity_interval;
697
698         if (!l_ptr->started && (event != STARTING_EVT))
699                 return;         /* Not yet. */
700
701         if (link_blocked(l_ptr)) {
702                 if (event == TIMEOUT_EVT) {
703                         link_set_timer(l_ptr, cont_intv);
704                 }
705                 return;   /* Changeover going on */
706         }
707         dbg_link("STATE_EV: <%s> ", l_ptr->name);
708
709         switch (l_ptr->state) {
710         case WORKING_WORKING:
711                 dbg_link("WW/");
712                 switch (event) {
713                 case TRAFFIC_MSG_EVT:
714                         dbg_link("TRF-");
715                         /* fall through */
716                 case ACTIVATE_MSG:
717                         dbg_link("ACT\n");
718                         break;
719                 case TIMEOUT_EVT:
720                         dbg_link("TIM ");
721                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
722                                 l_ptr->checkpoint = l_ptr->next_in_no;
723                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
724                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
725                                                                  0, 0, 0, 0, 0);
726                                         l_ptr->fsm_msg_cnt++;
727                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
728                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
729                                                                  1, 0, 0, 0, 0);
730                                         l_ptr->fsm_msg_cnt++;
731                                 }
732                                 link_set_timer(l_ptr, cont_intv);
733                                 break;
734                         }
735                         dbg_link(" -> WU\n");
736                         l_ptr->state = WORKING_UNKNOWN;
737                         l_ptr->fsm_msg_cnt = 0;
738                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
739                         l_ptr->fsm_msg_cnt++;
740                         link_set_timer(l_ptr, cont_intv / 4);
741                         break;
742                 case RESET_MSG:
743                         dbg_link("RES -> RR\n");
744                         info("Resetting link <%s>, requested by peer\n",
745                              l_ptr->name);
746                         tipc_link_reset(l_ptr);
747                         l_ptr->state = RESET_RESET;
748                         l_ptr->fsm_msg_cnt = 0;
749                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
750                         l_ptr->fsm_msg_cnt++;
751                         link_set_timer(l_ptr, cont_intv);
752                         break;
753                 default:
754                         err("Unknown link event %u in WW state\n", event);
755                 }
756                 break;
757         case WORKING_UNKNOWN:
758                 dbg_link("WU/");
759                 switch (event) {
760                 case TRAFFIC_MSG_EVT:
761                         dbg_link("TRF-");
762                 case ACTIVATE_MSG:
763                         dbg_link("ACT -> WW\n");
764                         l_ptr->state = WORKING_WORKING;
765                         l_ptr->fsm_msg_cnt = 0;
766                         link_set_timer(l_ptr, cont_intv);
767                         break;
768                 case RESET_MSG:
769                         dbg_link("RES -> RR\n");
770                         info("Resetting link <%s>, requested by peer "
771                              "while probing\n", l_ptr->name);
772                         tipc_link_reset(l_ptr);
773                         l_ptr->state = RESET_RESET;
774                         l_ptr->fsm_msg_cnt = 0;
775                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
776                         l_ptr->fsm_msg_cnt++;
777                         link_set_timer(l_ptr, cont_intv);
778                         break;
779                 case TIMEOUT_EVT:
780                         dbg_link("TIM ");
781                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
782                                 dbg_link("-> WW\n");
783                                 l_ptr->state = WORKING_WORKING;
784                                 l_ptr->fsm_msg_cnt = 0;
785                                 l_ptr->checkpoint = l_ptr->next_in_no;
786                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
787                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
788                                                                  0, 0, 0, 0, 0);
789                                         l_ptr->fsm_msg_cnt++;
790                                 }
791                                 link_set_timer(l_ptr, cont_intv);
792                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
793                                 dbg_link("Probing %u/%u,timer = %u ms)\n",
794                                          l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
795                                          cont_intv / 4);
796                                 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
797                                                          1, 0, 0, 0, 0);
798                                 l_ptr->fsm_msg_cnt++;
799                                 link_set_timer(l_ptr, cont_intv / 4);
800                         } else {        /* Link has failed */
801                                 dbg_link("-> RU (%u probes unanswered)\n",
802                                          l_ptr->fsm_msg_cnt);
803                                 warn("Resetting link <%s>, peer not responding\n",
804                                      l_ptr->name);
805                                 tipc_link_reset(l_ptr);
806                                 l_ptr->state = RESET_UNKNOWN;
807                                 l_ptr->fsm_msg_cnt = 0;
808                                 tipc_link_send_proto_msg(l_ptr, RESET_MSG,
809                                                          0, 0, 0, 0, 0);
810                                 l_ptr->fsm_msg_cnt++;
811                                 link_set_timer(l_ptr, cont_intv);
812                         }
813                         break;
814                 default:
815                         err("Unknown link event %u in WU state\n", event);
816                 }
817                 break;
818         case RESET_UNKNOWN:
819                 dbg_link("RU/");
820                 switch (event) {
821                 case TRAFFIC_MSG_EVT:
822                         dbg_link("TRF-\n");
823                         break;
824                 case ACTIVATE_MSG:
825                         other = l_ptr->owner->active_links[0];
826                         if (other && link_working_unknown(other)) {
827                                 dbg_link("ACT\n");
828                                 break;
829                         }
830                         dbg_link("ACT -> WW\n");
831                         l_ptr->state = WORKING_WORKING;
832                         l_ptr->fsm_msg_cnt = 0;
833                         link_activate(l_ptr);
834                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
835                         l_ptr->fsm_msg_cnt++;
836                         link_set_timer(l_ptr, cont_intv);
837                         break;
838                 case RESET_MSG:
839                         dbg_link("RES\n");
840                         dbg_link(" -> RR\n");
841                         l_ptr->state = RESET_RESET;
842                         l_ptr->fsm_msg_cnt = 0;
843                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
844                         l_ptr->fsm_msg_cnt++;
845                         link_set_timer(l_ptr, cont_intv);
846                         break;
847                 case STARTING_EVT:
848                         dbg_link("START-");
849                         l_ptr->started = 1;
850                         /* fall through */
851                 case TIMEOUT_EVT:
852                         dbg_link("TIM\n");
853                         tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
854                         l_ptr->fsm_msg_cnt++;
855                         link_set_timer(l_ptr, cont_intv);
856                         break;
857                 default:
858                         err("Unknown link event %u in RU state\n", event);
859                 }
860                 break;
861         case RESET_RESET:
862                 dbg_link("RR/ ");
863                 switch (event) {
864                 case TRAFFIC_MSG_EVT:
865                         dbg_link("TRF-");
866                         /* fall through */
867                 case ACTIVATE_MSG:
868                         other = l_ptr->owner->active_links[0];
869                         if (other && link_working_unknown(other)) {
870                                 dbg_link("ACT\n");
871                                 break;
872                         }
873                         dbg_link("ACT -> WW\n");
874                         l_ptr->state = WORKING_WORKING;
875                         l_ptr->fsm_msg_cnt = 0;
876                         link_activate(l_ptr);
877                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
878                         l_ptr->fsm_msg_cnt++;
879                         link_set_timer(l_ptr, cont_intv);
880                         break;
881                 case RESET_MSG:
882                         dbg_link("RES\n");
883                         break;
884                 case TIMEOUT_EVT:
885                         dbg_link("TIM\n");
886                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
887                         l_ptr->fsm_msg_cnt++;
888                         link_set_timer(l_ptr, cont_intv);
889                         dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
890                         break;
891                 default:
892                         err("Unknown link event %u in RR state\n", event);
893                 }
894                 break;
895         default:
896                 err("Unknown link state %u/%u\n", l_ptr->state, event);
897         }
898 }
899
900 /*
901  * link_bundle_buf(): Append contents of a buffer to
902  * the tail of an existing one.
903  */
904
905 static int link_bundle_buf(struct link *l_ptr,
906                            struct sk_buff *bundler,
907                            struct sk_buff *buf)
908 {
909         struct tipc_msg *bundler_msg = buf_msg(bundler);
910         struct tipc_msg *msg = buf_msg(buf);
911         u32 size = msg_size(msg);
912         u32 bundle_size = msg_size(bundler_msg);
913         u32 to_pos = align(bundle_size);
914         u32 pad = to_pos - bundle_size;
915
916         if (msg_user(bundler_msg) != MSG_BUNDLER)
917                 return 0;
918         if (msg_type(bundler_msg) != OPEN_MSG)
919                 return 0;
920         if (skb_tailroom(bundler) < (pad + size))
921                 return 0;
922         if (l_ptr->max_pkt < (to_pos + size))
923                 return 0;
924
925         skb_put(bundler, pad + size);
926         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
927         msg_set_size(bundler_msg, to_pos + size);
928         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
929         dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
930             msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
931         msg_dbg(msg, "PACKD:");
932         buf_discard(buf);
933         l_ptr->stats.sent_bundled++;
934         return 1;
935 }
936
937 static void link_add_to_outqueue(struct link *l_ptr,
938                                  struct sk_buff *buf,
939                                  struct tipc_msg *msg)
940 {
941         u32 ack = mod(l_ptr->next_in_no - 1);
942         u32 seqno = mod(l_ptr->next_out_no++);
943
944         msg_set_word(msg, 2, ((ack << 16) | seqno));
945         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
946         buf->next = NULL;
947         if (l_ptr->first_out) {
948                 l_ptr->last_out->next = buf;
949                 l_ptr->last_out = buf;
950         } else
951                 l_ptr->first_out = l_ptr->last_out = buf;
952         l_ptr->out_queue_size++;
953 }
954
955 /*
956  * tipc_link_send_buf() is the 'full path' for messages, called from
957  * inside TIPC when the 'fast path' in tipc_send_buf
958  * has failed, and from link_send()
959  */
960
961 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
962 {
963         struct tipc_msg *msg = buf_msg(buf);
964         u32 size = msg_size(msg);
965         u32 dsz = msg_data_sz(msg);
966         u32 queue_size = l_ptr->out_queue_size;
967         u32 imp = tipc_msg_tot_importance(msg);
968         u32 queue_limit = l_ptr->queue_limit[imp];
969         u32 max_packet = l_ptr->max_pkt;
970
971         msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
972
973         /* Match msg importance against queue limits: */
974
975         if (unlikely(queue_size >= queue_limit)) {
976                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
977                         return link_schedule_port(l_ptr, msg_origport(msg),
978                                                   size);
979                 }
980                 msg_dbg(msg, "TIPC: Congestion, throwing away\n");
981                 buf_discard(buf);
982                 if (imp > CONN_MANAGER) {
983                         warn("Resetting link <%s>, send queue full", l_ptr->name);
984                         tipc_link_reset(l_ptr);
985                 }
986                 return dsz;
987         }
988
989         /* Fragmentation needed ? */
990
991         if (size > max_packet)
992                 return link_send_long_buf(l_ptr, buf);
993
994         /* Packet can be queued or sent: */
995
996         if (queue_size > l_ptr->stats.max_queue_sz)
997                 l_ptr->stats.max_queue_sz = queue_size;
998
999         if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
1000                    !link_congested(l_ptr))) {
1001                 link_add_to_outqueue(l_ptr, buf, msg);
1002
1003                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1004                         l_ptr->unacked_window = 0;
1005                 } else {
1006                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1007                         l_ptr->stats.bearer_congs++;
1008                         l_ptr->next_out = buf;
1009                 }
1010                 return dsz;
1011         }
1012         /* Congestion: can message be bundled ?: */
1013
1014         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1015             (msg_user(msg) != MSG_FRAGMENTER)) {
1016
1017                 /* Try adding message to an existing bundle */
1018
1019                 if (l_ptr->next_out &&
1020                     link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1021                         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1022                         return dsz;
1023                 }
1024
1025                 /* Try creating a new bundle */
1026
1027                 if (size <= max_packet * 2 / 3) {
1028                         struct sk_buff *bundler = tipc_buf_acquire(max_packet);
1029                         struct tipc_msg bundler_hdr;
1030
1031                         if (bundler) {
1032                                 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1033                                          INT_H_SIZE, l_ptr->addr);
1034                                 skb_copy_to_linear_data(bundler, &bundler_hdr,
1035                                                         INT_H_SIZE);
1036                                 skb_trim(bundler, INT_H_SIZE);
1037                                 link_bundle_buf(l_ptr, bundler, buf);
1038                                 buf = bundler;
1039                                 msg = buf_msg(buf);
1040                                 l_ptr->stats.sent_bundles++;
1041                         }
1042                 }
1043         }
1044         if (!l_ptr->next_out)
1045                 l_ptr->next_out = buf;
1046         link_add_to_outqueue(l_ptr, buf, msg);
1047         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1048         return dsz;
1049 }
1050
1051 /*
1052  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1053  * not been selected yet, and the the owner node is not locked
1054  * Called by TIPC internal users, e.g. the name distributor
1055  */
1056
1057 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1058 {
1059         struct link *l_ptr;
1060         struct tipc_node *n_ptr;
1061         int res = -ELINKCONG;
1062
1063         read_lock_bh(&tipc_net_lock);
1064         n_ptr = tipc_node_select(dest, selector);
1065         if (n_ptr) {
1066                 tipc_node_lock(n_ptr);
1067                 l_ptr = n_ptr->active_links[selector & 1];
1068                 if (l_ptr) {
1069                         dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1070                         res = tipc_link_send_buf(l_ptr, buf);
1071                 } else {
1072                         dbg("Attempt to send msg to unreachable node:\n");
1073                         msg_dbg(buf_msg(buf),">>>");
1074                         buf_discard(buf);
1075                 }
1076                 tipc_node_unlock(n_ptr);
1077         } else {
1078                 dbg("Attempt to send msg to unknown node:\n");
1079                 msg_dbg(buf_msg(buf),">>>");
1080                 buf_discard(buf);
1081         }
1082         read_unlock_bh(&tipc_net_lock);
1083         return res;
1084 }
1085
1086 /*
1087  * link_send_buf_fast: Entry for data messages where the
1088  * destination link is known and the header is complete,
1089  * inclusive total message length. Very time critical.
1090  * Link is locked. Returns user data length.
1091  */
1092
1093 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1094                               u32 *used_max_pkt)
1095 {
1096         struct tipc_msg *msg = buf_msg(buf);
1097         int res = msg_data_sz(msg);
1098
1099         if (likely(!link_congested(l_ptr))) {
1100                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1101                         if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1102                                 link_add_to_outqueue(l_ptr, buf, msg);
1103                                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1104                                                             &l_ptr->media_addr))) {
1105                                         l_ptr->unacked_window = 0;
1106                                         msg_dbg(msg,"SENT_FAST:");
1107                                         return res;
1108                                 }
1109                                 dbg("failed sent fast...\n");
1110                                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1111                                 l_ptr->stats.bearer_congs++;
1112                                 l_ptr->next_out = buf;
1113                                 return res;
1114                         }
1115                 }
1116                 else
1117                         *used_max_pkt = l_ptr->max_pkt;
1118         }
1119         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1120 }
1121
1122 /*
1123  * tipc_send_buf_fast: Entry for data messages where the
1124  * destination node is known and the header is complete,
1125  * inclusive total message length.
1126  * Returns user data length.
1127  */
1128 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1129 {
1130         struct link *l_ptr;
1131         struct tipc_node *n_ptr;
1132         int res;
1133         u32 selector = msg_origport(buf_msg(buf)) & 1;
1134         u32 dummy;
1135
1136         if (destnode == tipc_own_addr)
1137                 return tipc_port_recv_msg(buf);
1138
1139         read_lock_bh(&tipc_net_lock);
1140         n_ptr = tipc_node_select(destnode, selector);
1141         if (likely(n_ptr)) {
1142                 tipc_node_lock(n_ptr);
1143                 l_ptr = n_ptr->active_links[selector];
1144                 dbg("send_fast: buf %x selected %x, destnode = %x\n",
1145                     buf, l_ptr, destnode);
1146                 if (likely(l_ptr)) {
1147                         res = link_send_buf_fast(l_ptr, buf, &dummy);
1148                         tipc_node_unlock(n_ptr);
1149                         read_unlock_bh(&tipc_net_lock);
1150                         return res;
1151                 }
1152                 tipc_node_unlock(n_ptr);
1153         }
1154         read_unlock_bh(&tipc_net_lock);
1155         res = msg_data_sz(buf_msg(buf));
1156         tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1157         return res;
1158 }
1159
1160
1161 /*
1162  * tipc_link_send_sections_fast: Entry for messages where the
1163  * destination processor is known and the header is complete,
1164  * except for total message length.
1165  * Returns user data length or errno.
1166  */
1167 int tipc_link_send_sections_fast(struct port *sender,
1168                                  struct iovec const *msg_sect,
1169                                  const u32 num_sect,
1170                                  u32 destaddr)
1171 {
1172         struct tipc_msg *hdr = &sender->publ.phdr;
1173         struct link *l_ptr;
1174         struct sk_buff *buf;
1175         struct tipc_node *node;
1176         int res;
1177         u32 selector = msg_origport(hdr) & 1;
1178
1179 again:
1180         /*
1181          * Try building message using port's max_pkt hint.
1182          * (Must not hold any locks while building message.)
1183          */
1184
1185         res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1186                         !sender->user_port, &buf);
1187
1188         read_lock_bh(&tipc_net_lock);
1189         node = tipc_node_select(destaddr, selector);
1190         if (likely(node)) {
1191                 tipc_node_lock(node);
1192                 l_ptr = node->active_links[selector];
1193                 if (likely(l_ptr)) {
1194                         if (likely(buf)) {
1195                                 res = link_send_buf_fast(l_ptr, buf,
1196                                                          &sender->publ.max_pkt);
1197                                 if (unlikely(res < 0))
1198                                         buf_discard(buf);
1199 exit:
1200                                 tipc_node_unlock(node);
1201                                 read_unlock_bh(&tipc_net_lock);
1202                                 return res;
1203                         }
1204
1205                         /* Exit if build request was invalid */
1206
1207                         if (unlikely(res < 0))
1208                                 goto exit;
1209
1210                         /* Exit if link (or bearer) is congested */
1211
1212                         if (link_congested(l_ptr) ||
1213                             !list_empty(&l_ptr->b_ptr->cong_links)) {
1214                                 res = link_schedule_port(l_ptr,
1215                                                          sender->publ.ref, res);
1216                                 goto exit;
1217                         }
1218
1219                         /*
1220                          * Message size exceeds max_pkt hint; update hint,
1221                          * then re-try fast path or fragment the message
1222                          */
1223
1224                         sender->publ.max_pkt = l_ptr->max_pkt;
1225                         tipc_node_unlock(node);
1226                         read_unlock_bh(&tipc_net_lock);
1227
1228
1229                         if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1230                                 goto again;
1231
1232                         return link_send_sections_long(sender, msg_sect,
1233                                                        num_sect, destaddr);
1234                 }
1235                 tipc_node_unlock(node);
1236         }
1237         read_unlock_bh(&tipc_net_lock);
1238
1239         /* Couldn't find a link to the destination node */
1240
1241         if (buf)
1242                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1243         if (res >= 0)
1244                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1245                                                  TIPC_ERR_NO_NODE);
1246         return res;
1247 }
1248
1249 /*
1250  * link_send_sections_long(): Entry for long messages where the
1251  * destination node is known and the header is complete,
1252  * inclusive total message length.
1253  * Link and bearer congestion status have been checked to be ok,
1254  * and are ignored if they change.
1255  *
1256  * Note that fragments do not use the full link MTU so that they won't have
1257  * to undergo refragmentation if link changeover causes them to be sent
1258  * over another link with an additional tunnel header added as prefix.
1259  * (Refragmentation will still occur if the other link has a smaller MTU.)
1260  *
1261  * Returns user data length or errno.
1262  */
1263 static int link_send_sections_long(struct port *sender,
1264                                    struct iovec const *msg_sect,
1265                                    u32 num_sect,
1266                                    u32 destaddr)
1267 {
1268         struct link *l_ptr;
1269         struct tipc_node *node;
1270         struct tipc_msg *hdr = &sender->publ.phdr;
1271         u32 dsz = msg_data_sz(hdr);
1272         u32 max_pkt,fragm_sz,rest;
1273         struct tipc_msg fragm_hdr;
1274         struct sk_buff *buf,*buf_chain,*prev;
1275         u32 fragm_crs,fragm_rest,hsz,sect_rest;
1276         const unchar *sect_crs;
1277         int curr_sect;
1278         u32 fragm_no;
1279
1280 again:
1281         fragm_no = 1;
1282         max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1283                 /* leave room for tunnel header in case of link changeover */
1284         fragm_sz = max_pkt - INT_H_SIZE;
1285                 /* leave room for fragmentation header in each fragment */
1286         rest = dsz;
1287         fragm_crs = 0;
1288         fragm_rest = 0;
1289         sect_rest = 0;
1290         sect_crs = NULL;
1291         curr_sect = -1;
1292
1293         /* Prepare reusable fragment header: */
1294
1295         msg_dbg(hdr, ">FRAGMENTING>");
1296         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1297                  INT_H_SIZE, msg_destnode(hdr));
1298         msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1299         msg_set_size(&fragm_hdr, max_pkt);
1300         msg_set_fragm_no(&fragm_hdr, 1);
1301
1302         /* Prepare header of first fragment: */
1303
1304         buf_chain = buf = tipc_buf_acquire(max_pkt);
1305         if (!buf)
1306                 return -ENOMEM;
1307         buf->next = NULL;
1308         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1309         hsz = msg_hdr_sz(hdr);
1310         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1311         msg_dbg(buf_msg(buf), ">BUILD>");
1312
1313         /* Chop up message: */
1314
1315         fragm_crs = INT_H_SIZE + hsz;
1316         fragm_rest = fragm_sz - hsz;
1317
1318         do {            /* For all sections */
1319                 u32 sz;
1320
1321                 if (!sect_rest) {
1322                         sect_rest = msg_sect[++curr_sect].iov_len;
1323                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1324                 }
1325
1326                 if (sect_rest < fragm_rest)
1327                         sz = sect_rest;
1328                 else
1329                         sz = fragm_rest;
1330
1331                 if (likely(!sender->user_port)) {
1332                         if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1333 error:
1334                                 for (; buf_chain; buf_chain = buf) {
1335                                         buf = buf_chain->next;
1336                                         buf_discard(buf_chain);
1337                                 }
1338                                 return -EFAULT;
1339                         }
1340                 } else
1341                         skb_copy_to_linear_data_offset(buf, fragm_crs,
1342                                                        sect_crs, sz);
1343                 sect_crs += sz;
1344                 sect_rest -= sz;
1345                 fragm_crs += sz;
1346                 fragm_rest -= sz;
1347                 rest -= sz;
1348
1349                 if (!fragm_rest && rest) {
1350
1351                         /* Initiate new fragment: */
1352                         if (rest <= fragm_sz) {
1353                                 fragm_sz = rest;
1354                                 msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1355                         } else {
1356                                 msg_set_type(&fragm_hdr, FRAGMENT);
1357                         }
1358                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1359                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1360                         prev = buf;
1361                         buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1362                         if (!buf)
1363                                 goto error;
1364
1365                         buf->next = NULL;
1366                         prev->next = buf;
1367                         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1368                         fragm_crs = INT_H_SIZE;
1369                         fragm_rest = fragm_sz;
1370                         msg_dbg(buf_msg(buf),"  >BUILD>");
1371                 }
1372         }
1373         while (rest > 0);
1374
1375         /*
1376          * Now we have a buffer chain. Select a link and check
1377          * that packet size is still OK
1378          */
1379         node = tipc_node_select(destaddr, sender->publ.ref & 1);
1380         if (likely(node)) {
1381                 tipc_node_lock(node);
1382                 l_ptr = node->active_links[sender->publ.ref & 1];
1383                 if (!l_ptr) {
1384                         tipc_node_unlock(node);
1385                         goto reject;
1386                 }
1387                 if (l_ptr->max_pkt < max_pkt) {
1388                         sender->publ.max_pkt = l_ptr->max_pkt;
1389                         tipc_node_unlock(node);
1390                         for (; buf_chain; buf_chain = buf) {
1391                                 buf = buf_chain->next;
1392                                 buf_discard(buf_chain);
1393                         }
1394                         goto again;
1395                 }
1396         } else {
1397 reject:
1398                 for (; buf_chain; buf_chain = buf) {
1399                         buf = buf_chain->next;
1400                         buf_discard(buf_chain);
1401                 }
1402                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1403                                                  TIPC_ERR_NO_NODE);
1404         }
1405
1406         /* Append whole chain to send queue: */
1407
1408         buf = buf_chain;
1409         l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1410         if (!l_ptr->next_out)
1411                 l_ptr->next_out = buf_chain;
1412         l_ptr->stats.sent_fragmented++;
1413         while (buf) {
1414                 struct sk_buff *next = buf->next;
1415                 struct tipc_msg *msg = buf_msg(buf);
1416
1417                 l_ptr->stats.sent_fragments++;
1418                 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1419                 link_add_to_outqueue(l_ptr, buf, msg);
1420                 msg_dbg(msg, ">ADD>");
1421                 buf = next;
1422         }
1423
1424         /* Send it, if possible: */
1425
1426         tipc_link_push_queue(l_ptr);
1427         tipc_node_unlock(node);
1428         return dsz;
1429 }
1430
1431 /*
1432  * tipc_link_push_packet: Push one unsent packet to the media
1433  */
1434 u32 tipc_link_push_packet(struct link *l_ptr)
1435 {
1436         struct sk_buff *buf = l_ptr->first_out;
1437         u32 r_q_size = l_ptr->retransm_queue_size;
1438         u32 r_q_head = l_ptr->retransm_queue_head;
1439
1440         /* Step to position where retransmission failed, if any,    */
1441         /* consider that buffers may have been released in meantime */
1442
1443         if (r_q_size && buf) {
1444                 u32 last = lesser(mod(r_q_head + r_q_size),
1445                                   link_last_sent(l_ptr));
1446                 u32 first = msg_seqno(buf_msg(buf));
1447
1448                 while (buf && less(first, r_q_head)) {
1449                         first = mod(first + 1);
1450                         buf = buf->next;
1451                 }
1452                 l_ptr->retransm_queue_head = r_q_head = first;
1453                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1454         }
1455
1456         /* Continue retransmission now, if there is anything: */
1457
1458         if (r_q_size && buf) {
1459                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1460                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1461                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1462                         msg_dbg(buf_msg(buf), ">DEF-RETR>");
1463                         l_ptr->retransm_queue_head = mod(++r_q_head);
1464                         l_ptr->retransm_queue_size = --r_q_size;
1465                         l_ptr->stats.retransmitted++;
1466                         return 0;
1467                 } else {
1468                         l_ptr->stats.bearer_congs++;
1469                         msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1470                         return PUSH_FAILED;
1471                 }
1472         }
1473
1474         /* Send deferred protocol message, if any: */
1475
1476         buf = l_ptr->proto_msg_queue;
1477         if (buf) {
1478                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1479                 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1480                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1481                         msg_dbg(buf_msg(buf), ">DEF-PROT>");
1482                         l_ptr->unacked_window = 0;
1483                         buf_discard(buf);
1484                         l_ptr->proto_msg_queue = NULL;
1485                         return 0;
1486                 } else {
1487                         msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1488                         l_ptr->stats.bearer_congs++;
1489                         return PUSH_FAILED;
1490                 }
1491         }
1492
1493         /* Send one deferred data message, if send window not full: */
1494
1495         buf = l_ptr->next_out;
1496         if (buf) {
1497                 struct tipc_msg *msg = buf_msg(buf);
1498                 u32 next = msg_seqno(msg);
1499                 u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1500
1501                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1502                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1503                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1504                         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1505                                 if (msg_user(msg) == MSG_BUNDLER)
1506                                         msg_set_type(msg, CLOSED_MSG);
1507                                 msg_dbg(msg, ">PUSH-DATA>");
1508                                 l_ptr->next_out = buf->next;
1509                                 return 0;
1510                         } else {
1511                                 msg_dbg(msg, "|PUSH-DATA|");
1512                                 l_ptr->stats.bearer_congs++;
1513                                 return PUSH_FAILED;
1514                         }
1515                 }
1516         }
1517         return PUSH_FINISHED;
1518 }
1519
1520 /*
1521  * push_queue(): push out the unsent messages of a link where
1522  *               congestion has abated. Node is locked
1523  */
1524 void tipc_link_push_queue(struct link *l_ptr)
1525 {
1526         u32 res;
1527
1528         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1529                 return;
1530
1531         do {
1532                 res = tipc_link_push_packet(l_ptr);
1533         } while (!res);
1534
1535         if (res == PUSH_FAILED)
1536                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1537 }
1538
1539 static void link_reset_all(unsigned long addr)
1540 {
1541         struct tipc_node *n_ptr;
1542         char addr_string[16];
1543         u32 i;
1544
1545         read_lock_bh(&tipc_net_lock);
1546         n_ptr = tipc_node_find((u32)addr);
1547         if (!n_ptr) {
1548                 read_unlock_bh(&tipc_net_lock);
1549                 return; /* node no longer exists */
1550         }
1551
1552         tipc_node_lock(n_ptr);
1553
1554         warn("Resetting all links to %s\n",
1555              tipc_addr_string_fill(addr_string, n_ptr->addr));
1556
1557         for (i = 0; i < MAX_BEARERS; i++) {
1558                 if (n_ptr->links[i]) {
1559                         link_print(n_ptr->links[i], TIPC_OUTPUT,
1560                                    "Resetting link\n");
1561                         tipc_link_reset(n_ptr->links[i]);
1562                 }
1563         }
1564
1565         tipc_node_unlock(n_ptr);
1566         read_unlock_bh(&tipc_net_lock);
1567 }
1568
1569 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1570 {
1571         struct tipc_msg *msg = buf_msg(buf);
1572
1573         warn("Retransmission failure on link <%s>\n", l_ptr->name);
1574         tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");
1575
1576         if (l_ptr->addr) {
1577
1578                 /* Handle failure on standard link */
1579
1580                 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1581                 tipc_link_reset(l_ptr);
1582
1583         } else {
1584
1585                 /* Handle failure on broadcast link */
1586
1587                 struct tipc_node *n_ptr;
1588                 char addr_string[16];
1589
1590                 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1591                 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1592                                      (unsigned long) TIPC_SKB_CB(buf)->handle);
1593
1594                 n_ptr = l_ptr->owner->next;
1595                 tipc_node_lock(n_ptr);
1596
1597                 tipc_addr_string_fill(addr_string, n_ptr->addr);
1598                 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1599                 tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1600                 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1601                 tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1602                 tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1603                 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1604                 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1605
1606                 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1607
1608                 tipc_node_unlock(n_ptr);
1609
1610                 l_ptr->stale_count = 0;
1611         }
1612 }
1613
1614 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1615                           u32 retransmits)
1616 {
1617         struct tipc_msg *msg;
1618
1619         if (!buf)
1620                 return;
1621
1622         msg = buf_msg(buf);
1623
1624         dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1625
1626         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1627                 if (l_ptr->retransm_queue_size == 0) {
1628                         msg_dbg(msg, ">NO_RETR->BCONG>");
1629                         dbg_print_link(l_ptr, "   ");
1630                         l_ptr->retransm_queue_head = msg_seqno(msg);
1631                         l_ptr->retransm_queue_size = retransmits;
1632                 } else {
1633                         err("Unexpected retransmit on link %s (qsize=%d)\n",
1634                             l_ptr->name, l_ptr->retransm_queue_size);
1635                 }
1636                 return;
1637         } else {
1638                 /* Detect repeated retransmit failures on uncongested bearer */
1639
1640                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1641                         if (++l_ptr->stale_count > 100) {
1642                                 link_retransmit_failure(l_ptr, buf);
1643                                 return;
1644                         }
1645                 } else {
1646                         l_ptr->last_retransmitted = msg_seqno(msg);
1647                         l_ptr->stale_count = 1;
1648                 }
1649         }
1650
1651         while (retransmits && (buf != l_ptr->next_out) && buf) {
1652                 msg = buf_msg(buf);
1653                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1654                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1655                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1656                         msg_dbg(buf_msg(buf), ">RETR>");
1657                         buf = buf->next;
1658                         retransmits--;
1659                         l_ptr->stats.retransmitted++;
1660                 } else {
1661                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1662                         l_ptr->stats.bearer_congs++;
1663                         l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1664                         l_ptr->retransm_queue_size = retransmits;
1665                         return;
1666                 }
1667         }
1668
1669         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1670 }
1671
1672 /**
1673  * link_insert_deferred_queue - insert deferred messages back into receive chain
1674  */
1675
1676 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1677                                                   struct sk_buff *buf)
1678 {
1679         u32 seq_no;
1680
1681         if (l_ptr->oldest_deferred_in == NULL)
1682                 return buf;
1683
1684         seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1685         if (seq_no == mod(l_ptr->next_in_no)) {
1686                 l_ptr->newest_deferred_in->next = buf;
1687                 buf = l_ptr->oldest_deferred_in;
1688                 l_ptr->oldest_deferred_in = NULL;
1689                 l_ptr->deferred_inqueue_sz = 0;
1690         }
1691         return buf;
1692 }
1693
1694 /**
1695  * link_recv_buf_validate - validate basic format of received message
1696  *
1697  * This routine ensures a TIPC message has an acceptable header, and at least
1698  * as much data as the header indicates it should.  The routine also ensures
1699  * that the entire message header is stored in the main fragment of the message
1700  * buffer, to simplify future access to message header fields.
1701  *
1702  * Note: Having extra info present in the message header or data areas is OK.
1703  * TIPC will ignore the excess, under the assumption that it is optional info
1704  * introduced by a later release of the protocol.
1705  */
1706
1707 static int link_recv_buf_validate(struct sk_buff *buf)
1708 {
1709         static u32 min_data_hdr_size[8] = {
1710                 SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1711                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1712                 };
1713
1714         struct tipc_msg *msg;
1715         u32 tipc_hdr[2];
1716         u32 size;
1717         u32 hdr_size;
1718         u32 min_hdr_size;
1719
1720         if (unlikely(buf->len < MIN_H_SIZE))
1721                 return 0;
1722
1723         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1724         if (msg == NULL)
1725                 return 0;
1726
1727         if (unlikely(msg_version(msg) != TIPC_VERSION))
1728                 return 0;
1729
1730         size = msg_size(msg);
1731         hdr_size = msg_hdr_sz(msg);
1732         min_hdr_size = msg_isdata(msg) ?
1733                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1734
1735         if (unlikely((hdr_size < min_hdr_size) ||
1736                      (size < hdr_size) ||
1737                      (buf->len < size) ||
1738                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1739                 return 0;
1740
1741         return pskb_may_pull(buf, hdr_size);
1742 }
1743
1744 /**
1745  * tipc_recv_msg - process TIPC messages arriving from off-node
1746  * @head: pointer to message buffer chain
1747  * @tb_ptr: pointer to bearer message arrived on
1748  *
1749  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1750  * structure (i.e. cannot be NULL), but bearer can be inactive.
1751  */
1752
1753 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1754 {
1755         read_lock_bh(&tipc_net_lock);
1756         while (head) {
1757                 struct bearer *b_ptr = (struct bearer *)tb_ptr;
1758                 struct tipc_node *n_ptr;
1759                 struct link *l_ptr;
1760                 struct sk_buff *crs;
1761                 struct sk_buff *buf = head;
1762                 struct tipc_msg *msg;
1763                 u32 seq_no;
1764                 u32 ackd;
1765                 u32 released = 0;
1766                 int type;
1767
1768                 head = head->next;
1769
1770                 /* Ensure bearer is still enabled */
1771
1772                 if (unlikely(!b_ptr->active))
1773                         goto cont;
1774
1775                 /* Ensure message is well-formed */
1776
1777                 if (unlikely(!link_recv_buf_validate(buf)))
1778                         goto cont;
1779
1780                 /* Ensure message data is a single contiguous unit */
1781
1782                 if (unlikely(buf_linearize(buf))) {
1783                         goto cont;
1784                 }
1785
1786                 /* Handle arrival of a non-unicast link message */
1787
1788                 msg = buf_msg(buf);
1789
1790                 if (unlikely(msg_non_seq(msg))) {
1791                         if (msg_user(msg) ==  LINK_CONFIG)
1792                                 tipc_disc_recv_msg(buf, b_ptr);
1793                         else
1794                                 tipc_bclink_recv_pkt(buf);
1795                         continue;
1796                 }
1797
1798                 if (unlikely(!msg_short(msg) &&
1799                              (msg_destnode(msg) != tipc_own_addr)))
1800                         goto cont;
1801
1802                 /* Discard non-routeable messages destined for another node */
1803
1804                 if (unlikely(!msg_isdata(msg) &&
1805                              (msg_destnode(msg) != tipc_own_addr))) {
1806                         if ((msg_user(msg) != CONN_MANAGER) &&
1807                             (msg_user(msg) != MSG_FRAGMENTER))
1808                                 goto cont;
1809                 }
1810
1811                 /* Locate neighboring node that sent message */
1812
1813                 n_ptr = tipc_node_find(msg_prevnode(msg));
1814                 if (unlikely(!n_ptr))
1815                         goto cont;
1816                 tipc_node_lock(n_ptr);
1817
1818                 /* Don't talk to neighbor during cleanup after last session */
1819
1820                 if (n_ptr->cleanup_required) {
1821                         tipc_node_unlock(n_ptr);
1822                         goto cont;
1823                 }
1824
1825                 /* Locate unicast link endpoint that should handle message */
1826
1827                 l_ptr = n_ptr->links[b_ptr->identity];
1828                 if (unlikely(!l_ptr)) {
1829                         tipc_node_unlock(n_ptr);
1830                         goto cont;
1831                 }
1832
1833                 /* Validate message sequence number info */
1834
1835                 seq_no = msg_seqno(msg);
1836                 ackd = msg_ack(msg);
1837
1838                 /* Release acked messages */
1839
1840                 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1841                         if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1842                                 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1843                 }
1844
1845                 crs = l_ptr->first_out;
1846                 while ((crs != l_ptr->next_out) &&
1847                        less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1848                         struct sk_buff *next = crs->next;
1849
1850                         buf_discard(crs);
1851                         crs = next;
1852                         released++;
1853                 }
1854                 if (released) {
1855                         l_ptr->first_out = crs;
1856                         l_ptr->out_queue_size -= released;
1857                 }
1858
1859                 /* Try sending any messages link endpoint has pending */
1860
1861                 if (unlikely(l_ptr->next_out))
1862                         tipc_link_push_queue(l_ptr);
1863                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1864                         tipc_link_wakeup_ports(l_ptr, 0);
1865                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1866                         l_ptr->stats.sent_acks++;
1867                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1868                 }
1869
1870                 /* Now (finally!) process the incoming message */
1871
1872 protocol_check:
1873                 if (likely(link_working_working(l_ptr))) {
1874                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1875                                 l_ptr->next_in_no++;
1876                                 if (unlikely(l_ptr->oldest_deferred_in))
1877                                         head = link_insert_deferred_queue(l_ptr,
1878                                                                           head);
1879                                 if (likely(msg_is_dest(msg, tipc_own_addr))) {
1880 deliver:
1881                                         if (likely(msg_isdata(msg))) {
1882                                                 tipc_node_unlock(n_ptr);
1883                                                 tipc_port_recv_msg(buf);
1884                                                 continue;
1885                                         }
1886                                         switch (msg_user(msg)) {
1887                                         case MSG_BUNDLER:
1888                                                 l_ptr->stats.recv_bundles++;
1889                                                 l_ptr->stats.recv_bundled +=
1890                                                         msg_msgcnt(msg);
1891                                                 tipc_node_unlock(n_ptr);
1892                                                 tipc_link_recv_bundle(buf);
1893                                                 continue;
1894                                         case ROUTE_DISTRIBUTOR:
1895                                                 tipc_node_unlock(n_ptr);
1896                                                 tipc_cltr_recv_routing_table(buf);
1897                                                 continue;
1898                                         case NAME_DISTRIBUTOR:
1899                                                 tipc_node_unlock(n_ptr);
1900                                                 tipc_named_recv(buf);
1901                                                 continue;
1902                                         case CONN_MANAGER:
1903                                                 tipc_node_unlock(n_ptr);
1904                                                 tipc_port_recv_proto_msg(buf);
1905                                                 continue;
1906                                         case MSG_FRAGMENTER:
1907                                                 l_ptr->stats.recv_fragments++;
1908                                                 if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1909                                                                             &buf, &msg)) {
1910                                                         l_ptr->stats.recv_fragmented++;
1911                                                         goto deliver;
1912                                                 }
1913                                                 break;
1914                                         case CHANGEOVER_PROTOCOL:
1915                                                 type = msg_type(msg);
1916                                                 if (link_recv_changeover_msg(&l_ptr, &buf)) {
1917                                                         msg = buf_msg(buf);
1918                                                         seq_no = msg_seqno(msg);
1919                                                         if (type == ORIGINAL_MSG)
1920                                                                 goto deliver;
1921                                                         goto protocol_check;
1922                                                 }
1923                                                 break;
1924                                         }
1925                                 }
1926                                 tipc_node_unlock(n_ptr);
1927                                 tipc_net_route_msg(buf);
1928                                 continue;
1929                         }
1930                         link_handle_out_of_seq_msg(l_ptr, buf);
1931                         head = link_insert_deferred_queue(l_ptr, head);
1932                         tipc_node_unlock(n_ptr);
1933                         continue;
1934                 }
1935
1936                 if (msg_user(msg) == LINK_PROTOCOL) {
1937                         link_recv_proto_msg(l_ptr, buf);
1938                         head = link_insert_deferred_queue(l_ptr, head);
1939                         tipc_node_unlock(n_ptr);
1940                         continue;
1941                 }
1942                 msg_dbg(msg,"NSEQ<REC<");
1943                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1944
1945                 if (link_working_working(l_ptr)) {
1946                         /* Re-insert in front of queue */
1947                         msg_dbg(msg,"RECV-REINS:");
1948                         buf->next = head;
1949                         head = buf;
1950                         tipc_node_unlock(n_ptr);
1951                         continue;
1952                 }
1953                 tipc_node_unlock(n_ptr);
1954 cont:
1955                 buf_discard(buf);
1956         }
1957         read_unlock_bh(&tipc_net_lock);
1958 }
1959
1960 /*
1961  * link_defer_buf(): Sort a received out-of-sequence packet
1962  *                   into the deferred reception queue.
1963  * Returns the increase of the queue length,i.e. 0 or 1
1964  */
1965
1966 u32 tipc_link_defer_pkt(struct sk_buff **head,
1967                         struct sk_buff **tail,
1968                         struct sk_buff *buf)
1969 {
1970         struct sk_buff *prev = NULL;
1971         struct sk_buff *crs = *head;
1972         u32 seq_no = msg_seqno(buf_msg(buf));
1973
1974         buf->next = NULL;
1975
1976         /* Empty queue ? */
1977         if (*head == NULL) {
1978                 *head = *tail = buf;
1979                 return 1;
1980         }
1981
1982         /* Last ? */
1983         if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1984                 (*tail)->next = buf;
1985                 *tail = buf;
1986                 return 1;
1987         }
1988
1989         /* Scan through queue and sort it in */
1990         do {
1991                 struct tipc_msg *msg = buf_msg(crs);
1992
1993                 if (less(seq_no, msg_seqno(msg))) {
1994                         buf->next = crs;
1995                         if (prev)
1996                                 prev->next = buf;
1997                         else
1998                                 *head = buf;
1999                         return 1;
2000                 }
2001                 if (seq_no == msg_seqno(msg)) {
2002                         break;
2003                 }
2004                 prev = crs;
2005                 crs = crs->next;
2006         }
2007         while (crs);
2008
2009         /* Message is a duplicate of an existing message */
2010
2011         buf_discard(buf);
2012         return 0;
2013 }
2014
2015 /**
2016  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2017  */
2018
2019 static void link_handle_out_of_seq_msg(struct link *l_ptr,
2020                                        struct sk_buff *buf)
2021 {
2022         u32 seq_no = msg_seqno(buf_msg(buf));
2023
2024         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2025                 link_recv_proto_msg(l_ptr, buf);
2026                 return;
2027         }
2028
2029         dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2030             seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2031
2032         /* Record OOS packet arrival (force mismatch on next timeout) */
2033
2034         l_ptr->checkpoint--;
2035
2036         /*
2037          * Discard packet if a duplicate; otherwise add it to deferred queue
2038          * and notify peer of gap as per protocol specification
2039          */
2040
2041         if (less(seq_no, mod(l_ptr->next_in_no))) {
2042                 l_ptr->stats.duplicates++;
2043                 buf_discard(buf);
2044                 return;
2045         }
2046
2047         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2048                                 &l_ptr->newest_deferred_in, buf)) {
2049                 l_ptr->deferred_inqueue_sz++;
2050                 l_ptr->stats.deferred_recv++;
2051                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2052                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2053         } else
2054                 l_ptr->stats.duplicates++;
2055 }
2056
2057 /*
2058  * Send protocol message to the other endpoint.
2059  */
2060 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2061                               u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2062 {
2063         struct sk_buff *buf = NULL;
2064         struct tipc_msg *msg = l_ptr->pmsg;
2065         u32 msg_size = sizeof(l_ptr->proto_msg);
2066
2067         if (link_blocked(l_ptr))
2068                 return;
2069         msg_set_type(msg, msg_typ);
2070         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2071         msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2072         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2073
2074         if (msg_typ == STATE_MSG) {
2075                 u32 next_sent = mod(l_ptr->next_out_no);
2076
2077                 if (!tipc_link_is_up(l_ptr))
2078                         return;
2079                 if (l_ptr->next_out)
2080                         next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2081                 msg_set_next_sent(msg, next_sent);
2082                 if (l_ptr->oldest_deferred_in) {
2083                         u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2084                         gap = mod(rec - mod(l_ptr->next_in_no));
2085                 }
2086                 msg_set_seq_gap(msg, gap);
2087                 if (gap)
2088                         l_ptr->stats.sent_nacks++;
2089                 msg_set_link_tolerance(msg, tolerance);
2090                 msg_set_linkprio(msg, priority);
2091                 msg_set_max_pkt(msg, ack_mtu);
2092                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2093                 msg_set_probe(msg, probe_msg != 0);
2094                 if (probe_msg) {
2095                         u32 mtu = l_ptr->max_pkt;
2096
2097                         if ((mtu < l_ptr->max_pkt_target) &&
2098                             link_working_working(l_ptr) &&
2099                             l_ptr->fsm_msg_cnt) {
2100                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2101                                 if (l_ptr->max_pkt_probes == 10) {
2102                                         l_ptr->max_pkt_target = (msg_size - 4);
2103                                         l_ptr->max_pkt_probes = 0;
2104                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2105                                 }
2106                                 l_ptr->max_pkt_probes++;
2107                         }
2108
2109                         l_ptr->stats.sent_probes++;
2110                 }
2111                 l_ptr->stats.sent_states++;
2112         } else {                /* RESET_MSG or ACTIVATE_MSG */
2113                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2114                 msg_set_seq_gap(msg, 0);
2115                 msg_set_next_sent(msg, 1);
2116                 msg_set_link_tolerance(msg, l_ptr->tolerance);
2117                 msg_set_linkprio(msg, l_ptr->priority);
2118                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2119         }
2120
2121         if (tipc_node_has_redundant_links(l_ptr->owner)) {
2122                 msg_set_redundant_link(msg);
2123         } else {
2124                 msg_clear_redundant_link(msg);
2125         }
2126         msg_set_linkprio(msg, l_ptr->priority);
2127
2128         /* Ensure sequence number will not fit : */
2129
2130         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2131
2132         /* Congestion? */
2133
2134         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2135                 if (!l_ptr->proto_msg_queue) {
2136                         l_ptr->proto_msg_queue =
2137                                 tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2138                 }
2139                 buf = l_ptr->proto_msg_queue;
2140                 if (!buf)
2141                         return;
2142                 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2143                 return;
2144         }
2145         msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2146
2147         /* Message can be sent */
2148
2149         msg_dbg(msg, ">>");
2150
2151         buf = tipc_buf_acquire(msg_size);
2152         if (!buf)
2153                 return;
2154
2155         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2156         msg_set_size(buf_msg(buf), msg_size);
2157
2158         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2159                 l_ptr->unacked_window = 0;
2160                 buf_discard(buf);
2161                 return;
2162         }
2163
2164         /* New congestion */
2165         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2166         l_ptr->proto_msg_queue = buf;
2167         l_ptr->stats.bearer_congs++;
2168 }
2169
2170 /*
2171  * Receive protocol message :
2172  * Note that network plane id propagates through the network, and may
2173  * change at any time. The node with lowest address rules
2174  */
2175
2176 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2177 {
2178         u32 rec_gap = 0;
2179         u32 max_pkt_info;
2180         u32 max_pkt_ack;
2181         u32 msg_tol;
2182         struct tipc_msg *msg = buf_msg(buf);
2183
2184         dbg("AT(%u):", jiffies_to_msecs(jiffies));
2185         msg_dbg(msg, "<<");
2186         if (link_blocked(l_ptr))
2187                 goto exit;
2188
2189         /* record unnumbered packet arrival (force mismatch on next timeout) */
2190
2191         l_ptr->checkpoint--;
2192
2193         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2194                 if (tipc_own_addr > msg_prevnode(msg))
2195                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2196
2197         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2198
2199         switch (msg_type(msg)) {
2200
2201         case RESET_MSG:
2202                 if (!link_working_unknown(l_ptr) &&
2203                     (l_ptr->peer_session != INVALID_SESSION)) {
2204                         if (msg_session(msg) == l_ptr->peer_session) {
2205                                 dbg("Duplicate RESET: %u<->%u\n",
2206                                     msg_session(msg), l_ptr->peer_session);
2207                                 break; /* duplicate: ignore */
2208                         }
2209                 }
2210                 /* fall thru' */
2211         case ACTIVATE_MSG:
2212                 /* Update link settings according other endpoint's values */
2213
2214                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2215
2216                 if ((msg_tol = msg_link_tolerance(msg)) &&
2217                     (msg_tol > l_ptr->tolerance))
2218                         link_set_supervision_props(l_ptr, msg_tol);
2219
2220                 if (msg_linkprio(msg) > l_ptr->priority)
2221                         l_ptr->priority = msg_linkprio(msg);
2222
2223                 max_pkt_info = msg_max_pkt(msg);
2224                 if (max_pkt_info) {
2225                         if (max_pkt_info < l_ptr->max_pkt_target)
2226                                 l_ptr->max_pkt_target = max_pkt_info;
2227                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2228                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2229                 } else {
2230                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2231                 }
2232                 l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2233
2234                 link_state_event(l_ptr, msg_type(msg));
2235
2236                 l_ptr->peer_session = msg_session(msg);
2237                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2238
2239                 /* Synchronize broadcast sequence numbers */
2240                 if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2241                         l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2242                 }
2243                 break;
2244         case STATE_MSG:
2245
2246                 if ((msg_tol = msg_link_tolerance(msg)))
2247                         link_set_supervision_props(l_ptr, msg_tol);
2248
2249                 if (msg_linkprio(msg) &&
2250                     (msg_linkprio(msg) != l_ptr->priority)) {
2251                         warn("Resetting link <%s>, priority change %u->%u\n",
2252                              l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2253                         l_ptr->priority = msg_linkprio(msg);
2254                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
2255                         break;
2256                 }
2257                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2258                 l_ptr->stats.recv_states++;
2259                 if (link_reset_unknown(l_ptr))
2260                         break;
2261
2262                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2263                         rec_gap = mod(msg_next_sent(msg) -
2264                                       mod(l_ptr->next_in_no));
2265                 }
2266
2267                 max_pkt_ack = msg_max_pkt(msg);
2268                 if (max_pkt_ack > l_ptr->max_pkt) {
2269                         dbg("Link <%s> updated MTU %u -> %u\n",
2270                             l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2271                         l_ptr->max_pkt = max_pkt_ack;
2272                         l_ptr->max_pkt_probes = 0;
2273                 }
2274
2275                 max_pkt_ack = 0;
2276                 if (msg_probe(msg)) {
2277                         l_ptr->stats.recv_probes++;
2278                         if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2279                                 max_pkt_ack = msg_size(msg);
2280                         }
2281                 }
2282
2283                 /* Protocol message before retransmits, reduce loss risk */
2284
2285                 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2286
2287                 if (rec_gap || (msg_probe(msg))) {
2288                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2289                                                  0, rec_gap, 0, 0, max_pkt_ack);
2290                 }
2291                 if (msg_seq_gap(msg)) {
2292                         msg_dbg(msg, "With Gap:");
2293                         l_ptr->stats.recv_nacks++;
2294                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
2295                                              msg_seq_gap(msg));
2296                 }
2297                 break;
2298         default:
2299                 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2300         }
2301 exit:
2302         buf_discard(buf);
2303 }
2304
2305
2306 /*
2307  * tipc_link_tunnel(): Send one message via a link belonging to
2308  * another bearer. Owner node is locked.
2309  */
2310 static void tipc_link_tunnel(struct link *l_ptr,
2311                              struct tipc_msg *tunnel_hdr,
2312                              struct tipc_msg  *msg,
2313                              u32 selector)
2314 {
2315         struct link *tunnel;
2316         struct sk_buff *buf;
2317         u32 length = msg_size(msg);
2318
2319         tunnel = l_ptr->owner->active_links[selector & 1];
2320         if (!tipc_link_is_up(tunnel)) {
2321                 warn("Link changeover error, "
2322                      "tunnel link no longer available\n");
2323                 return;
2324         }
2325         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2326         buf = tipc_buf_acquire(length + INT_H_SIZE);
2327         if (!buf) {
2328                 warn("Link changeover error, "
2329                      "unable to send tunnel msg\n");
2330                 return;
2331         }
2332         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2333         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2334         dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2335         msg_dbg(buf_msg(buf), ">SEND>");
2336         tipc_link_send_buf(tunnel, buf);
2337 }
2338
2339
2340
2341 /*
2342  * changeover(): Send whole message queue via the remaining link
2343  *               Owner node is locked.
2344  */
2345
2346 void tipc_link_changeover(struct link *l_ptr)
2347 {
2348         u32 msgcount = l_ptr->out_queue_size;
2349         struct sk_buff *crs = l_ptr->first_out;
2350         struct link *tunnel = l_ptr->owner->active_links[0];
2351         struct tipc_msg tunnel_hdr;
2352         int split_bundles;
2353
2354         if (!tunnel)
2355                 return;
2356
2357         if (!l_ptr->owner->permit_changeover) {
2358                 warn("Link changeover error, "
2359                      "peer did not permit changeover\n");
2360                 return;
2361         }
2362
2363         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2364                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2365         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2366         msg_set_msgcnt(&tunnel_hdr, msgcount);
2367         dbg("Link changeover requires %u tunnel messages\n", msgcount);
2368
2369         if (!l_ptr->first_out) {
2370                 struct sk_buff *buf;
2371
2372                 buf = tipc_buf_acquire(INT_H_SIZE);
2373                 if (buf) {
2374                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2375                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2376                         dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2377                             tunnel->b_ptr->net_plane);
2378                         msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2379                         tipc_link_send_buf(tunnel, buf);
2380                 } else {
2381                         warn("Link changeover error, "
2382                              "unable to send changeover msg\n");
2383                 }
2384                 return;
2385         }
2386
2387         split_bundles = (l_ptr->owner->active_links[0] !=
2388                          l_ptr->owner->active_links[1]);
2389
2390         while (crs) {
2391                 struct tipc_msg *msg = buf_msg(crs);
2392
2393                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2394                         struct tipc_msg *m = msg_get_wrapped(msg);
2395                         unchar* pos = (unchar*)m;
2396
2397                         msgcount = msg_msgcnt(msg);
2398                         while (msgcount--) {
2399                                 msg_set_seqno(m,msg_seqno(msg));
2400                                 tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2401                                                  msg_link_selector(m));
2402                                 pos += align(msg_size(m));
2403                                 m = (struct tipc_msg *)pos;
2404                         }
2405                 } else {
2406                         tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2407                                          msg_link_selector(msg));
2408                 }
2409                 crs = crs->next;
2410         }
2411 }
2412
2413 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2414 {
2415         struct sk_buff *iter;
2416         struct tipc_msg tunnel_hdr;
2417
2418         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2419                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2420         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2421         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2422         iter = l_ptr->first_out;
2423         while (iter) {
2424                 struct sk_buff *outbuf;
2425                 struct tipc_msg *msg = buf_msg(iter);
2426                 u32 length = msg_size(msg);
2427
2428                 if (msg_user(msg) == MSG_BUNDLER)
2429                         msg_set_type(msg, CLOSED_MSG);
2430                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2431                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2432                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2433                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2434                 if (outbuf == NULL) {
2435                         warn("Link changeover error, "
2436                              "unable to send duplicate msg\n");
2437                         return;
2438                 }
2439                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2440                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2441                                                length);
2442                 dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2443                     tunnel->b_ptr->net_plane);
2444                 msg_dbg(buf_msg(outbuf), ">SEND>");
2445                 tipc_link_send_buf(tunnel, outbuf);
2446                 if (!tipc_link_is_up(l_ptr))
2447                         return;
2448                 iter = iter->next;
2449         }
2450 }
2451
2452
2453
2454 /**
2455  * buf_extract - extracts embedded TIPC message from another message
2456  * @skb: encapsulating message buffer
2457  * @from_pos: offset to extract from
2458  *
2459  * Returns a new message buffer containing an embedded message.  The
2460  * encapsulating message itself is left unchanged.
2461  */
2462
2463 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2464 {
2465         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2466         u32 size = msg_size(msg);
2467         struct sk_buff *eb;
2468
2469         eb = tipc_buf_acquire(size);
2470         if (eb)
2471                 skb_copy_to_linear_data(eb, msg, size);
2472         return eb;
2473 }
2474
2475 /*
2476  *  link_recv_changeover_msg(): Receive tunneled packet sent
2477  *  via other link. Node is locked. Return extracted buffer.
2478  */
2479
2480 static int link_recv_changeover_msg(struct link **l_ptr,
2481                                     struct sk_buff **buf)
2482 {
2483         struct sk_buff *tunnel_buf = *buf;
2484         struct link *dest_link;
2485         struct tipc_msg *msg;
2486         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2487         u32 msg_typ = msg_type(tunnel_msg);
2488         u32 msg_count = msg_msgcnt(tunnel_msg);
2489
2490         dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2491         if (!dest_link) {
2492                 msg_dbg(tunnel_msg, "NOLINK/<REC<");
2493                 goto exit;
2494         }
2495         if (dest_link == *l_ptr) {
2496                 err("Unexpected changeover message on link <%s>\n",
2497                     (*l_ptr)->name);
2498                 goto exit;
2499         }
2500         dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2501             (*l_ptr)->b_ptr->net_plane);
2502         *l_ptr = dest_link;
2503         msg = msg_get_wrapped(tunnel_msg);
2504
2505         if (msg_typ == DUPLICATE_MSG) {
2506                 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2507                         msg_dbg(tunnel_msg, "DROP/<REC<");
2508                         goto exit;
2509                 }
2510                 *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2511                 if (*buf == NULL) {
2512                         warn("Link changeover error, duplicate msg dropped\n");
2513                         goto exit;
2514                 }
2515                 msg_dbg(tunnel_msg, "TNL<REC<");
2516                 buf_discard(tunnel_buf);
2517                 return 1;
2518         }
2519
2520         /* First original message ?: */
2521
2522         if (tipc_link_is_up(dest_link)) {
2523                 msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2524                 info("Resetting link <%s>, changeover initiated by peer\n",
2525                      dest_link->name);
2526                 tipc_link_reset(dest_link);
2527                 dest_link->exp_msg_count = msg_count;
2528                 dbg("Expecting %u tunnelled messages\n", msg_count);
2529                 if (!msg_count)
2530                         goto exit;
2531         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2532                 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2533                 dest_link->exp_msg_count = msg_count;
2534                 dbg("Expecting %u tunnelled messages\n", msg_count);
2535                 if (!msg_count)
2536                         goto exit;
2537         }
2538
2539         /* Receive original message */
2540
2541         if (dest_link->exp_msg_count == 0) {
2542                 warn("Link switchover error, "
2543                      "got too many tunnelled messages\n");
2544                 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2545                 dbg_print_link(dest_link, "LINK:");
2546                 goto exit;
2547         }
2548         dest_link->exp_msg_count--;
2549         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2550                 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2551                 goto exit;
2552         } else {
2553                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2554                 if (*buf != NULL) {
2555                         msg_dbg(tunnel_msg, "TNL<REC<");
2556                         buf_discard(tunnel_buf);
2557                         return 1;
2558                 } else {
2559                         warn("Link changeover error, original msg dropped\n");
2560                 }
2561         }
2562 exit:
2563         *buf = NULL;
2564         buf_discard(tunnel_buf);
2565         return 0;
2566 }
2567
2568 /*
2569  *  Bundler functionality:
2570  */
2571 void tipc_link_recv_bundle(struct sk_buff *buf)
2572 {
2573         u32 msgcount = msg_msgcnt(buf_msg(buf));
2574         u32 pos = INT_H_SIZE;
2575         struct sk_buff *obuf;
2576
2577         msg_dbg(buf_msg(buf), "<BNDL<: ");
2578         while (msgcount--) {
2579                 obuf = buf_extract(buf, pos);
2580                 if (obuf == NULL) {
2581                         warn("Link unable to unbundle message(s)\n");
2582                         break;
2583                 }
2584                 pos += align(msg_size(buf_msg(obuf)));
2585                 msg_dbg(buf_msg(obuf), "     /");
2586                 tipc_net_route_msg(obuf);
2587         }
2588         buf_discard(buf);
2589 }
2590
2591 /*
2592  *  Fragmentation/defragmentation:
2593  */
2594
2595
2596 /*
2597  * link_send_long_buf: Entry for buffers needing fragmentation.
2598  * The buffer is complete, inclusive total message length.
2599  * Returns user data length.
2600  */
2601 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2602 {
2603         struct tipc_msg *inmsg = buf_msg(buf);
2604         struct tipc_msg fragm_hdr;
2605         u32 insize = msg_size(inmsg);
2606         u32 dsz = msg_data_sz(inmsg);
2607         unchar *crs = buf->data;
2608         u32 rest = insize;
2609         u32 pack_sz = l_ptr->max_pkt;
2610         u32 fragm_sz = pack_sz - INT_H_SIZE;
2611         u32 fragm_no = 1;
2612         u32 destaddr;
2613
2614         if (msg_short(inmsg))
2615                 destaddr = l_ptr->addr;
2616         else
2617                 destaddr = msg_destnode(inmsg);
2618
2619         if (msg_routed(inmsg))
2620                 msg_set_prevnode(inmsg, tipc_own_addr);
2621
2622         /* Prepare reusable fragment header: */
2623
2624         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2625                  INT_H_SIZE, destaddr);
2626         msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2627         msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2628         msg_set_fragm_no(&fragm_hdr, fragm_no);
2629         l_ptr->stats.sent_fragmented++;
2630
2631         /* Chop up message: */
2632
2633         while (rest > 0) {
2634                 struct sk_buff *fragm;
2635
2636                 if (rest <= fragm_sz) {
2637                         fragm_sz = rest;
2638                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2639                 }
2640                 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2641                 if (fragm == NULL) {
2642                         warn("Link unable to fragment message\n");
2643                         dsz = -ENOMEM;
2644                         goto exit;
2645                 }
2646                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2647                 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2648                 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2649                                                fragm_sz);
2650                 /*  Send queued messages first, if any: */
2651
2652                 l_ptr->stats.sent_fragments++;
2653                 tipc_link_send_buf(l_ptr, fragm);
2654                 if (!tipc_link_is_up(l_ptr))
2655                         return dsz;
2656                 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2657                 rest -= fragm_sz;
2658                 crs += fragm_sz;
2659                 msg_set_type(&fragm_hdr, FRAGMENT);
2660         }
2661 exit:
2662         buf_discard(buf);
2663         return dsz;
2664 }
2665
2666 /*
2667  * A pending message being re-assembled must store certain values
2668  * to handle subsequent fragments correctly. The following functions
2669  * help storing these values in unused, available fields in the
2670  * pending message. This makes dynamic memory allocation unecessary.
2671  */
2672
2673 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2674 {
2675         msg_set_seqno(buf_msg(buf), seqno);
2676 }
2677
2678 static u32 get_fragm_size(struct sk_buff *buf)
2679 {
2680         return msg_ack(buf_msg(buf));
2681 }
2682
2683 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2684 {
2685         msg_set_ack(buf_msg(buf), sz);
2686 }
2687
2688 static u32 get_expected_frags(struct sk_buff *buf)
2689 {
2690         return msg_bcast_ack(buf_msg(buf));
2691 }
2692
2693 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2694 {
2695         msg_set_bcast_ack(buf_msg(buf), exp);
2696 }
2697
2698 static u32 get_timer_cnt(struct sk_buff *buf)
2699 {
2700         return msg_reroute_cnt(buf_msg(buf));
2701 }
2702
2703 static void incr_timer_cnt(struct sk_buff *buf)
2704 {
2705         msg_incr_reroute_cnt(buf_msg(buf));
2706 }
2707
2708 /*
2709  * tipc_link_recv_fragment(): Called with node lock on. Returns
2710  * the reassembled buffer if message is complete.
2711  */
2712 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2713                             struct tipc_msg **m)
2714 {
2715         struct sk_buff *prev = NULL;
2716         struct sk_buff *fbuf = *fb;
2717         struct tipc_msg *fragm = buf_msg(fbuf);
2718         struct sk_buff *pbuf = *pending;
2719         u32 long_msg_seq_no = msg_long_msgno(fragm);
2720
2721         *fb = NULL;
2722         msg_dbg(fragm,"FRG<REC<");
2723
2724         /* Is there an incomplete message waiting for this fragment? */
2725
2726         while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2727                         (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2728                 prev = pbuf;
2729                 pbuf = pbuf->next;
2730         }
2731
2732         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2733                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2734                 u32 msg_sz = msg_size(imsg);
2735                 u32 fragm_sz = msg_data_sz(fragm);
2736                 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2737                 u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2738                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2739                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2740                 if (msg_size(imsg) > max) {
2741                         msg_dbg(fragm,"<REC<Oversized: ");
2742                         buf_discard(fbuf);
2743                         return 0;
2744                 }
2745                 pbuf = tipc_buf_acquire(msg_size(imsg));
2746                 if (pbuf != NULL) {
2747                         pbuf->next = *pending;
2748                         *pending = pbuf;
2749                         skb_copy_to_linear_data(pbuf, imsg,
2750                                                 msg_data_sz(fragm));
2751                         /*  Prepare buffer for subsequent fragments. */
2752
2753                         set_long_msg_seqno(pbuf, long_msg_seq_no);
2754                         set_fragm_size(pbuf,fragm_sz);
2755                         set_expected_frags(pbuf,exp_fragm_cnt - 1);
2756                 } else {
2757                         warn("Link unable to reassemble fragmented message\n");
2758                 }
2759                 buf_discard(fbuf);
2760                 return 0;
2761         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2762                 u32 dsz = msg_data_sz(fragm);
2763                 u32 fsz = get_fragm_size(pbuf);
2764                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2765                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2766                 skb_copy_to_linear_data_offset(pbuf, crs,
2767                                                msg_data(fragm), dsz);
2768                 buf_discard(fbuf);
2769
2770                 /* Is message complete? */
2771
2772                 if (exp_frags == 0) {
2773                         if (prev)
2774                                 prev->next = pbuf->next;
2775                         else
2776                                 *pending = pbuf->next;
2777                         msg_reset_reroute_cnt(buf_msg(pbuf));
2778                         *fb = pbuf;
2779                         *m = buf_msg(pbuf);
2780                         return 1;
2781                 }
2782                 set_expected_frags(pbuf,exp_frags);
2783                 return 0;
2784         }
2785         dbg(" Discarding orphan fragment %x\n",fbuf);
2786         msg_dbg(fragm,"ORPHAN:");
2787         dbg("Pending long buffers:\n");
2788         dbg_print_buf_chain(*pending);
2789         buf_discard(fbuf);
2790         return 0;
2791 }
2792
2793 /**
2794  * link_check_defragm_bufs - flush stale incoming message fragments
2795  * @l_ptr: pointer to link
2796  */
2797
2798 static void link_check_defragm_bufs(struct link *l_ptr)
2799 {
2800         struct sk_buff *prev = NULL;
2801         struct sk_buff *next = NULL;
2802         struct sk_buff *buf = l_ptr->defragm_buf;
2803
2804         if (!buf)
2805                 return;
2806         if (!link_working_working(l_ptr))
2807                 return;
2808         while (buf) {
2809                 u32 cnt = get_timer_cnt(buf);
2810
2811                 next = buf->next;
2812                 if (cnt < 4) {
2813                         incr_timer_cnt(buf);
2814                         prev = buf;
2815                 } else {
2816                         dbg(" Discarding incomplete long buffer\n");
2817                         msg_dbg(buf_msg(buf), "LONG:");
2818                         dbg_print_link(l_ptr, "curr:");
2819                         dbg("Pending long buffers:\n");
2820                         dbg_print_buf_chain(l_ptr->defragm_buf);
2821                         if (prev)
2822                                 prev->next = buf->next;
2823                         else
2824                                 l_ptr->defragm_buf = buf->next;
2825                         buf_discard(buf);
2826                 }
2827                 buf = next;
2828         }
2829 }
2830
2831
2832
2833 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2834 {
2835         l_ptr->tolerance = tolerance;
2836         l_ptr->continuity_interval =
2837                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2838         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2839 }
2840
2841
2842 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2843 {
2844         /* Data messages from this node, inclusive FIRST_FRAGM */
2845         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2846         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2847         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2848         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2849         /* Transiting data messages,inclusive FIRST_FRAGM */
2850         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2851         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2852         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2853         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2854         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2855         l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2856         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2857         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2858         /* FRAGMENT and LAST_FRAGMENT packets */
2859         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2860 }
2861
2862 /**
2863  * link_find_link - locate link by name
2864  * @name - ptr to link name string
2865  * @node - ptr to area to be filled with ptr to associated node
2866  *
2867  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2868  * this also prevents link deletion.
2869  *
2870  * Returns pointer to link (or 0 if invalid link name).
2871  */
2872
2873 static struct link *link_find_link(const char *name, struct tipc_node **node)
2874 {
2875         struct link_name link_name_parts;
2876         struct bearer *b_ptr;
2877         struct link *l_ptr;
2878
2879         if (!link_name_validate(name, &link_name_parts))
2880                 return NULL;
2881
2882         b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2883         if (!b_ptr)
2884                 return NULL;
2885
2886         *node = tipc_node_find(link_name_parts.addr_peer);
2887         if (!*node)
2888                 return NULL;
2889
2890         l_ptr = (*node)->links[b_ptr->identity];
2891         if (!l_ptr || strcmp(l_ptr->name, name))
2892                 return NULL;
2893
2894         return l_ptr;
2895 }
2896
2897 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2898                                      u16 cmd)
2899 {
2900         struct tipc_link_config *args;
2901         u32 new_value;
2902         struct link *l_ptr;
2903         struct tipc_node *node;
2904         int res;
2905
2906         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2907                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2908
2909         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2910         new_value = ntohl(args->value);
2911
2912         if (!strcmp(args->name, tipc_bclink_name)) {
2913                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2914                     (tipc_bclink_set_queue_limits(new_value) == 0))
2915                         return tipc_cfg_reply_none();
2916                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2917                                                    " (cannot change setting on broadcast link)");
2918         }
2919
2920         read_lock_bh(&tipc_net_lock);
2921         l_ptr = link_find_link(args->name, &node);
2922         if (!l_ptr) {
2923                 read_unlock_bh(&tipc_net_lock);
2924                 return tipc_cfg_reply_error_string("link not found");
2925         }
2926
2927         tipc_node_lock(node);
2928         res = -EINVAL;
2929         switch (cmd) {
2930         case TIPC_CMD_SET_LINK_TOL:
2931                 if ((new_value >= TIPC_MIN_LINK_TOL) &&
2932                     (new_value <= TIPC_MAX_LINK_TOL)) {
2933                         link_set_supervision_props(l_ptr, new_value);
2934                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2935                                                  0, 0, new_value, 0, 0);
2936                         res = 0;
2937                 }
2938                 break;
2939         case TIPC_CMD_SET_LINK_PRI:
2940                 if ((new_value >= TIPC_MIN_LINK_PRI) &&
2941                     (new_value <= TIPC_MAX_LINK_PRI)) {
2942                         l_ptr->priority = new_value;
2943                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2944                                                  0, 0, 0, new_value, 0);
2945                         res = 0;
2946                 }
2947                 break;
2948         case TIPC_CMD_SET_LINK_WINDOW:
2949                 if ((new_value >= TIPC_MIN_LINK_WIN) &&
2950                     (new_value <= TIPC_MAX_LINK_WIN)) {
2951                         tipc_link_set_queue_limits(l_ptr, new_value);
2952                         res = 0;
2953                 }
2954                 break;
2955         }
2956         tipc_node_unlock(node);
2957
2958         read_unlock_bh(&tipc_net_lock);
2959         if (res)
2960                 return tipc_cfg_reply_error_string("cannot change link setting");
2961
2962         return tipc_cfg_reply_none();
2963 }
2964
2965 /**
2966  * link_reset_statistics - reset link statistics
2967  * @l_ptr: pointer to link
2968  */
2969
2970 static void link_reset_statistics(struct link *l_ptr)
2971 {
2972         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2973         l_ptr->stats.sent_info = l_ptr->next_out_no;
2974         l_ptr->stats.recv_info = l_ptr->next_in_no;
2975 }
2976
2977 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2978 {
2979         char *link_name;
2980         struct link *l_ptr;
2981         struct tipc_node *node;
2982
2983         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2984                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2985
2986         link_name = (char *)TLV_DATA(req_tlv_area);
2987         if (!strcmp(link_name, tipc_bclink_name)) {
2988                 if (tipc_bclink_reset_stats())
2989                         return tipc_cfg_reply_error_string("link not found");
2990                 return tipc_cfg_reply_none();
2991         }
2992
2993         read_lock_bh(&tipc_net_lock);
2994         l_ptr = link_find_link(link_name, &node);
2995         if (!l_ptr) {
2996                 read_unlock_bh(&tipc_net_lock);
2997                 return tipc_cfg_reply_error_string("link not found");
2998         }
2999
3000         tipc_node_lock(node);
3001         link_reset_statistics(l_ptr);
3002         tipc_node_unlock(node);
3003         read_unlock_bh(&tipc_net_lock);
3004         return tipc_cfg_reply_none();
3005 }
3006
3007 /**
3008  * percent - convert count to a percentage of total (rounding up or down)
3009  */
3010
3011 static u32 percent(u32 count, u32 total)
3012 {
3013         return (count * 100 + (total / 2)) / total;
3014 }
3015
3016 /**
3017  * tipc_link_stats - print link statistics
3018  * @name: link name
3019  * @buf: print buffer area
3020  * @buf_size: size of print buffer area
3021  *
3022  * Returns length of print buffer data string (or 0 if error)
3023  */
3024
3025 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
3026 {
3027         struct print_buf pb;
3028         struct link *l_ptr;
3029         struct tipc_node *node;
3030         char *status;
3031         u32 profile_total = 0;
3032
3033         if (!strcmp(name, tipc_bclink_name))
3034                 return tipc_bclink_stats(buf, buf_size);
3035
3036         tipc_printbuf_init(&pb, buf, buf_size);
3037
3038         read_lock_bh(&tipc_net_lock);
3039         l_ptr = link_find_link(name, &node);
3040         if (!l_ptr) {
3041                 read_unlock_bh(&tipc_net_lock);
3042                 return 0;
3043         }
3044         tipc_node_lock(node);
3045
3046         if (tipc_link_is_active(l_ptr))
3047                 status = "ACTIVE";
3048         else if (tipc_link_is_up(l_ptr))
3049                 status = "STANDBY";
3050         else
3051                 status = "DEFUNCT";
3052         tipc_printf(&pb, "Link <%s>\n"
3053                          "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
3054                          "  Window:%u packets\n",
3055                     l_ptr->name, status, l_ptr->max_pkt,
3056                     l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
3057         tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
3058                     l_ptr->next_in_no - l_ptr->stats.recv_info,
3059                     l_ptr->stats.recv_fragments,
3060                     l_ptr->stats.recv_fragmented,
3061                     l_ptr->stats.recv_bundles,
3062                     l_ptr->stats.recv_bundled);
3063         tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
3064                     l_ptr->next_out_no - l_ptr->stats.sent_info,
3065                     l_ptr->stats.sent_fragments,
3066                     l_ptr->stats.sent_fragmented,
3067                     l_ptr->stats.sent_bundles,
3068                     l_ptr->stats.sent_bundled);
3069         profile_total = l_ptr->stats.msg_length_counts;
3070         if (!profile_total)
3071                 profile_total = 1;
3072         tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3073                          "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3074                          "-16354:%u%% -32768:%u%% -66000:%u%%\n",
3075                     l_ptr->stats.msg_length_counts,
3076                     l_ptr->stats.msg_lengths_total / profile_total,
3077                     percent(l_ptr->stats.msg_length_profile[0], profile_total),
3078                     percent(l_ptr->stats.msg_length_profile[1], profile_total),
3079                     percent(l_ptr->stats.msg_length_profile[2], profile_total),
3080                     percent(l_ptr->stats.msg_length_profile[3], profile_total),
3081                     percent(l_ptr->stats.msg_length_profile[4], profile_total),
3082                     percent(l_ptr->stats.msg_length_profile[5], profile_total),
3083                     percent(l_ptr->stats.msg_length_profile[6], profile_total));
3084         tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3085                     l_ptr->stats.recv_states,
3086                     l_ptr->stats.recv_probes,
3087                     l_ptr->stats.recv_nacks,
3088                     l_ptr->stats.deferred_recv,
3089                     l_ptr->stats.duplicates);
3090         tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3091                     l_ptr->stats.sent_states,
3092                     l_ptr->stats.sent_probes,
3093                     l_ptr->stats.sent_nacks,
3094                     l_ptr->stats.sent_acks,
3095                     l_ptr->stats.retransmitted);
3096         tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3097                     l_ptr->stats.bearer_congs,
3098                     l_ptr->stats.link_congs,
3099                     l_ptr->stats.max_queue_sz,
3100                     l_ptr->stats.queue_sz_counts
3101                     ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3102                     : 0);
3103
3104         tipc_node_unlock(node);
3105         read_unlock_bh(&tipc_net_lock);
3106         return tipc_printbuf_validate(&pb);
3107 }
3108
3109 #define MAX_LINK_STATS_INFO 2000
3110
3111 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3112 {
3113         struct sk_buff *buf;
3114         struct tlv_desc *rep_tlv;
3115         int str_len;
3116
3117         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3118                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3119
3120         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3121         if (!buf)
3122                 return NULL;
3123
3124         rep_tlv = (struct tlv_desc *)buf->data;
3125
3126         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3127                                   (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3128         if (!str_len) {
3129                 buf_discard(buf);
3130                 return tipc_cfg_reply_error_string("link not found");
3131         }
3132
3133         skb_put(buf, TLV_SPACE(str_len));
3134         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3135
3136         return buf;
3137 }
3138
3139 /**
3140  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3141  * @dest: network address of destination node
3142  * @selector: used to select from set of active links
3143  *
3144  * If no active link can be found, uses default maximum packet size.
3145  */
3146
3147 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3148 {
3149         struct tipc_node *n_ptr;
3150         struct link *l_ptr;
3151         u32 res = MAX_PKT_DEFAULT;
3152
3153         if (dest == tipc_own_addr)
3154                 return MAX_MSG_SIZE;
3155
3156         read_lock_bh(&tipc_net_lock);
3157         n_ptr = tipc_node_select(dest, selector);
3158         if (n_ptr) {
3159                 tipc_node_lock(n_ptr);
3160                 l_ptr = n_ptr->active_links[selector & 1];
3161                 if (l_ptr)
3162                         res = l_ptr->max_pkt;
3163                 tipc_node_unlock(n_ptr);
3164         }
3165         read_unlock_bh(&tipc_net_lock);
3166         return res;
3167 }
3168
3169 static void link_dump_send_queue(struct link *l_ptr)
3170 {
3171         if (l_ptr->next_out) {
3172                 info("\nContents of unsent queue:\n");
3173                 dbg_print_buf_chain(l_ptr->next_out);
3174         }
3175         info("\nContents of send queue:\n");
3176         if (l_ptr->first_out) {
3177                 dbg_print_buf_chain(l_ptr->first_out);
3178         }
3179         info("Empty send queue\n");
3180 }
3181
3182 static void link_print(struct link *l_ptr, struct print_buf *buf,
3183                        const char *str)
3184 {
3185         tipc_printf(buf, str);
3186         if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3187                 return;
3188         tipc_printf(buf, "Link %x<%s>:",
3189                     l_ptr->addr, l_ptr->b_ptr->publ.name);
3190         tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3191         tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3192         tipc_printf(buf, "SQUE");
3193         if (l_ptr->first_out) {
3194                 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3195                 if (l_ptr->next_out)
3196                         tipc_printf(buf, "%u..",
3197                                     msg_seqno(buf_msg(l_ptr->next_out)));
3198                 tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3199                 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3200                          msg_seqno(buf_msg(l_ptr->first_out)))
3201                      != (l_ptr->out_queue_size - 1)) ||
3202                     (l_ptr->last_out->next != NULL)) {
3203                         tipc_printf(buf, "\nSend queue inconsistency\n");
3204                         tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3205                         tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3206                         tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3207                         link_dump_send_queue(l_ptr);
3208                 }
3209         } else
3210                 tipc_printf(buf, "[]");
3211         tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3212         if (l_ptr->oldest_deferred_in) {
3213                 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3214                 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3215                 tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3216                 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3217                         tipc_printf(buf, ":RQSIZ(%u)",
3218                                     l_ptr->deferred_inqueue_sz);
3219                 }
3220         }
3221         if (link_working_unknown(l_ptr))
3222                 tipc_printf(buf, ":WU");
3223         if (link_reset_reset(l_ptr))
3224                 tipc_printf(buf, ":RR");
3225         if (link_reset_unknown(l_ptr))
3226                 tipc_printf(buf, ":RU");
3227         if (link_working_working(l_ptr))
3228                 tipc_printf(buf, ":WW");
3229         tipc_printf(buf, "\n");
3230 }
3231