cb10d20caef32398c31a39b6b0d4fc9f59953c90
[linux-2.6.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, Ericsson AB
5  * Copyright (c) 2004-2007, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43
44
45 /*
46  * Out-of-range value for link session numbers
47  */
48
49 #define INVALID_SESSION 0x10000
50
51 /*
52  * Link state events:
53  */
54
55 #define  STARTING_EVT    856384768      /* link processing trigger */
56 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
57 #define  TIMEOUT_EVT     560817u        /* link timer expired */
58
59 /*
60  * The following two 'message types' is really just implementation
61  * data conveniently stored in the message header.
62  * They must not be considered part of the protocol
63  */
64 #define OPEN_MSG   0
65 #define CLOSED_MSG 1
66
67 /*
68  * State value stored in 'exp_msg_count'
69  */
70
71 #define START_CHANGEOVER 100000u
72
73 /**
74  * struct link_name - deconstructed link name
75  * @addr_local: network address of node at this end
76  * @if_local: name of interface at this end
77  * @addr_peer: network address of node at far end
78  * @if_peer: name of interface at far end
79  */
80
81 struct link_name {
82         u32 addr_local;
83         char if_local[TIPC_MAX_IF_NAME];
84         u32 addr_peer;
85         char if_peer[TIPC_MAX_IF_NAME];
86 };
87
88 static void link_handle_out_of_seq_msg(struct link *l_ptr,
89                                        struct sk_buff *buf);
90 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
91 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93 static int  link_send_sections_long(struct port *sender,
94                                     struct iovec const *msg_sect,
95                                     u32 num_sect, u32 destnode);
96 static void link_check_defragm_bufs(struct link *l_ptr);
97 static void link_state_event(struct link *l_ptr, u32 event);
98 static void link_reset_statistics(struct link *l_ptr);
99 static void link_print(struct link *l_ptr, struct print_buf *buf,
100                        const char *str);
101 static void link_start(struct link *l_ptr);
102 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);
103
104
105 /*
106  * Debugging code used by link routines only
107  *
108  * When debugging link problems on a system that has multiple links,
109  * the standard TIPC debugging routines may not be useful since they
110  * allow the output from multiple links to be intermixed.  For this reason
111  * routines of the form "dbg_link_XXX()" have been created that will capture
112  * debug info into a link's personal print buffer, which can then be dumped
113  * into the TIPC system log (TIPC_LOG) upon request.
114  *
115  * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
116  * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
117  * the dbg_link_XXX() routines simply send their output to the standard
118  * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
119  * when there is only a single link in the system being debugged.
120  *
121  * Notes:
122  * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
123  * - "l_ptr" must be valid when using dbg_link_XXX() macros
124  */
125
126 #define LINK_LOG_BUF_SIZE 0
127
128 #define dbg_link(fmt, arg...) \
129         do { \
130                 if (LINK_LOG_BUF_SIZE) \
131                         tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
132         } while (0)
133 #define dbg_link_msg(msg, txt) \
134         do { \
135                 if (LINK_LOG_BUF_SIZE) \
136                         tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
137         } while (0)
138 #define dbg_link_state(txt) \
139         do { \
140                 if (LINK_LOG_BUF_SIZE) \
141                         link_print(l_ptr, &l_ptr->print_buf, txt); \
142         } while (0)
143 #define dbg_link_dump() do { \
144         if (LINK_LOG_BUF_SIZE) { \
145                 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
146                 tipc_printbuf_move(LOG, &l_ptr->print_buf); \
147         } \
148 } while (0)
149
150 static void dbg_print_link(struct link *l_ptr, const char *str)
151 {
152         if (DBG_OUTPUT != TIPC_NULL)
153                 link_print(l_ptr, DBG_OUTPUT, str);
154 }
155
156 /*
157  *  Simple link routines
158  */
159
160 static unsigned int align(unsigned int i)
161 {
162         return (i + 3) & ~3u;
163 }
164
165 static void link_init_max_pkt(struct link *l_ptr)
166 {
167         u32 max_pkt;
168
169         max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
170         if (max_pkt > MAX_MSG_SIZE)
171                 max_pkt = MAX_MSG_SIZE;
172
173         l_ptr->max_pkt_target = max_pkt;
174         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
175                 l_ptr->max_pkt = l_ptr->max_pkt_target;
176         else
177                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
178
179         l_ptr->max_pkt_probes = 0;
180 }
181
182 static u32 link_next_sent(struct link *l_ptr)
183 {
184         if (l_ptr->next_out)
185                 return msg_seqno(buf_msg(l_ptr->next_out));
186         return mod(l_ptr->next_out_no);
187 }
188
189 static u32 link_last_sent(struct link *l_ptr)
190 {
191         return mod(link_next_sent(l_ptr) - 1);
192 }
193
194 /*
195  *  Simple non-static link routines (i.e. referenced outside this file)
196  */
197
198 int tipc_link_is_up(struct link *l_ptr)
199 {
200         if (!l_ptr)
201                 return 0;
202         return link_working_working(l_ptr) || link_working_unknown(l_ptr);
203 }
204
205 int tipc_link_is_active(struct link *l_ptr)
206 {
207         return  (l_ptr->owner->active_links[0] == l_ptr) ||
208                 (l_ptr->owner->active_links[1] == l_ptr);
209 }
210
211 /**
212  * link_name_validate - validate & (optionally) deconstruct link name
213  * @name - ptr to link name string
214  * @name_parts - ptr to area for link name components (or NULL if not needed)
215  *
216  * Returns 1 if link name is valid, otherwise 0.
217  */
218
219 static int link_name_validate(const char *name, struct link_name *name_parts)
220 {
221         char name_copy[TIPC_MAX_LINK_NAME];
222         char *addr_local;
223         char *if_local;
224         char *addr_peer;
225         char *if_peer;
226         char dummy;
227         u32 z_local, c_local, n_local;
228         u32 z_peer, c_peer, n_peer;
229         u32 if_local_len;
230         u32 if_peer_len;
231
232         /* copy link name & ensure length is OK */
233
234         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
235         /* need above in case non-Posix strncpy() doesn't pad with nulls */
236         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
237         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
238                 return 0;
239
240         /* ensure all component parts of link name are present */
241
242         addr_local = name_copy;
243         if ((if_local = strchr(addr_local, ':')) == NULL)
244                 return 0;
245         *(if_local++) = 0;
246         if ((addr_peer = strchr(if_local, '-')) == NULL)
247                 return 0;
248         *(addr_peer++) = 0;
249         if_local_len = addr_peer - if_local;
250         if ((if_peer = strchr(addr_peer, ':')) == NULL)
251                 return 0;
252         *(if_peer++) = 0;
253         if_peer_len = strlen(if_peer) + 1;
254
255         /* validate component parts of link name */
256
257         if ((sscanf(addr_local, "%u.%u.%u%c",
258                     &z_local, &c_local, &n_local, &dummy) != 3) ||
259             (sscanf(addr_peer, "%u.%u.%u%c",
260                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
261             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
262             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
263             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
264             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
265             (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
266             (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
267                 return 0;
268
269         /* return link name components, if necessary */
270
271         if (name_parts) {
272                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
273                 strcpy(name_parts->if_local, if_local);
274                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
275                 strcpy(name_parts->if_peer, if_peer);
276         }
277         return 1;
278 }
279
280 /**
281  * link_timeout - handle expiration of link timer
282  * @l_ptr: pointer to link
283  *
284  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
285  * with tipc_link_delete().  (There is no risk that the node will be deleted by
286  * another thread because tipc_link_delete() always cancels the link timer before
287  * tipc_node_delete() is called.)
288  */
289
290 static void link_timeout(struct link *l_ptr)
291 {
292         tipc_node_lock(l_ptr->owner);
293
294         /* update counters used in statistical profiling of send traffic */
295
296         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
297         l_ptr->stats.queue_sz_counts++;
298
299         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
300                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
301
302         if (l_ptr->first_out) {
303                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
304                 u32 length = msg_size(msg);
305
306                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
307                     (msg_type(msg) == FIRST_FRAGMENT)) {
308                         length = msg_size(msg_get_wrapped(msg));
309                 }
310                 if (length) {
311                         l_ptr->stats.msg_lengths_total += length;
312                         l_ptr->stats.msg_length_counts++;
313                         if (length <= 64)
314                                 l_ptr->stats.msg_length_profile[0]++;
315                         else if (length <= 256)
316                                 l_ptr->stats.msg_length_profile[1]++;
317                         else if (length <= 1024)
318                                 l_ptr->stats.msg_length_profile[2]++;
319                         else if (length <= 4096)
320                                 l_ptr->stats.msg_length_profile[3]++;
321                         else if (length <= 16384)
322                                 l_ptr->stats.msg_length_profile[4]++;
323                         else if (length <= 32768)
324                                 l_ptr->stats.msg_length_profile[5]++;
325                         else
326                                 l_ptr->stats.msg_length_profile[6]++;
327                 }
328         }
329
330         /* do all other link processing performed on a periodic basis */
331
332         link_check_defragm_bufs(l_ptr);
333
334         link_state_event(l_ptr, TIMEOUT_EVT);
335
336         if (l_ptr->next_out)
337                 tipc_link_push_queue(l_ptr);
338
339         tipc_node_unlock(l_ptr->owner);
340 }
341
342 static void link_set_timer(struct link *l_ptr, u32 time)
343 {
344         k_start_timer(&l_ptr->timer, time);
345 }
346
347 /**
348  * tipc_link_create - create a new link
349  * @b_ptr: pointer to associated bearer
350  * @peer: network address of node at other end of link
351  * @media_addr: media address to use when sending messages over link
352  *
353  * Returns pointer to link.
354  */
355
356 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
357                               const struct tipc_media_addr *media_addr)
358 {
359         struct link *l_ptr;
360         struct tipc_msg *msg;
361         char *if_name;
362
363         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
364         if (!l_ptr) {
365                 warn("Link creation failed, no memory\n");
366                 return NULL;
367         }
368
369         if (LINK_LOG_BUF_SIZE) {
370                 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
371
372                 if (!pb) {
373                         kfree(l_ptr);
374                         warn("Link creation failed, no memory for print buffer\n");
375                         return NULL;
376                 }
377                 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
378         }
379
380         l_ptr->addr = peer;
381         if_name = strchr(b_ptr->publ.name, ':') + 1;
382         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
383                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
384                 tipc_node(tipc_own_addr),
385                 if_name,
386                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
387                 /* note: peer i/f is appended to link name by reset/activate */
388         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
389         l_ptr->checkpoint = 1;
390         l_ptr->b_ptr = b_ptr;
391         link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
392         l_ptr->state = RESET_UNKNOWN;
393
394         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
395         msg = l_ptr->pmsg;
396         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
397         msg_set_size(msg, sizeof(l_ptr->proto_msg));
398         msg_set_session(msg, (tipc_random & 0xffff));
399         msg_set_bearer_id(msg, b_ptr->identity);
400         strcpy((char *)msg_data(msg), if_name);
401
402         l_ptr->priority = b_ptr->priority;
403         tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
404
405         link_init_max_pkt(l_ptr);
406
407         l_ptr->next_out_no = 1;
408         INIT_LIST_HEAD(&l_ptr->waiting_ports);
409
410         link_reset_statistics(l_ptr);
411
412         l_ptr->owner = tipc_node_attach_link(l_ptr);
413         if (!l_ptr->owner) {
414                 if (LINK_LOG_BUF_SIZE)
415                         kfree(l_ptr->print_buf.buf);
416                 kfree(l_ptr);
417                 return NULL;
418         }
419
420         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
421         list_add_tail(&l_ptr->link_list, &b_ptr->links);
422         tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
423
424         return l_ptr;
425 }
426
427 /**
428  * tipc_link_delete - delete a link
429  * @l_ptr: pointer to link
430  *
431  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
432  * This routine must not grab the node lock until after link timer cancellation
433  * to avoid a potential deadlock situation.
434  */
435
436 void tipc_link_delete(struct link *l_ptr)
437 {
438         if (!l_ptr) {
439                 err("Attempt to delete non-existent link\n");
440                 return;
441         }
442
443         k_cancel_timer(&l_ptr->timer);
444
445         tipc_node_lock(l_ptr->owner);
446         tipc_link_reset(l_ptr);
447         tipc_node_detach_link(l_ptr->owner, l_ptr);
448         tipc_link_stop(l_ptr);
449         list_del_init(&l_ptr->link_list);
450         if (LINK_LOG_BUF_SIZE)
451                 kfree(l_ptr->print_buf.buf);
452         tipc_node_unlock(l_ptr->owner);
453         k_term_timer(&l_ptr->timer);
454         kfree(l_ptr);
455 }
456
457 static void link_start(struct link *l_ptr)
458 {
459         link_state_event(l_ptr, STARTING_EVT);
460 }
461
462 /**
463  * link_schedule_port - schedule port for deferred sending
464  * @l_ptr: pointer to link
465  * @origport: reference to sending port
466  * @sz: amount of data to be sent
467  *
468  * Schedules port for renewed sending of messages after link congestion
469  * has abated.
470  */
471
472 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
473 {
474         struct port *p_ptr;
475
476         spin_lock_bh(&tipc_port_list_lock);
477         p_ptr = tipc_port_lock(origport);
478         if (p_ptr) {
479                 if (!p_ptr->wakeup)
480                         goto exit;
481                 if (!list_empty(&p_ptr->wait_list))
482                         goto exit;
483                 p_ptr->publ.congested = 1;
484                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
485                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
486                 l_ptr->stats.link_congs++;
487 exit:
488                 tipc_port_unlock(p_ptr);
489         }
490         spin_unlock_bh(&tipc_port_list_lock);
491         return -ELINKCONG;
492 }
493
494 void tipc_link_wakeup_ports(struct link *l_ptr, int all)
495 {
496         struct port *p_ptr;
497         struct port *temp_p_ptr;
498         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
499
500         if (all)
501                 win = 100000;
502         if (win <= 0)
503                 return;
504         if (!spin_trylock_bh(&tipc_port_list_lock))
505                 return;
506         if (link_congested(l_ptr))
507                 goto exit;
508         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
509                                  wait_list) {
510                 if (win <= 0)
511                         break;
512                 list_del_init(&p_ptr->wait_list);
513                 spin_lock_bh(p_ptr->publ.lock);
514                 p_ptr->publ.congested = 0;
515                 p_ptr->wakeup(&p_ptr->publ);
516                 win -= p_ptr->waiting_pkts;
517                 spin_unlock_bh(p_ptr->publ.lock);
518         }
519
520 exit:
521         spin_unlock_bh(&tipc_port_list_lock);
522 }
523
524 /**
525  * link_release_outqueue - purge link's outbound message queue
526  * @l_ptr: pointer to link
527  */
528
529 static void link_release_outqueue(struct link *l_ptr)
530 {
531         struct sk_buff *buf = l_ptr->first_out;
532         struct sk_buff *next;
533
534         while (buf) {
535                 next = buf->next;
536                 buf_discard(buf);
537                 buf = next;
538         }
539         l_ptr->first_out = NULL;
540         l_ptr->out_queue_size = 0;
541 }
542
543 /**
544  * tipc_link_reset_fragments - purge link's inbound message fragments queue
545  * @l_ptr: pointer to link
546  */
547
548 void tipc_link_reset_fragments(struct link *l_ptr)
549 {
550         struct sk_buff *buf = l_ptr->defragm_buf;
551         struct sk_buff *next;
552
553         while (buf) {
554                 next = buf->next;
555                 buf_discard(buf);
556                 buf = next;
557         }
558         l_ptr->defragm_buf = NULL;
559 }
560
561 /**
562  * tipc_link_stop - purge all inbound and outbound messages associated with link
563  * @l_ptr: pointer to link
564  */
565
566 void tipc_link_stop(struct link *l_ptr)
567 {
568         struct sk_buff *buf;
569         struct sk_buff *next;
570
571         buf = l_ptr->oldest_deferred_in;
572         while (buf) {
573                 next = buf->next;
574                 buf_discard(buf);
575                 buf = next;
576         }
577
578         buf = l_ptr->first_out;
579         while (buf) {
580                 next = buf->next;
581                 buf_discard(buf);
582                 buf = next;
583         }
584
585         tipc_link_reset_fragments(l_ptr);
586
587         buf_discard(l_ptr->proto_msg_queue);
588         l_ptr->proto_msg_queue = NULL;
589 }
590
591 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
592 #define link_send_event(fcn, l_ptr, up) do { } while (0)
593
594 void tipc_link_reset(struct link *l_ptr)
595 {
596         struct sk_buff *buf;
597         u32 prev_state = l_ptr->state;
598         u32 checkpoint = l_ptr->next_in_no;
599         int was_active_link = tipc_link_is_active(l_ptr);
600
601         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
602
603         /* Link is down, accept any session */
604         l_ptr->peer_session = INVALID_SESSION;
605
606         /* Prepare for max packet size negotiation */
607         link_init_max_pkt(l_ptr);
608
609         l_ptr->state = RESET_UNKNOWN;
610         dbg_link_state("Resetting Link\n");
611
612         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
613                 return;
614
615         tipc_node_link_down(l_ptr->owner, l_ptr);
616         tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
617
618         if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
619             l_ptr->owner->permit_changeover) {
620                 l_ptr->reset_checkpoint = checkpoint;
621                 l_ptr->exp_msg_count = START_CHANGEOVER;
622         }
623
624         /* Clean up all queues: */
625
626         link_release_outqueue(l_ptr);
627         buf_discard(l_ptr->proto_msg_queue);
628         l_ptr->proto_msg_queue = NULL;
629         buf = l_ptr->oldest_deferred_in;
630         while (buf) {
631                 struct sk_buff *next = buf->next;
632                 buf_discard(buf);
633                 buf = next;
634         }
635         if (!list_empty(&l_ptr->waiting_ports))
636                 tipc_link_wakeup_ports(l_ptr, 1);
637
638         l_ptr->retransm_queue_head = 0;
639         l_ptr->retransm_queue_size = 0;
640         l_ptr->last_out = NULL;
641         l_ptr->first_out = NULL;
642         l_ptr->next_out = NULL;
643         l_ptr->unacked_window = 0;
644         l_ptr->checkpoint = 1;
645         l_ptr->next_out_no = 1;
646         l_ptr->deferred_inqueue_sz = 0;
647         l_ptr->oldest_deferred_in = NULL;
648         l_ptr->newest_deferred_in = NULL;
649         l_ptr->fsm_msg_cnt = 0;
650         l_ptr->stale_count = 0;
651         link_reset_statistics(l_ptr);
652
653         link_send_event(tipc_cfg_link_event, l_ptr, 0);
654         if (!in_own_cluster(l_ptr->addr))
655                 link_send_event(tipc_disc_link_event, l_ptr, 0);
656 }
657
658
659 static void link_activate(struct link *l_ptr)
660 {
661         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
662         tipc_node_link_up(l_ptr->owner, l_ptr);
663         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
664         link_send_event(tipc_cfg_link_event, l_ptr, 1);
665         if (!in_own_cluster(l_ptr->addr))
666                 link_send_event(tipc_disc_link_event, l_ptr, 1);
667 }
668
669 /**
670  * link_state_event - link finite state machine
671  * @l_ptr: pointer to link
672  * @event: state machine event to process
673  */
674
675 static void link_state_event(struct link *l_ptr, unsigned event)
676 {
677         struct link *other;
678         u32 cont_intv = l_ptr->continuity_interval;
679
680         if (!l_ptr->started && (event != STARTING_EVT))
681                 return;         /* Not yet. */
682
683         if (link_blocked(l_ptr)) {
684                 if (event == TIMEOUT_EVT) {
685                         link_set_timer(l_ptr, cont_intv);
686                 }
687                 return;   /* Changeover going on */
688         }
689         dbg_link("STATE_EV: <%s> ", l_ptr->name);
690
691         switch (l_ptr->state) {
692         case WORKING_WORKING:
693                 dbg_link("WW/");
694                 switch (event) {
695                 case TRAFFIC_MSG_EVT:
696                         dbg_link("TRF-");
697                         /* fall through */
698                 case ACTIVATE_MSG:
699                         dbg_link("ACT\n");
700                         break;
701                 case TIMEOUT_EVT:
702                         dbg_link("TIM ");
703                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
704                                 l_ptr->checkpoint = l_ptr->next_in_no;
705                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
706                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
707                                                                  0, 0, 0, 0, 0);
708                                         l_ptr->fsm_msg_cnt++;
709                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
710                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
711                                                                  1, 0, 0, 0, 0);
712                                         l_ptr->fsm_msg_cnt++;
713                                 }
714                                 link_set_timer(l_ptr, cont_intv);
715                                 break;
716                         }
717                         dbg_link(" -> WU\n");
718                         l_ptr->state = WORKING_UNKNOWN;
719                         l_ptr->fsm_msg_cnt = 0;
720                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
721                         l_ptr->fsm_msg_cnt++;
722                         link_set_timer(l_ptr, cont_intv / 4);
723                         break;
724                 case RESET_MSG:
725                         dbg_link("RES -> RR\n");
726                         info("Resetting link <%s>, requested by peer\n",
727                              l_ptr->name);
728                         tipc_link_reset(l_ptr);
729                         l_ptr->state = RESET_RESET;
730                         l_ptr->fsm_msg_cnt = 0;
731                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
732                         l_ptr->fsm_msg_cnt++;
733                         link_set_timer(l_ptr, cont_intv);
734                         break;
735                 default:
736                         err("Unknown link event %u in WW state\n", event);
737                 }
738                 break;
739         case WORKING_UNKNOWN:
740                 dbg_link("WU/");
741                 switch (event) {
742                 case TRAFFIC_MSG_EVT:
743                         dbg_link("TRF-");
744                 case ACTIVATE_MSG:
745                         dbg_link("ACT -> WW\n");
746                         l_ptr->state = WORKING_WORKING;
747                         l_ptr->fsm_msg_cnt = 0;
748                         link_set_timer(l_ptr, cont_intv);
749                         break;
750                 case RESET_MSG:
751                         dbg_link("RES -> RR\n");
752                         info("Resetting link <%s>, requested by peer "
753                              "while probing\n", l_ptr->name);
754                         tipc_link_reset(l_ptr);
755                         l_ptr->state = RESET_RESET;
756                         l_ptr->fsm_msg_cnt = 0;
757                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
758                         l_ptr->fsm_msg_cnt++;
759                         link_set_timer(l_ptr, cont_intv);
760                         break;
761                 case TIMEOUT_EVT:
762                         dbg_link("TIM ");
763                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
764                                 dbg_link("-> WW\n");
765                                 l_ptr->state = WORKING_WORKING;
766                                 l_ptr->fsm_msg_cnt = 0;
767                                 l_ptr->checkpoint = l_ptr->next_in_no;
768                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
769                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
770                                                                  0, 0, 0, 0, 0);
771                                         l_ptr->fsm_msg_cnt++;
772                                 }
773                                 link_set_timer(l_ptr, cont_intv);
774                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
775                                 dbg_link("Probing %u/%u,timer = %u ms)\n",
776                                          l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
777                                          cont_intv / 4);
778                                 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
779                                                          1, 0, 0, 0, 0);
780                                 l_ptr->fsm_msg_cnt++;
781                                 link_set_timer(l_ptr, cont_intv / 4);
782                         } else {        /* Link has failed */
783                                 dbg_link("-> RU (%u probes unanswered)\n",
784                                          l_ptr->fsm_msg_cnt);
785                                 warn("Resetting link <%s>, peer not responding\n",
786                                      l_ptr->name);
787                                 tipc_link_reset(l_ptr);
788                                 l_ptr->state = RESET_UNKNOWN;
789                                 l_ptr->fsm_msg_cnt = 0;
790                                 tipc_link_send_proto_msg(l_ptr, RESET_MSG,
791                                                          0, 0, 0, 0, 0);
792                                 l_ptr->fsm_msg_cnt++;
793                                 link_set_timer(l_ptr, cont_intv);
794                         }
795                         break;
796                 default:
797                         err("Unknown link event %u in WU state\n", event);
798                 }
799                 break;
800         case RESET_UNKNOWN:
801                 dbg_link("RU/");
802                 switch (event) {
803                 case TRAFFIC_MSG_EVT:
804                         dbg_link("TRF-\n");
805                         break;
806                 case ACTIVATE_MSG:
807                         other = l_ptr->owner->active_links[0];
808                         if (other && link_working_unknown(other)) {
809                                 dbg_link("ACT\n");
810                                 break;
811                         }
812                         dbg_link("ACT -> WW\n");
813                         l_ptr->state = WORKING_WORKING;
814                         l_ptr->fsm_msg_cnt = 0;
815                         link_activate(l_ptr);
816                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
817                         l_ptr->fsm_msg_cnt++;
818                         link_set_timer(l_ptr, cont_intv);
819                         break;
820                 case RESET_MSG:
821                         dbg_link("RES\n");
822                         dbg_link(" -> RR\n");
823                         l_ptr->state = RESET_RESET;
824                         l_ptr->fsm_msg_cnt = 0;
825                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
826                         l_ptr->fsm_msg_cnt++;
827                         link_set_timer(l_ptr, cont_intv);
828                         break;
829                 case STARTING_EVT:
830                         dbg_link("START-");
831                         l_ptr->started = 1;
832                         /* fall through */
833                 case TIMEOUT_EVT:
834                         dbg_link("TIM\n");
835                         tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
836                         l_ptr->fsm_msg_cnt++;
837                         link_set_timer(l_ptr, cont_intv);
838                         break;
839                 default:
840                         err("Unknown link event %u in RU state\n", event);
841                 }
842                 break;
843         case RESET_RESET:
844                 dbg_link("RR/ ");
845                 switch (event) {
846                 case TRAFFIC_MSG_EVT:
847                         dbg_link("TRF-");
848                         /* fall through */
849                 case ACTIVATE_MSG:
850                         other = l_ptr->owner->active_links[0];
851                         if (other && link_working_unknown(other)) {
852                                 dbg_link("ACT\n");
853                                 break;
854                         }
855                         dbg_link("ACT -> WW\n");
856                         l_ptr->state = WORKING_WORKING;
857                         l_ptr->fsm_msg_cnt = 0;
858                         link_activate(l_ptr);
859                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
860                         l_ptr->fsm_msg_cnt++;
861                         link_set_timer(l_ptr, cont_intv);
862                         break;
863                 case RESET_MSG:
864                         dbg_link("RES\n");
865                         break;
866                 case TIMEOUT_EVT:
867                         dbg_link("TIM\n");
868                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
869                         l_ptr->fsm_msg_cnt++;
870                         link_set_timer(l_ptr, cont_intv);
871                         dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
872                         break;
873                 default:
874                         err("Unknown link event %u in RR state\n", event);
875                 }
876                 break;
877         default:
878                 err("Unknown link state %u/%u\n", l_ptr->state, event);
879         }
880 }
881
882 /*
883  * link_bundle_buf(): Append contents of a buffer to
884  * the tail of an existing one.
885  */
886
887 static int link_bundle_buf(struct link *l_ptr,
888                            struct sk_buff *bundler,
889                            struct sk_buff *buf)
890 {
891         struct tipc_msg *bundler_msg = buf_msg(bundler);
892         struct tipc_msg *msg = buf_msg(buf);
893         u32 size = msg_size(msg);
894         u32 bundle_size = msg_size(bundler_msg);
895         u32 to_pos = align(bundle_size);
896         u32 pad = to_pos - bundle_size;
897
898         if (msg_user(bundler_msg) != MSG_BUNDLER)
899                 return 0;
900         if (msg_type(bundler_msg) != OPEN_MSG)
901                 return 0;
902         if (skb_tailroom(bundler) < (pad + size))
903                 return 0;
904         if (l_ptr->max_pkt < (to_pos + size))
905                 return 0;
906
907         skb_put(bundler, pad + size);
908         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
909         msg_set_size(bundler_msg, to_pos + size);
910         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
911         buf_discard(buf);
912         l_ptr->stats.sent_bundled++;
913         return 1;
914 }
915
916 static void link_add_to_outqueue(struct link *l_ptr,
917                                  struct sk_buff *buf,
918                                  struct tipc_msg *msg)
919 {
920         u32 ack = mod(l_ptr->next_in_no - 1);
921         u32 seqno = mod(l_ptr->next_out_no++);
922
923         msg_set_word(msg, 2, ((ack << 16) | seqno));
924         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
925         buf->next = NULL;
926         if (l_ptr->first_out) {
927                 l_ptr->last_out->next = buf;
928                 l_ptr->last_out = buf;
929         } else
930                 l_ptr->first_out = l_ptr->last_out = buf;
931         l_ptr->out_queue_size++;
932 }
933
934 /*
935  * tipc_link_send_buf() is the 'full path' for messages, called from
936  * inside TIPC when the 'fast path' in tipc_send_buf
937  * has failed, and from link_send()
938  */
939
940 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
941 {
942         struct tipc_msg *msg = buf_msg(buf);
943         u32 size = msg_size(msg);
944         u32 dsz = msg_data_sz(msg);
945         u32 queue_size = l_ptr->out_queue_size;
946         u32 imp = tipc_msg_tot_importance(msg);
947         u32 queue_limit = l_ptr->queue_limit[imp];
948         u32 max_packet = l_ptr->max_pkt;
949
950         msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
951
952         /* Match msg importance against queue limits: */
953
954         if (unlikely(queue_size >= queue_limit)) {
955                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
956                         return link_schedule_port(l_ptr, msg_origport(msg),
957                                                   size);
958                 }
959                 buf_discard(buf);
960                 if (imp > CONN_MANAGER) {
961                         warn("Resetting link <%s>, send queue full", l_ptr->name);
962                         tipc_link_reset(l_ptr);
963                 }
964                 return dsz;
965         }
966
967         /* Fragmentation needed ? */
968
969         if (size > max_packet)
970                 return link_send_long_buf(l_ptr, buf);
971
972         /* Packet can be queued or sent: */
973
974         if (queue_size > l_ptr->stats.max_queue_sz)
975                 l_ptr->stats.max_queue_sz = queue_size;
976
977         if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
978                    !link_congested(l_ptr))) {
979                 link_add_to_outqueue(l_ptr, buf, msg);
980
981                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
982                         l_ptr->unacked_window = 0;
983                 } else {
984                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
985                         l_ptr->stats.bearer_congs++;
986                         l_ptr->next_out = buf;
987                 }
988                 return dsz;
989         }
990         /* Congestion: can message be bundled ?: */
991
992         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
993             (msg_user(msg) != MSG_FRAGMENTER)) {
994
995                 /* Try adding message to an existing bundle */
996
997                 if (l_ptr->next_out &&
998                     link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
999                         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1000                         return dsz;
1001                 }
1002
1003                 /* Try creating a new bundle */
1004
1005                 if (size <= max_packet * 2 / 3) {
1006                         struct sk_buff *bundler = tipc_buf_acquire(max_packet);
1007                         struct tipc_msg bundler_hdr;
1008
1009                         if (bundler) {
1010                                 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1011                                          INT_H_SIZE, l_ptr->addr);
1012                                 skb_copy_to_linear_data(bundler, &bundler_hdr,
1013                                                         INT_H_SIZE);
1014                                 skb_trim(bundler, INT_H_SIZE);
1015                                 link_bundle_buf(l_ptr, bundler, buf);
1016                                 buf = bundler;
1017                                 msg = buf_msg(buf);
1018                                 l_ptr->stats.sent_bundles++;
1019                         }
1020                 }
1021         }
1022         if (!l_ptr->next_out)
1023                 l_ptr->next_out = buf;
1024         link_add_to_outqueue(l_ptr, buf, msg);
1025         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1026         return dsz;
1027 }
1028
1029 /*
1030  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1031  * not been selected yet, and the the owner node is not locked
1032  * Called by TIPC internal users, e.g. the name distributor
1033  */
1034
1035 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1036 {
1037         struct link *l_ptr;
1038         struct tipc_node *n_ptr;
1039         int res = -ELINKCONG;
1040
1041         read_lock_bh(&tipc_net_lock);
1042         n_ptr = tipc_node_find(dest);
1043         if (n_ptr) {
1044                 tipc_node_lock(n_ptr);
1045                 l_ptr = n_ptr->active_links[selector & 1];
1046                 if (l_ptr) {
1047                         res = tipc_link_send_buf(l_ptr, buf);
1048                 } else {
1049                         buf_discard(buf);
1050                 }
1051                 tipc_node_unlock(n_ptr);
1052         } else {
1053                 buf_discard(buf);
1054         }
1055         read_unlock_bh(&tipc_net_lock);
1056         return res;
1057 }
1058
1059 /*
1060  * link_send_buf_fast: Entry for data messages where the
1061  * destination link is known and the header is complete,
1062  * inclusive total message length. Very time critical.
1063  * Link is locked. Returns user data length.
1064  */
1065
1066 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1067                               u32 *used_max_pkt)
1068 {
1069         struct tipc_msg *msg = buf_msg(buf);
1070         int res = msg_data_sz(msg);
1071
1072         if (likely(!link_congested(l_ptr))) {
1073                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1074                         if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1075                                 link_add_to_outqueue(l_ptr, buf, msg);
1076                                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1077                                                             &l_ptr->media_addr))) {
1078                                         l_ptr->unacked_window = 0;
1079                                         return res;
1080                                 }
1081                                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1082                                 l_ptr->stats.bearer_congs++;
1083                                 l_ptr->next_out = buf;
1084                                 return res;
1085                         }
1086                 }
1087                 else
1088                         *used_max_pkt = l_ptr->max_pkt;
1089         }
1090         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1091 }
1092
1093 /*
1094  * tipc_send_buf_fast: Entry for data messages where the
1095  * destination node is known and the header is complete,
1096  * inclusive total message length.
1097  * Returns user data length.
1098  */
1099 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1100 {
1101         struct link *l_ptr;
1102         struct tipc_node *n_ptr;
1103         int res;
1104         u32 selector = msg_origport(buf_msg(buf)) & 1;
1105         u32 dummy;
1106
1107         if (destnode == tipc_own_addr)
1108                 return tipc_port_recv_msg(buf);
1109
1110         read_lock_bh(&tipc_net_lock);
1111         n_ptr = tipc_node_find(destnode);
1112         if (likely(n_ptr)) {
1113                 tipc_node_lock(n_ptr);
1114                 l_ptr = n_ptr->active_links[selector];
1115                 if (likely(l_ptr)) {
1116                         res = link_send_buf_fast(l_ptr, buf, &dummy);
1117                         tipc_node_unlock(n_ptr);
1118                         read_unlock_bh(&tipc_net_lock);
1119                         return res;
1120                 }
1121                 tipc_node_unlock(n_ptr);
1122         }
1123         read_unlock_bh(&tipc_net_lock);
1124         res = msg_data_sz(buf_msg(buf));
1125         tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1126         return res;
1127 }
1128
1129
1130 /*
1131  * tipc_link_send_sections_fast: Entry for messages where the
1132  * destination processor is known and the header is complete,
1133  * except for total message length.
1134  * Returns user data length or errno.
1135  */
1136 int tipc_link_send_sections_fast(struct port *sender,
1137                                  struct iovec const *msg_sect,
1138                                  const u32 num_sect,
1139                                  u32 destaddr)
1140 {
1141         struct tipc_msg *hdr = &sender->publ.phdr;
1142         struct link *l_ptr;
1143         struct sk_buff *buf;
1144         struct tipc_node *node;
1145         int res;
1146         u32 selector = msg_origport(hdr) & 1;
1147
1148 again:
1149         /*
1150          * Try building message using port's max_pkt hint.
1151          * (Must not hold any locks while building message.)
1152          */
1153
1154         res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1155                         !sender->user_port, &buf);
1156
1157         read_lock_bh(&tipc_net_lock);
1158         node = tipc_node_find(destaddr);
1159         if (likely(node)) {
1160                 tipc_node_lock(node);
1161                 l_ptr = node->active_links[selector];
1162                 if (likely(l_ptr)) {
1163                         if (likely(buf)) {
1164                                 res = link_send_buf_fast(l_ptr, buf,
1165                                                          &sender->publ.max_pkt);
1166                                 if (unlikely(res < 0))
1167                                         buf_discard(buf);
1168 exit:
1169                                 tipc_node_unlock(node);
1170                                 read_unlock_bh(&tipc_net_lock);
1171                                 return res;
1172                         }
1173
1174                         /* Exit if build request was invalid */
1175
1176                         if (unlikely(res < 0))
1177                                 goto exit;
1178
1179                         /* Exit if link (or bearer) is congested */
1180
1181                         if (link_congested(l_ptr) ||
1182                             !list_empty(&l_ptr->b_ptr->cong_links)) {
1183                                 res = link_schedule_port(l_ptr,
1184                                                          sender->publ.ref, res);
1185                                 goto exit;
1186                         }
1187
1188                         /*
1189                          * Message size exceeds max_pkt hint; update hint,
1190                          * then re-try fast path or fragment the message
1191                          */
1192
1193                         sender->publ.max_pkt = l_ptr->max_pkt;
1194                         tipc_node_unlock(node);
1195                         read_unlock_bh(&tipc_net_lock);
1196
1197
1198                         if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1199                                 goto again;
1200
1201                         return link_send_sections_long(sender, msg_sect,
1202                                                        num_sect, destaddr);
1203                 }
1204                 tipc_node_unlock(node);
1205         }
1206         read_unlock_bh(&tipc_net_lock);
1207
1208         /* Couldn't find a link to the destination node */
1209
1210         if (buf)
1211                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1212         if (res >= 0)
1213                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1214                                                  TIPC_ERR_NO_NODE);
1215         return res;
1216 }
1217
1218 /*
1219  * link_send_sections_long(): Entry for long messages where the
1220  * destination node is known and the header is complete,
1221  * inclusive total message length.
1222  * Link and bearer congestion status have been checked to be ok,
1223  * and are ignored if they change.
1224  *
1225  * Note that fragments do not use the full link MTU so that they won't have
1226  * to undergo refragmentation if link changeover causes them to be sent
1227  * over another link with an additional tunnel header added as prefix.
1228  * (Refragmentation will still occur if the other link has a smaller MTU.)
1229  *
1230  * Returns user data length or errno.
1231  */
1232 static int link_send_sections_long(struct port *sender,
1233                                    struct iovec const *msg_sect,
1234                                    u32 num_sect,
1235                                    u32 destaddr)
1236 {
1237         struct link *l_ptr;
1238         struct tipc_node *node;
1239         struct tipc_msg *hdr = &sender->publ.phdr;
1240         u32 dsz = msg_data_sz(hdr);
1241         u32 max_pkt,fragm_sz,rest;
1242         struct tipc_msg fragm_hdr;
1243         struct sk_buff *buf,*buf_chain,*prev;
1244         u32 fragm_crs,fragm_rest,hsz,sect_rest;
1245         const unchar *sect_crs;
1246         int curr_sect;
1247         u32 fragm_no;
1248
1249 again:
1250         fragm_no = 1;
1251         max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1252                 /* leave room for tunnel header in case of link changeover */
1253         fragm_sz = max_pkt - INT_H_SIZE;
1254                 /* leave room for fragmentation header in each fragment */
1255         rest = dsz;
1256         fragm_crs = 0;
1257         fragm_rest = 0;
1258         sect_rest = 0;
1259         sect_crs = NULL;
1260         curr_sect = -1;
1261
1262         /* Prepare reusable fragment header: */
1263
1264         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1265                  INT_H_SIZE, msg_destnode(hdr));
1266         msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1267         msg_set_size(&fragm_hdr, max_pkt);
1268         msg_set_fragm_no(&fragm_hdr, 1);
1269
1270         /* Prepare header of first fragment: */
1271
1272         buf_chain = buf = tipc_buf_acquire(max_pkt);
1273         if (!buf)
1274                 return -ENOMEM;
1275         buf->next = NULL;
1276         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1277         hsz = msg_hdr_sz(hdr);
1278         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1279
1280         /* Chop up message: */
1281
1282         fragm_crs = INT_H_SIZE + hsz;
1283         fragm_rest = fragm_sz - hsz;
1284
1285         do {            /* For all sections */
1286                 u32 sz;
1287
1288                 if (!sect_rest) {
1289                         sect_rest = msg_sect[++curr_sect].iov_len;
1290                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1291                 }
1292
1293                 if (sect_rest < fragm_rest)
1294                         sz = sect_rest;
1295                 else
1296                         sz = fragm_rest;
1297
1298                 if (likely(!sender->user_port)) {
1299                         if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1300 error:
1301                                 for (; buf_chain; buf_chain = buf) {
1302                                         buf = buf_chain->next;
1303                                         buf_discard(buf_chain);
1304                                 }
1305                                 return -EFAULT;
1306                         }
1307                 } else
1308                         skb_copy_to_linear_data_offset(buf, fragm_crs,
1309                                                        sect_crs, sz);
1310                 sect_crs += sz;
1311                 sect_rest -= sz;
1312                 fragm_crs += sz;
1313                 fragm_rest -= sz;
1314                 rest -= sz;
1315
1316                 if (!fragm_rest && rest) {
1317
1318                         /* Initiate new fragment: */
1319                         if (rest <= fragm_sz) {
1320                                 fragm_sz = rest;
1321                                 msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1322                         } else {
1323                                 msg_set_type(&fragm_hdr, FRAGMENT);
1324                         }
1325                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1326                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1327                         prev = buf;
1328                         buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1329                         if (!buf)
1330                                 goto error;
1331
1332                         buf->next = NULL;
1333                         prev->next = buf;
1334                         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1335                         fragm_crs = INT_H_SIZE;
1336                         fragm_rest = fragm_sz;
1337                 }
1338         }
1339         while (rest > 0);
1340
1341         /*
1342          * Now we have a buffer chain. Select a link and check
1343          * that packet size is still OK
1344          */
1345         node = tipc_node_find(destaddr);
1346         if (likely(node)) {
1347                 tipc_node_lock(node);
1348                 l_ptr = node->active_links[sender->publ.ref & 1];
1349                 if (!l_ptr) {
1350                         tipc_node_unlock(node);
1351                         goto reject;
1352                 }
1353                 if (l_ptr->max_pkt < max_pkt) {
1354                         sender->publ.max_pkt = l_ptr->max_pkt;
1355                         tipc_node_unlock(node);
1356                         for (; buf_chain; buf_chain = buf) {
1357                                 buf = buf_chain->next;
1358                                 buf_discard(buf_chain);
1359                         }
1360                         goto again;
1361                 }
1362         } else {
1363 reject:
1364                 for (; buf_chain; buf_chain = buf) {
1365                         buf = buf_chain->next;
1366                         buf_discard(buf_chain);
1367                 }
1368                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1369                                                  TIPC_ERR_NO_NODE);
1370         }
1371
1372         /* Append whole chain to send queue: */
1373
1374         buf = buf_chain;
1375         l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1376         if (!l_ptr->next_out)
1377                 l_ptr->next_out = buf_chain;
1378         l_ptr->stats.sent_fragmented++;
1379         while (buf) {
1380                 struct sk_buff *next = buf->next;
1381                 struct tipc_msg *msg = buf_msg(buf);
1382
1383                 l_ptr->stats.sent_fragments++;
1384                 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1385                 link_add_to_outqueue(l_ptr, buf, msg);
1386                 buf = next;
1387         }
1388
1389         /* Send it, if possible: */
1390
1391         tipc_link_push_queue(l_ptr);
1392         tipc_node_unlock(node);
1393         return dsz;
1394 }
1395
1396 /*
1397  * tipc_link_push_packet: Push one unsent packet to the media
1398  */
1399 u32 tipc_link_push_packet(struct link *l_ptr)
1400 {
1401         struct sk_buff *buf = l_ptr->first_out;
1402         u32 r_q_size = l_ptr->retransm_queue_size;
1403         u32 r_q_head = l_ptr->retransm_queue_head;
1404
1405         /* Step to position where retransmission failed, if any,    */
1406         /* consider that buffers may have been released in meantime */
1407
1408         if (r_q_size && buf) {
1409                 u32 last = lesser(mod(r_q_head + r_q_size),
1410                                   link_last_sent(l_ptr));
1411                 u32 first = msg_seqno(buf_msg(buf));
1412
1413                 while (buf && less(first, r_q_head)) {
1414                         first = mod(first + 1);
1415                         buf = buf->next;
1416                 }
1417                 l_ptr->retransm_queue_head = r_q_head = first;
1418                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1419         }
1420
1421         /* Continue retransmission now, if there is anything: */
1422
1423         if (r_q_size && buf) {
1424                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1425                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1426                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1427                         l_ptr->retransm_queue_head = mod(++r_q_head);
1428                         l_ptr->retransm_queue_size = --r_q_size;
1429                         l_ptr->stats.retransmitted++;
1430                         return 0;
1431                 } else {
1432                         l_ptr->stats.bearer_congs++;
1433                         return PUSH_FAILED;
1434                 }
1435         }
1436
1437         /* Send deferred protocol message, if any: */
1438
1439         buf = l_ptr->proto_msg_queue;
1440         if (buf) {
1441                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1442                 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1443                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1444                         l_ptr->unacked_window = 0;
1445                         buf_discard(buf);
1446                         l_ptr->proto_msg_queue = NULL;
1447                         return 0;
1448                 } else {
1449                         l_ptr->stats.bearer_congs++;
1450                         return PUSH_FAILED;
1451                 }
1452         }
1453
1454         /* Send one deferred data message, if send window not full: */
1455
1456         buf = l_ptr->next_out;
1457         if (buf) {
1458                 struct tipc_msg *msg = buf_msg(buf);
1459                 u32 next = msg_seqno(msg);
1460                 u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1461
1462                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1463                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1464                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1465                         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1466                                 if (msg_user(msg) == MSG_BUNDLER)
1467                                         msg_set_type(msg, CLOSED_MSG);
1468                                 l_ptr->next_out = buf->next;
1469                                 return 0;
1470                         } else {
1471                                 l_ptr->stats.bearer_congs++;
1472                                 return PUSH_FAILED;
1473                         }
1474                 }
1475         }
1476         return PUSH_FINISHED;
1477 }
1478
1479 /*
1480  * push_queue(): push out the unsent messages of a link where
1481  *               congestion has abated. Node is locked
1482  */
1483 void tipc_link_push_queue(struct link *l_ptr)
1484 {
1485         u32 res;
1486
1487         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1488                 return;
1489
1490         do {
1491                 res = tipc_link_push_packet(l_ptr);
1492         } while (!res);
1493
1494         if (res == PUSH_FAILED)
1495                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1496 }
1497
1498 static void link_reset_all(unsigned long addr)
1499 {
1500         struct tipc_node *n_ptr;
1501         char addr_string[16];
1502         u32 i;
1503
1504         read_lock_bh(&tipc_net_lock);
1505         n_ptr = tipc_node_find((u32)addr);
1506         if (!n_ptr) {
1507                 read_unlock_bh(&tipc_net_lock);
1508                 return; /* node no longer exists */
1509         }
1510
1511         tipc_node_lock(n_ptr);
1512
1513         warn("Resetting all links to %s\n",
1514              tipc_addr_string_fill(addr_string, n_ptr->addr));
1515
1516         for (i = 0; i < MAX_BEARERS; i++) {
1517                 if (n_ptr->links[i]) {
1518                         link_print(n_ptr->links[i], TIPC_OUTPUT,
1519                                    "Resetting link\n");
1520                         tipc_link_reset(n_ptr->links[i]);
1521                 }
1522         }
1523
1524         tipc_node_unlock(n_ptr);
1525         read_unlock_bh(&tipc_net_lock);
1526 }
1527
1528 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1529 {
1530         struct tipc_msg *msg = buf_msg(buf);
1531
1532         warn("Retransmission failure on link <%s>\n", l_ptr->name);
1533
1534         if (l_ptr->addr) {
1535
1536                 /* Handle failure on standard link */
1537
1538                 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1539                 tipc_link_reset(l_ptr);
1540
1541         } else {
1542
1543                 /* Handle failure on broadcast link */
1544
1545                 struct tipc_node *n_ptr;
1546                 char addr_string[16];
1547
1548                 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1549                 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1550                                      (unsigned long) TIPC_SKB_CB(buf)->handle);
1551
1552                 n_ptr = l_ptr->owner->next;
1553                 tipc_node_lock(n_ptr);
1554
1555                 tipc_addr_string_fill(addr_string, n_ptr->addr);
1556                 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1557                 tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1558                 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1559                 tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1560                 tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1561                 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1562                 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1563
1564                 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1565
1566                 tipc_node_unlock(n_ptr);
1567
1568                 l_ptr->stale_count = 0;
1569         }
1570 }
1571
1572 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1573                           u32 retransmits)
1574 {
1575         struct tipc_msg *msg;
1576
1577         if (!buf)
1578                 return;
1579
1580         msg = buf_msg(buf);
1581
1582         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1583                 if (l_ptr->retransm_queue_size == 0) {
1584                         dbg_print_link(l_ptr, "   ");
1585                         l_ptr->retransm_queue_head = msg_seqno(msg);
1586                         l_ptr->retransm_queue_size = retransmits;
1587                 } else {
1588                         err("Unexpected retransmit on link %s (qsize=%d)\n",
1589                             l_ptr->name, l_ptr->retransm_queue_size);
1590                 }
1591                 return;
1592         } else {
1593                 /* Detect repeated retransmit failures on uncongested bearer */
1594
1595                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1596                         if (++l_ptr->stale_count > 100) {
1597                                 link_retransmit_failure(l_ptr, buf);
1598                                 return;
1599                         }
1600                 } else {
1601                         l_ptr->last_retransmitted = msg_seqno(msg);
1602                         l_ptr->stale_count = 1;
1603                 }
1604         }
1605
1606         while (retransmits && (buf != l_ptr->next_out) && buf) {
1607                 msg = buf_msg(buf);
1608                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1609                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1610                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1611                         buf = buf->next;
1612                         retransmits--;
1613                         l_ptr->stats.retransmitted++;
1614                 } else {
1615                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1616                         l_ptr->stats.bearer_congs++;
1617                         l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1618                         l_ptr->retransm_queue_size = retransmits;
1619                         return;
1620                 }
1621         }
1622
1623         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1624 }
1625
1626 /**
1627  * link_insert_deferred_queue - insert deferred messages back into receive chain
1628  */
1629
1630 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1631                                                   struct sk_buff *buf)
1632 {
1633         u32 seq_no;
1634
1635         if (l_ptr->oldest_deferred_in == NULL)
1636                 return buf;
1637
1638         seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1639         if (seq_no == mod(l_ptr->next_in_no)) {
1640                 l_ptr->newest_deferred_in->next = buf;
1641                 buf = l_ptr->oldest_deferred_in;
1642                 l_ptr->oldest_deferred_in = NULL;
1643                 l_ptr->deferred_inqueue_sz = 0;
1644         }
1645         return buf;
1646 }
1647
1648 /**
1649  * link_recv_buf_validate - validate basic format of received message
1650  *
1651  * This routine ensures a TIPC message has an acceptable header, and at least
1652  * as much data as the header indicates it should.  The routine also ensures
1653  * that the entire message header is stored in the main fragment of the message
1654  * buffer, to simplify future access to message header fields.
1655  *
1656  * Note: Having extra info present in the message header or data areas is OK.
1657  * TIPC will ignore the excess, under the assumption that it is optional info
1658  * introduced by a later release of the protocol.
1659  */
1660
1661 static int link_recv_buf_validate(struct sk_buff *buf)
1662 {
1663         static u32 min_data_hdr_size[8] = {
1664                 SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1665                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1666                 };
1667
1668         struct tipc_msg *msg;
1669         u32 tipc_hdr[2];
1670         u32 size;
1671         u32 hdr_size;
1672         u32 min_hdr_size;
1673
1674         if (unlikely(buf->len < MIN_H_SIZE))
1675                 return 0;
1676
1677         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1678         if (msg == NULL)
1679                 return 0;
1680
1681         if (unlikely(msg_version(msg) != TIPC_VERSION))
1682                 return 0;
1683
1684         size = msg_size(msg);
1685         hdr_size = msg_hdr_sz(msg);
1686         min_hdr_size = msg_isdata(msg) ?
1687                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1688
1689         if (unlikely((hdr_size < min_hdr_size) ||
1690                      (size < hdr_size) ||
1691                      (buf->len < size) ||
1692                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1693                 return 0;
1694
1695         return pskb_may_pull(buf, hdr_size);
1696 }
1697
1698 /**
1699  * tipc_recv_msg - process TIPC messages arriving from off-node
1700  * @head: pointer to message buffer chain
1701  * @tb_ptr: pointer to bearer message arrived on
1702  *
1703  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1704  * structure (i.e. cannot be NULL), but bearer can be inactive.
1705  */
1706
1707 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1708 {
1709         read_lock_bh(&tipc_net_lock);
1710         while (head) {
1711                 struct bearer *b_ptr = (struct bearer *)tb_ptr;
1712                 struct tipc_node *n_ptr;
1713                 struct link *l_ptr;
1714                 struct sk_buff *crs;
1715                 struct sk_buff *buf = head;
1716                 struct tipc_msg *msg;
1717                 u32 seq_no;
1718                 u32 ackd;
1719                 u32 released = 0;
1720                 int type;
1721
1722                 head = head->next;
1723
1724                 /* Ensure bearer is still enabled */
1725
1726                 if (unlikely(!b_ptr->active))
1727                         goto cont;
1728
1729                 /* Ensure message is well-formed */
1730
1731                 if (unlikely(!link_recv_buf_validate(buf)))
1732                         goto cont;
1733
1734                 /* Ensure message data is a single contiguous unit */
1735
1736                 if (unlikely(buf_linearize(buf))) {
1737                         goto cont;
1738                 }
1739
1740                 /* Handle arrival of a non-unicast link message */
1741
1742                 msg = buf_msg(buf);
1743
1744                 if (unlikely(msg_non_seq(msg))) {
1745                         if (msg_user(msg) ==  LINK_CONFIG)
1746                                 tipc_disc_recv_msg(buf, b_ptr);
1747                         else
1748                                 tipc_bclink_recv_pkt(buf);
1749                         continue;
1750                 }
1751
1752                 if (unlikely(!msg_short(msg) &&
1753                              (msg_destnode(msg) != tipc_own_addr)))
1754                         goto cont;
1755
1756                 /* Discard non-routeable messages destined for another node */
1757
1758                 if (unlikely(!msg_isdata(msg) &&
1759                              (msg_destnode(msg) != tipc_own_addr))) {
1760                         if ((msg_user(msg) != CONN_MANAGER) &&
1761                             (msg_user(msg) != MSG_FRAGMENTER))
1762                                 goto cont;
1763                 }
1764
1765                 /* Locate neighboring node that sent message */
1766
1767                 n_ptr = tipc_node_find(msg_prevnode(msg));
1768                 if (unlikely(!n_ptr))
1769                         goto cont;
1770                 tipc_node_lock(n_ptr);
1771
1772                 /* Don't talk to neighbor during cleanup after last session */
1773
1774                 if (n_ptr->cleanup_required) {
1775                         tipc_node_unlock(n_ptr);
1776                         goto cont;
1777                 }
1778
1779                 /* Locate unicast link endpoint that should handle message */
1780
1781                 l_ptr = n_ptr->links[b_ptr->identity];
1782                 if (unlikely(!l_ptr)) {
1783                         tipc_node_unlock(n_ptr);
1784                         goto cont;
1785                 }
1786
1787                 /* Validate message sequence number info */
1788
1789                 seq_no = msg_seqno(msg);
1790                 ackd = msg_ack(msg);
1791
1792                 /* Release acked messages */
1793
1794                 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1795                         if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1796                                 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1797                 }
1798
1799                 crs = l_ptr->first_out;
1800                 while ((crs != l_ptr->next_out) &&
1801                        less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1802                         struct sk_buff *next = crs->next;
1803
1804                         buf_discard(crs);
1805                         crs = next;
1806                         released++;
1807                 }
1808                 if (released) {
1809                         l_ptr->first_out = crs;
1810                         l_ptr->out_queue_size -= released;
1811                 }
1812
1813                 /* Try sending any messages link endpoint has pending */
1814
1815                 if (unlikely(l_ptr->next_out))
1816                         tipc_link_push_queue(l_ptr);
1817                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1818                         tipc_link_wakeup_ports(l_ptr, 0);
1819                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1820                         l_ptr->stats.sent_acks++;
1821                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1822                 }
1823
1824                 /* Now (finally!) process the incoming message */
1825
1826 protocol_check:
1827                 if (likely(link_working_working(l_ptr))) {
1828                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1829                                 l_ptr->next_in_no++;
1830                                 if (unlikely(l_ptr->oldest_deferred_in))
1831                                         head = link_insert_deferred_queue(l_ptr,
1832                                                                           head);
1833                                 if (likely(msg_is_dest(msg, tipc_own_addr))) {
1834 deliver:
1835                                         if (likely(msg_isdata(msg))) {
1836                                                 tipc_node_unlock(n_ptr);
1837                                                 tipc_port_recv_msg(buf);
1838                                                 continue;
1839                                         }
1840                                         switch (msg_user(msg)) {
1841                                         case MSG_BUNDLER:
1842                                                 l_ptr->stats.recv_bundles++;
1843                                                 l_ptr->stats.recv_bundled +=
1844                                                         msg_msgcnt(msg);
1845                                                 tipc_node_unlock(n_ptr);
1846                                                 tipc_link_recv_bundle(buf);
1847                                                 continue;
1848                                         case ROUTE_DISTRIBUTOR:
1849                                                 tipc_node_unlock(n_ptr);
1850                                                 buf_discard(buf);
1851                                                 continue;
1852                                         case NAME_DISTRIBUTOR:
1853                                                 tipc_node_unlock(n_ptr);
1854                                                 tipc_named_recv(buf);
1855                                                 continue;
1856                                         case CONN_MANAGER:
1857                                                 tipc_node_unlock(n_ptr);
1858                                                 tipc_port_recv_proto_msg(buf);
1859                                                 continue;
1860                                         case MSG_FRAGMENTER:
1861                                                 l_ptr->stats.recv_fragments++;
1862                                                 if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1863                                                                             &buf, &msg)) {
1864                                                         l_ptr->stats.recv_fragmented++;
1865                                                         goto deliver;
1866                                                 }
1867                                                 break;
1868                                         case CHANGEOVER_PROTOCOL:
1869                                                 type = msg_type(msg);
1870                                                 if (link_recv_changeover_msg(&l_ptr, &buf)) {
1871                                                         msg = buf_msg(buf);
1872                                                         seq_no = msg_seqno(msg);
1873                                                         if (type == ORIGINAL_MSG)
1874                                                                 goto deliver;
1875                                                         goto protocol_check;
1876                                                 }
1877                                                 break;
1878                                         }
1879                                 }
1880                                 tipc_node_unlock(n_ptr);
1881                                 tipc_net_route_msg(buf);
1882                                 continue;
1883                         }
1884                         link_handle_out_of_seq_msg(l_ptr, buf);
1885                         head = link_insert_deferred_queue(l_ptr, head);
1886                         tipc_node_unlock(n_ptr);
1887                         continue;
1888                 }
1889
1890                 if (msg_user(msg) == LINK_PROTOCOL) {
1891                         link_recv_proto_msg(l_ptr, buf);
1892                         head = link_insert_deferred_queue(l_ptr, head);
1893                         tipc_node_unlock(n_ptr);
1894                         continue;
1895                 }
1896                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1897
1898                 if (link_working_working(l_ptr)) {
1899                         /* Re-insert in front of queue */
1900                         buf->next = head;
1901                         head = buf;
1902                         tipc_node_unlock(n_ptr);
1903                         continue;
1904                 }
1905                 tipc_node_unlock(n_ptr);
1906 cont:
1907                 buf_discard(buf);
1908         }
1909         read_unlock_bh(&tipc_net_lock);
1910 }
1911
1912 /*
1913  * link_defer_buf(): Sort a received out-of-sequence packet
1914  *                   into the deferred reception queue.
1915  * Returns the increase of the queue length,i.e. 0 or 1
1916  */
1917
1918 u32 tipc_link_defer_pkt(struct sk_buff **head,
1919                         struct sk_buff **tail,
1920                         struct sk_buff *buf)
1921 {
1922         struct sk_buff *prev = NULL;
1923         struct sk_buff *crs = *head;
1924         u32 seq_no = msg_seqno(buf_msg(buf));
1925
1926         buf->next = NULL;
1927
1928         /* Empty queue ? */
1929         if (*head == NULL) {
1930                 *head = *tail = buf;
1931                 return 1;
1932         }
1933
1934         /* Last ? */
1935         if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1936                 (*tail)->next = buf;
1937                 *tail = buf;
1938                 return 1;
1939         }
1940
1941         /* Scan through queue and sort it in */
1942         do {
1943                 struct tipc_msg *msg = buf_msg(crs);
1944
1945                 if (less(seq_no, msg_seqno(msg))) {
1946                         buf->next = crs;
1947                         if (prev)
1948                                 prev->next = buf;
1949                         else
1950                                 *head = buf;
1951                         return 1;
1952                 }
1953                 if (seq_no == msg_seqno(msg)) {
1954                         break;
1955                 }
1956                 prev = crs;
1957                 crs = crs->next;
1958         }
1959         while (crs);
1960
1961         /* Message is a duplicate of an existing message */
1962
1963         buf_discard(buf);
1964         return 0;
1965 }
1966
1967 /**
1968  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1969  */
1970
1971 static void link_handle_out_of_seq_msg(struct link *l_ptr,
1972                                        struct sk_buff *buf)
1973 {
1974         u32 seq_no = msg_seqno(buf_msg(buf));
1975
1976         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1977                 link_recv_proto_msg(l_ptr, buf);
1978                 return;
1979         }
1980
1981         /* Record OOS packet arrival (force mismatch on next timeout) */
1982
1983         l_ptr->checkpoint--;
1984
1985         /*
1986          * Discard packet if a duplicate; otherwise add it to deferred queue
1987          * and notify peer of gap as per protocol specification
1988          */
1989
1990         if (less(seq_no, mod(l_ptr->next_in_no))) {
1991                 l_ptr->stats.duplicates++;
1992                 buf_discard(buf);
1993                 return;
1994         }
1995
1996         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1997                                 &l_ptr->newest_deferred_in, buf)) {
1998                 l_ptr->deferred_inqueue_sz++;
1999                 l_ptr->stats.deferred_recv++;
2000                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2001                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2002         } else
2003                 l_ptr->stats.duplicates++;
2004 }
2005
2006 /*
2007  * Send protocol message to the other endpoint.
2008  */
2009 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2010                               u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2011 {
2012         struct sk_buff *buf = NULL;
2013         struct tipc_msg *msg = l_ptr->pmsg;
2014         u32 msg_size = sizeof(l_ptr->proto_msg);
2015
2016         if (link_blocked(l_ptr))
2017                 return;
2018         msg_set_type(msg, msg_typ);
2019         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2020         msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2021         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2022
2023         if (msg_typ == STATE_MSG) {
2024                 u32 next_sent = mod(l_ptr->next_out_no);
2025
2026                 if (!tipc_link_is_up(l_ptr))
2027                         return;
2028                 if (l_ptr->next_out)
2029                         next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2030                 msg_set_next_sent(msg, next_sent);
2031                 if (l_ptr->oldest_deferred_in) {
2032                         u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2033                         gap = mod(rec - mod(l_ptr->next_in_no));
2034                 }
2035                 msg_set_seq_gap(msg, gap);
2036                 if (gap)
2037                         l_ptr->stats.sent_nacks++;
2038                 msg_set_link_tolerance(msg, tolerance);
2039                 msg_set_linkprio(msg, priority);
2040                 msg_set_max_pkt(msg, ack_mtu);
2041                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2042                 msg_set_probe(msg, probe_msg != 0);
2043                 if (probe_msg) {
2044                         u32 mtu = l_ptr->max_pkt;
2045
2046                         if ((mtu < l_ptr->max_pkt_target) &&
2047                             link_working_working(l_ptr) &&
2048                             l_ptr->fsm_msg_cnt) {
2049                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2050                                 if (l_ptr->max_pkt_probes == 10) {
2051                                         l_ptr->max_pkt_target = (msg_size - 4);
2052                                         l_ptr->max_pkt_probes = 0;
2053                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2054                                 }
2055                                 l_ptr->max_pkt_probes++;
2056                         }
2057
2058                         l_ptr->stats.sent_probes++;
2059                 }
2060                 l_ptr->stats.sent_states++;
2061         } else {                /* RESET_MSG or ACTIVATE_MSG */
2062                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2063                 msg_set_seq_gap(msg, 0);
2064                 msg_set_next_sent(msg, 1);
2065                 msg_set_link_tolerance(msg, l_ptr->tolerance);
2066                 msg_set_linkprio(msg, l_ptr->priority);
2067                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2068         }
2069
2070         if (tipc_node_has_redundant_links(l_ptr->owner)) {
2071                 msg_set_redundant_link(msg);
2072         } else {
2073                 msg_clear_redundant_link(msg);
2074         }
2075         msg_set_linkprio(msg, l_ptr->priority);
2076
2077         /* Ensure sequence number will not fit : */
2078
2079         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2080
2081         /* Congestion? */
2082
2083         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2084                 if (!l_ptr->proto_msg_queue) {
2085                         l_ptr->proto_msg_queue =
2086                                 tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2087                 }
2088                 buf = l_ptr->proto_msg_queue;
2089                 if (!buf)
2090                         return;
2091                 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2092                 return;
2093         }
2094         msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2095
2096         /* Message can be sent */
2097
2098         buf = tipc_buf_acquire(msg_size);
2099         if (!buf)
2100                 return;
2101
2102         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2103         msg_set_size(buf_msg(buf), msg_size);
2104
2105         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2106                 l_ptr->unacked_window = 0;
2107                 buf_discard(buf);
2108                 return;
2109         }
2110
2111         /* New congestion */
2112         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2113         l_ptr->proto_msg_queue = buf;
2114         l_ptr->stats.bearer_congs++;
2115 }
2116
2117 /*
2118  * Receive protocol message :
2119  * Note that network plane id propagates through the network, and may
2120  * change at any time. The node with lowest address rules
2121  */
2122
2123 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2124 {
2125         u32 rec_gap = 0;
2126         u32 max_pkt_info;
2127         u32 max_pkt_ack;
2128         u32 msg_tol;
2129         struct tipc_msg *msg = buf_msg(buf);
2130
2131         if (link_blocked(l_ptr))
2132                 goto exit;
2133
2134         /* record unnumbered packet arrival (force mismatch on next timeout) */
2135
2136         l_ptr->checkpoint--;
2137
2138         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2139                 if (tipc_own_addr > msg_prevnode(msg))
2140                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2141
2142         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2143
2144         switch (msg_type(msg)) {
2145
2146         case RESET_MSG:
2147                 if (!link_working_unknown(l_ptr) &&
2148                     (l_ptr->peer_session != INVALID_SESSION)) {
2149                         if (msg_session(msg) == l_ptr->peer_session)
2150                                 break; /* duplicate: ignore */
2151                 }
2152                 /* fall thru' */
2153         case ACTIVATE_MSG:
2154                 /* Update link settings according other endpoint's values */
2155
2156                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2157
2158                 if ((msg_tol = msg_link_tolerance(msg)) &&
2159                     (msg_tol > l_ptr->tolerance))
2160                         link_set_supervision_props(l_ptr, msg_tol);
2161
2162                 if (msg_linkprio(msg) > l_ptr->priority)
2163                         l_ptr->priority = msg_linkprio(msg);
2164
2165                 max_pkt_info = msg_max_pkt(msg);
2166                 if (max_pkt_info) {
2167                         if (max_pkt_info < l_ptr->max_pkt_target)
2168                                 l_ptr->max_pkt_target = max_pkt_info;
2169                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2170                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2171                 } else {
2172                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2173                 }
2174                 l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2175
2176                 link_state_event(l_ptr, msg_type(msg));
2177
2178                 l_ptr->peer_session = msg_session(msg);
2179                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2180
2181                 /* Synchronize broadcast sequence numbers */
2182                 if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2183                         l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2184                 }
2185                 break;
2186         case STATE_MSG:
2187
2188                 if ((msg_tol = msg_link_tolerance(msg)))
2189                         link_set_supervision_props(l_ptr, msg_tol);
2190
2191                 if (msg_linkprio(msg) &&
2192                     (msg_linkprio(msg) != l_ptr->priority)) {
2193                         warn("Resetting link <%s>, priority change %u->%u\n",
2194                              l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2195                         l_ptr->priority = msg_linkprio(msg);
2196                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
2197                         break;
2198                 }
2199                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2200                 l_ptr->stats.recv_states++;
2201                 if (link_reset_unknown(l_ptr))
2202                         break;
2203
2204                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2205                         rec_gap = mod(msg_next_sent(msg) -
2206                                       mod(l_ptr->next_in_no));
2207                 }
2208
2209                 max_pkt_ack = msg_max_pkt(msg);
2210                 if (max_pkt_ack > l_ptr->max_pkt) {
2211                         l_ptr->max_pkt = max_pkt_ack;
2212                         l_ptr->max_pkt_probes = 0;
2213                 }
2214
2215                 max_pkt_ack = 0;
2216                 if (msg_probe(msg)) {
2217                         l_ptr->stats.recv_probes++;
2218                         if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2219                                 max_pkt_ack = msg_size(msg);
2220                         }
2221                 }
2222
2223                 /* Protocol message before retransmits, reduce loss risk */
2224
2225                 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2226
2227                 if (rec_gap || (msg_probe(msg))) {
2228                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2229                                                  0, rec_gap, 0, 0, max_pkt_ack);
2230                 }
2231                 if (msg_seq_gap(msg)) {
2232                         l_ptr->stats.recv_nacks++;
2233                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
2234                                              msg_seq_gap(msg));
2235                 }
2236                 break;
2237         }
2238 exit:
2239         buf_discard(buf);
2240 }
2241
2242
2243 /*
2244  * tipc_link_tunnel(): Send one message via a link belonging to
2245  * another bearer. Owner node is locked.
2246  */
2247 static void tipc_link_tunnel(struct link *l_ptr,
2248                              struct tipc_msg *tunnel_hdr,
2249                              struct tipc_msg  *msg,
2250                              u32 selector)
2251 {
2252         struct link *tunnel;
2253         struct sk_buff *buf;
2254         u32 length = msg_size(msg);
2255
2256         tunnel = l_ptr->owner->active_links[selector & 1];
2257         if (!tipc_link_is_up(tunnel)) {
2258                 warn("Link changeover error, "
2259                      "tunnel link no longer available\n");
2260                 return;
2261         }
2262         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2263         buf = tipc_buf_acquire(length + INT_H_SIZE);
2264         if (!buf) {
2265                 warn("Link changeover error, "
2266                      "unable to send tunnel msg\n");
2267                 return;
2268         }
2269         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2270         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2271         tipc_link_send_buf(tunnel, buf);
2272 }
2273
2274
2275
2276 /*
2277  * changeover(): Send whole message queue via the remaining link
2278  *               Owner node is locked.
2279  */
2280
2281 void tipc_link_changeover(struct link *l_ptr)
2282 {
2283         u32 msgcount = l_ptr->out_queue_size;
2284         struct sk_buff *crs = l_ptr->first_out;
2285         struct link *tunnel = l_ptr->owner->active_links[0];
2286         struct tipc_msg tunnel_hdr;
2287         int split_bundles;
2288
2289         if (!tunnel)
2290                 return;
2291
2292         if (!l_ptr->owner->permit_changeover) {
2293                 warn("Link changeover error, "
2294                      "peer did not permit changeover\n");
2295                 return;
2296         }
2297
2298         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2299                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2300         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2301         msg_set_msgcnt(&tunnel_hdr, msgcount);
2302
2303         if (!l_ptr->first_out) {
2304                 struct sk_buff *buf;
2305
2306                 buf = tipc_buf_acquire(INT_H_SIZE);
2307                 if (buf) {
2308                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2309                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2310                         tipc_link_send_buf(tunnel, buf);
2311                 } else {
2312                         warn("Link changeover error, "
2313                              "unable to send changeover msg\n");
2314                 }
2315                 return;
2316         }
2317
2318         split_bundles = (l_ptr->owner->active_links[0] !=
2319                          l_ptr->owner->active_links[1]);
2320
2321         while (crs) {
2322                 struct tipc_msg *msg = buf_msg(crs);
2323
2324                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2325                         struct tipc_msg *m = msg_get_wrapped(msg);
2326                         unchar* pos = (unchar*)m;
2327
2328                         msgcount = msg_msgcnt(msg);
2329                         while (msgcount--) {
2330                                 msg_set_seqno(m,msg_seqno(msg));
2331                                 tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2332                                                  msg_link_selector(m));
2333                                 pos += align(msg_size(m));
2334                                 m = (struct tipc_msg *)pos;
2335                         }
2336                 } else {
2337                         tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2338                                          msg_link_selector(msg));
2339                 }
2340                 crs = crs->next;
2341         }
2342 }
2343
2344 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2345 {
2346         struct sk_buff *iter;
2347         struct tipc_msg tunnel_hdr;
2348
2349         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2350                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2351         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2352         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2353         iter = l_ptr->first_out;
2354         while (iter) {
2355                 struct sk_buff *outbuf;
2356                 struct tipc_msg *msg = buf_msg(iter);
2357                 u32 length = msg_size(msg);
2358
2359                 if (msg_user(msg) == MSG_BUNDLER)
2360                         msg_set_type(msg, CLOSED_MSG);
2361                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2362                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2363                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2364                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2365                 if (outbuf == NULL) {
2366                         warn("Link changeover error, "
2367                              "unable to send duplicate msg\n");
2368                         return;
2369                 }
2370                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2371                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2372                                                length);
2373                 tipc_link_send_buf(tunnel, outbuf);
2374                 if (!tipc_link_is_up(l_ptr))
2375                         return;
2376                 iter = iter->next;
2377         }
2378 }
2379
2380
2381
2382 /**
2383  * buf_extract - extracts embedded TIPC message from another message
2384  * @skb: encapsulating message buffer
2385  * @from_pos: offset to extract from
2386  *
2387  * Returns a new message buffer containing an embedded message.  The
2388  * encapsulating message itself is left unchanged.
2389  */
2390
2391 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2392 {
2393         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2394         u32 size = msg_size(msg);
2395         struct sk_buff *eb;
2396
2397         eb = tipc_buf_acquire(size);
2398         if (eb)
2399                 skb_copy_to_linear_data(eb, msg, size);
2400         return eb;
2401 }
2402
2403 /*
2404  *  link_recv_changeover_msg(): Receive tunneled packet sent
2405  *  via other link. Node is locked. Return extracted buffer.
2406  */
2407
2408 static int link_recv_changeover_msg(struct link **l_ptr,
2409                                     struct sk_buff **buf)
2410 {
2411         struct sk_buff *tunnel_buf = *buf;
2412         struct link *dest_link;
2413         struct tipc_msg *msg;
2414         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2415         u32 msg_typ = msg_type(tunnel_msg);
2416         u32 msg_count = msg_msgcnt(tunnel_msg);
2417
2418         dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2419         if (!dest_link)
2420                 goto exit;
2421         if (dest_link == *l_ptr) {
2422                 err("Unexpected changeover message on link <%s>\n",
2423                     (*l_ptr)->name);
2424                 goto exit;
2425         }
2426         *l_ptr = dest_link;
2427         msg = msg_get_wrapped(tunnel_msg);
2428
2429         if (msg_typ == DUPLICATE_MSG) {
2430                 if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
2431                         goto exit;
2432                 *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2433                 if (*buf == NULL) {
2434                         warn("Link changeover error, duplicate msg dropped\n");
2435                         goto exit;
2436                 }
2437                 buf_discard(tunnel_buf);
2438                 return 1;
2439         }
2440
2441         /* First original message ?: */
2442
2443         if (tipc_link_is_up(dest_link)) {
2444                 info("Resetting link <%s>, changeover initiated by peer\n",
2445                      dest_link->name);
2446                 tipc_link_reset(dest_link);
2447                 dest_link->exp_msg_count = msg_count;
2448                 if (!msg_count)
2449                         goto exit;
2450         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2451                 dest_link->exp_msg_count = msg_count;
2452                 if (!msg_count)
2453                         goto exit;
2454         }
2455
2456         /* Receive original message */
2457
2458         if (dest_link->exp_msg_count == 0) {
2459                 warn("Link switchover error, "
2460                      "got too many tunnelled messages\n");
2461                 dbg_print_link(dest_link, "LINK:");
2462                 goto exit;
2463         }
2464         dest_link->exp_msg_count--;
2465         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2466                 goto exit;
2467         } else {
2468                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2469                 if (*buf != NULL) {
2470                         buf_discard(tunnel_buf);
2471                         return 1;
2472                 } else {
2473                         warn("Link changeover error, original msg dropped\n");
2474                 }
2475         }
2476 exit:
2477         *buf = NULL;
2478         buf_discard(tunnel_buf);
2479         return 0;
2480 }
2481
2482 /*
2483  *  Bundler functionality:
2484  */
2485 void tipc_link_recv_bundle(struct sk_buff *buf)
2486 {
2487         u32 msgcount = msg_msgcnt(buf_msg(buf));
2488         u32 pos = INT_H_SIZE;
2489         struct sk_buff *obuf;
2490
2491         while (msgcount--) {
2492                 obuf = buf_extract(buf, pos);
2493                 if (obuf == NULL) {
2494                         warn("Link unable to unbundle message(s)\n");
2495                         break;
2496                 }
2497                 pos += align(msg_size(buf_msg(obuf)));
2498                 tipc_net_route_msg(obuf);
2499         }
2500         buf_discard(buf);
2501 }
2502
2503 /*
2504  *  Fragmentation/defragmentation:
2505  */
2506
2507
2508 /*
2509  * link_send_long_buf: Entry for buffers needing fragmentation.
2510  * The buffer is complete, inclusive total message length.
2511  * Returns user data length.
2512  */
2513 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2514 {
2515         struct tipc_msg *inmsg = buf_msg(buf);
2516         struct tipc_msg fragm_hdr;
2517         u32 insize = msg_size(inmsg);
2518         u32 dsz = msg_data_sz(inmsg);
2519         unchar *crs = buf->data;
2520         u32 rest = insize;
2521         u32 pack_sz = l_ptr->max_pkt;
2522         u32 fragm_sz = pack_sz - INT_H_SIZE;
2523         u32 fragm_no = 1;
2524         u32 destaddr;
2525
2526         if (msg_short(inmsg))
2527                 destaddr = l_ptr->addr;
2528         else
2529                 destaddr = msg_destnode(inmsg);
2530
2531         if (msg_routed(inmsg))
2532                 msg_set_prevnode(inmsg, tipc_own_addr);
2533
2534         /* Prepare reusable fragment header: */
2535
2536         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2537                  INT_H_SIZE, destaddr);
2538         msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2539         msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2540         msg_set_fragm_no(&fragm_hdr, fragm_no);
2541         l_ptr->stats.sent_fragmented++;
2542
2543         /* Chop up message: */
2544
2545         while (rest > 0) {
2546                 struct sk_buff *fragm;
2547
2548                 if (rest <= fragm_sz) {
2549                         fragm_sz = rest;
2550                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2551                 }
2552                 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2553                 if (fragm == NULL) {
2554                         warn("Link unable to fragment message\n");
2555                         dsz = -ENOMEM;
2556                         goto exit;
2557                 }
2558                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2559                 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2560                 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2561                                                fragm_sz);
2562                 /*  Send queued messages first, if any: */
2563
2564                 l_ptr->stats.sent_fragments++;
2565                 tipc_link_send_buf(l_ptr, fragm);
2566                 if (!tipc_link_is_up(l_ptr))
2567                         return dsz;
2568                 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2569                 rest -= fragm_sz;
2570                 crs += fragm_sz;
2571                 msg_set_type(&fragm_hdr, FRAGMENT);
2572         }
2573 exit:
2574         buf_discard(buf);
2575         return dsz;
2576 }
2577
2578 /*
2579  * A pending message being re-assembled must store certain values
2580  * to handle subsequent fragments correctly. The following functions
2581  * help storing these values in unused, available fields in the
2582  * pending message. This makes dynamic memory allocation unecessary.
2583  */
2584
2585 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2586 {
2587         msg_set_seqno(buf_msg(buf), seqno);
2588 }
2589
2590 static u32 get_fragm_size(struct sk_buff *buf)
2591 {
2592         return msg_ack(buf_msg(buf));
2593 }
2594
2595 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2596 {
2597         msg_set_ack(buf_msg(buf), sz);
2598 }
2599
2600 static u32 get_expected_frags(struct sk_buff *buf)
2601 {
2602         return msg_bcast_ack(buf_msg(buf));
2603 }
2604
2605 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2606 {
2607         msg_set_bcast_ack(buf_msg(buf), exp);
2608 }
2609
2610 static u32 get_timer_cnt(struct sk_buff *buf)
2611 {
2612         return msg_reroute_cnt(buf_msg(buf));
2613 }
2614
2615 static void incr_timer_cnt(struct sk_buff *buf)
2616 {
2617         msg_incr_reroute_cnt(buf_msg(buf));
2618 }
2619
2620 /*
2621  * tipc_link_recv_fragment(): Called with node lock on. Returns
2622  * the reassembled buffer if message is complete.
2623  */
2624 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2625                             struct tipc_msg **m)
2626 {
2627         struct sk_buff *prev = NULL;
2628         struct sk_buff *fbuf = *fb;
2629         struct tipc_msg *fragm = buf_msg(fbuf);
2630         struct sk_buff *pbuf = *pending;
2631         u32 long_msg_seq_no = msg_long_msgno(fragm);
2632
2633         *fb = NULL;
2634
2635         /* Is there an incomplete message waiting for this fragment? */
2636
2637         while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2638                         (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2639                 prev = pbuf;
2640                 pbuf = pbuf->next;
2641         }
2642
2643         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2644                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2645                 u32 msg_sz = msg_size(imsg);
2646                 u32 fragm_sz = msg_data_sz(fragm);
2647                 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2648                 u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2649                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2650                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2651                 if (msg_size(imsg) > max) {
2652                         buf_discard(fbuf);
2653                         return 0;
2654                 }
2655                 pbuf = tipc_buf_acquire(msg_size(imsg));
2656                 if (pbuf != NULL) {
2657                         pbuf->next = *pending;
2658                         *pending = pbuf;
2659                         skb_copy_to_linear_data(pbuf, imsg,
2660                                                 msg_data_sz(fragm));
2661                         /*  Prepare buffer for subsequent fragments. */
2662
2663                         set_long_msg_seqno(pbuf, long_msg_seq_no);
2664                         set_fragm_size(pbuf,fragm_sz);
2665                         set_expected_frags(pbuf,exp_fragm_cnt - 1);
2666                 } else {
2667                         warn("Link unable to reassemble fragmented message\n");
2668                 }
2669                 buf_discard(fbuf);
2670                 return 0;
2671         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2672                 u32 dsz = msg_data_sz(fragm);
2673                 u32 fsz = get_fragm_size(pbuf);
2674                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2675                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2676                 skb_copy_to_linear_data_offset(pbuf, crs,
2677                                                msg_data(fragm), dsz);
2678                 buf_discard(fbuf);
2679
2680                 /* Is message complete? */
2681
2682                 if (exp_frags == 0) {
2683                         if (prev)
2684                                 prev->next = pbuf->next;
2685                         else
2686                                 *pending = pbuf->next;
2687                         msg_reset_reroute_cnt(buf_msg(pbuf));
2688                         *fb = pbuf;
2689                         *m = buf_msg(pbuf);
2690                         return 1;
2691                 }
2692                 set_expected_frags(pbuf,exp_frags);
2693                 return 0;
2694         }
2695         buf_discard(fbuf);
2696         return 0;
2697 }
2698
2699 /**
2700  * link_check_defragm_bufs - flush stale incoming message fragments
2701  * @l_ptr: pointer to link
2702  */
2703
2704 static void link_check_defragm_bufs(struct link *l_ptr)
2705 {
2706         struct sk_buff *prev = NULL;
2707         struct sk_buff *next = NULL;
2708         struct sk_buff *buf = l_ptr->defragm_buf;
2709
2710         if (!buf)
2711                 return;
2712         if (!link_working_working(l_ptr))
2713                 return;
2714         while (buf) {
2715                 u32 cnt = get_timer_cnt(buf);
2716
2717                 next = buf->next;
2718                 if (cnt < 4) {
2719                         incr_timer_cnt(buf);
2720                         prev = buf;
2721                 } else {
2722                         if (prev)
2723                                 prev->next = buf->next;
2724                         else
2725                                 l_ptr->defragm_buf = buf->next;
2726                         buf_discard(buf);
2727                 }
2728                 buf = next;
2729         }
2730 }
2731
2732
2733
2734 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2735 {
2736         l_ptr->tolerance = tolerance;
2737         l_ptr->continuity_interval =
2738                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2739         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2740 }
2741
2742
2743 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2744 {
2745         /* Data messages from this node, inclusive FIRST_FRAGM */
2746         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2747         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2748         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2749         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2750         /* Transiting data messages,inclusive FIRST_FRAGM */
2751         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2752         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2753         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2754         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2755         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2756         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2757         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2758         /* FRAGMENT and LAST_FRAGMENT packets */
2759         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2760 }
2761
2762 /**
2763  * link_find_link - locate link by name
2764  * @name - ptr to link name string
2765  * @node - ptr to area to be filled with ptr to associated node
2766  *
2767  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2768  * this also prevents link deletion.
2769  *
2770  * Returns pointer to link (or 0 if invalid link name).
2771  */
2772
2773 static struct link *link_find_link(const char *name, struct tipc_node **node)
2774 {
2775         struct link_name link_name_parts;
2776         struct bearer *b_ptr;
2777         struct link *l_ptr;
2778
2779         if (!link_name_validate(name, &link_name_parts))
2780                 return NULL;
2781
2782         b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2783         if (!b_ptr)
2784                 return NULL;
2785
2786         *node = tipc_node_find(link_name_parts.addr_peer);
2787         if (!*node)
2788                 return NULL;
2789
2790         l_ptr = (*node)->links[b_ptr->identity];
2791         if (!l_ptr || strcmp(l_ptr->name, name))
2792                 return NULL;
2793
2794         return l_ptr;
2795 }
2796
2797 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2798                                      u16 cmd)
2799 {
2800         struct tipc_link_config *args;
2801         u32 new_value;
2802         struct link *l_ptr;
2803         struct tipc_node *node;
2804         int res;
2805
2806         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2807                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2808
2809         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2810         new_value = ntohl(args->value);
2811
2812         if (!strcmp(args->name, tipc_bclink_name)) {
2813                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2814                     (tipc_bclink_set_queue_limits(new_value) == 0))
2815                         return tipc_cfg_reply_none();
2816                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2817                                                    " (cannot change setting on broadcast link)");
2818         }
2819
2820         read_lock_bh(&tipc_net_lock);
2821         l_ptr = link_find_link(args->name, &node);
2822         if (!l_ptr) {
2823                 read_unlock_bh(&tipc_net_lock);
2824                 return tipc_cfg_reply_error_string("link not found");
2825         }
2826
2827         tipc_node_lock(node);
2828         res = -EINVAL;
2829         switch (cmd) {
2830         case TIPC_CMD_SET_LINK_TOL:
2831                 if ((new_value >= TIPC_MIN_LINK_TOL) &&
2832                     (new_value <= TIPC_MAX_LINK_TOL)) {
2833                         link_set_supervision_props(l_ptr, new_value);
2834                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2835                                                  0, 0, new_value, 0, 0);
2836                         res = 0;
2837                 }
2838                 break;
2839         case TIPC_CMD_SET_LINK_PRI:
2840                 if ((new_value >= TIPC_MIN_LINK_PRI) &&
2841                     (new_value <= TIPC_MAX_LINK_PRI)) {
2842                         l_ptr->priority = new_value;
2843                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2844                                                  0, 0, 0, new_value, 0);
2845                         res = 0;
2846                 }
2847                 break;
2848         case TIPC_CMD_SET_LINK_WINDOW:
2849                 if ((new_value >= TIPC_MIN_LINK_WIN) &&
2850                     (new_value <= TIPC_MAX_LINK_WIN)) {
2851                         tipc_link_set_queue_limits(l_ptr, new_value);
2852                         res = 0;
2853                 }
2854                 break;
2855         }
2856         tipc_node_unlock(node);
2857
2858         read_unlock_bh(&tipc_net_lock);
2859         if (res)
2860                 return tipc_cfg_reply_error_string("cannot change link setting");
2861
2862         return tipc_cfg_reply_none();
2863 }
2864
2865 /**
2866  * link_reset_statistics - reset link statistics
2867  * @l_ptr: pointer to link
2868  */
2869
2870 static void link_reset_statistics(struct link *l_ptr)
2871 {
2872         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2873         l_ptr->stats.sent_info = l_ptr->next_out_no;
2874         l_ptr->stats.recv_info = l_ptr->next_in_no;
2875 }
2876
2877 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2878 {
2879         char *link_name;
2880         struct link *l_ptr;
2881         struct tipc_node *node;
2882
2883         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2884                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2885
2886         link_name = (char *)TLV_DATA(req_tlv_area);
2887         if (!strcmp(link_name, tipc_bclink_name)) {
2888                 if (tipc_bclink_reset_stats())
2889                         return tipc_cfg_reply_error_string("link not found");
2890                 return tipc_cfg_reply_none();
2891         }
2892
2893         read_lock_bh(&tipc_net_lock);
2894         l_ptr = link_find_link(link_name, &node);
2895         if (!l_ptr) {
2896                 read_unlock_bh(&tipc_net_lock);
2897                 return tipc_cfg_reply_error_string("link not found");
2898         }
2899
2900         tipc_node_lock(node);
2901         link_reset_statistics(l_ptr);
2902         tipc_node_unlock(node);
2903         read_unlock_bh(&tipc_net_lock);
2904         return tipc_cfg_reply_none();
2905 }
2906
2907 /**
2908  * percent - convert count to a percentage of total (rounding up or down)
2909  */
2910
2911 static u32 percent(u32 count, u32 total)
2912 {
2913         return (count * 100 + (total / 2)) / total;
2914 }
2915
2916 /**
2917  * tipc_link_stats - print link statistics
2918  * @name: link name
2919  * @buf: print buffer area
2920  * @buf_size: size of print buffer area
2921  *
2922  * Returns length of print buffer data string (or 0 if error)
2923  */
2924
2925 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2926 {
2927         struct print_buf pb;
2928         struct link *l_ptr;
2929         struct tipc_node *node;
2930         char *status;
2931         u32 profile_total = 0;
2932
2933         if (!strcmp(name, tipc_bclink_name))
2934                 return tipc_bclink_stats(buf, buf_size);
2935
2936         tipc_printbuf_init(&pb, buf, buf_size);
2937
2938         read_lock_bh(&tipc_net_lock);
2939         l_ptr = link_find_link(name, &node);
2940         if (!l_ptr) {
2941                 read_unlock_bh(&tipc_net_lock);
2942                 return 0;
2943         }
2944         tipc_node_lock(node);
2945
2946         if (tipc_link_is_active(l_ptr))
2947                 status = "ACTIVE";
2948         else if (tipc_link_is_up(l_ptr))
2949                 status = "STANDBY";
2950         else
2951                 status = "DEFUNCT";
2952         tipc_printf(&pb, "Link <%s>\n"
2953                          "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2954                          "  Window:%u packets\n",
2955                     l_ptr->name, status, l_ptr->max_pkt,
2956                     l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
2957         tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2958                     l_ptr->next_in_no - l_ptr->stats.recv_info,
2959                     l_ptr->stats.recv_fragments,
2960                     l_ptr->stats.recv_fragmented,
2961                     l_ptr->stats.recv_bundles,
2962                     l_ptr->stats.recv_bundled);
2963         tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2964                     l_ptr->next_out_no - l_ptr->stats.sent_info,
2965                     l_ptr->stats.sent_fragments,
2966                     l_ptr->stats.sent_fragmented,
2967                     l_ptr->stats.sent_bundles,
2968                     l_ptr->stats.sent_bundled);
2969         profile_total = l_ptr->stats.msg_length_counts;
2970         if (!profile_total)
2971                 profile_total = 1;
2972         tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
2973                          "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2974                          "-16354:%u%% -32768:%u%% -66000:%u%%\n",
2975                     l_ptr->stats.msg_length_counts,
2976                     l_ptr->stats.msg_lengths_total / profile_total,
2977                     percent(l_ptr->stats.msg_length_profile[0], profile_total),
2978                     percent(l_ptr->stats.msg_length_profile[1], profile_total),
2979                     percent(l_ptr->stats.msg_length_profile[2], profile_total),
2980                     percent(l_ptr->stats.msg_length_profile[3], profile_total),
2981                     percent(l_ptr->stats.msg_length_profile[4], profile_total),
2982                     percent(l_ptr->stats.msg_length_profile[5], profile_total),
2983                     percent(l_ptr->stats.msg_length_profile[6], profile_total));
2984         tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
2985                     l_ptr->stats.recv_states,
2986                     l_ptr->stats.recv_probes,
2987                     l_ptr->stats.recv_nacks,
2988                     l_ptr->stats.deferred_recv,
2989                     l_ptr->stats.duplicates);
2990         tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
2991                     l_ptr->stats.sent_states,
2992                     l_ptr->stats.sent_probes,
2993                     l_ptr->stats.sent_nacks,
2994                     l_ptr->stats.sent_acks,
2995                     l_ptr->stats.retransmitted);
2996         tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
2997                     l_ptr->stats.bearer_congs,
2998                     l_ptr->stats.link_congs,
2999                     l_ptr->stats.max_queue_sz,
3000                     l_ptr->stats.queue_sz_counts
3001                     ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3002                     : 0);
3003
3004         tipc_node_unlock(node);
3005         read_unlock_bh(&tipc_net_lock);
3006         return tipc_printbuf_validate(&pb);
3007 }
3008
3009 #define MAX_LINK_STATS_INFO 2000
3010
3011 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3012 {
3013         struct sk_buff *buf;
3014         struct tlv_desc *rep_tlv;
3015         int str_len;
3016
3017         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3018                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3019
3020         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3021         if (!buf)
3022                 return NULL;
3023
3024         rep_tlv = (struct tlv_desc *)buf->data;
3025
3026         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3027                                   (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3028         if (!str_len) {
3029                 buf_discard(buf);
3030                 return tipc_cfg_reply_error_string("link not found");
3031         }
3032
3033         skb_put(buf, TLV_SPACE(str_len));
3034         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3035
3036         return buf;
3037 }
3038
3039 /**
3040  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3041  * @dest: network address of destination node
3042  * @selector: used to select from set of active links
3043  *
3044  * If no active link can be found, uses default maximum packet size.
3045  */
3046
3047 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3048 {
3049         struct tipc_node *n_ptr;
3050         struct link *l_ptr;
3051         u32 res = MAX_PKT_DEFAULT;
3052
3053         if (dest == tipc_own_addr)
3054                 return MAX_MSG_SIZE;
3055
3056         read_lock_bh(&tipc_net_lock);
3057         n_ptr = tipc_node_find(dest);
3058         if (n_ptr) {
3059                 tipc_node_lock(n_ptr);
3060                 l_ptr = n_ptr->active_links[selector & 1];
3061                 if (l_ptr)
3062                         res = l_ptr->max_pkt;
3063                 tipc_node_unlock(n_ptr);
3064         }
3065         read_unlock_bh(&tipc_net_lock);
3066         return res;
3067 }
3068
3069 static void link_print(struct link *l_ptr, struct print_buf *buf,
3070                        const char *str)
3071 {
3072         tipc_printf(buf, str);
3073         if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3074                 return;
3075         tipc_printf(buf, "Link %x<%s>:",
3076                     l_ptr->addr, l_ptr->b_ptr->publ.name);
3077         tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3078         tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3079         tipc_printf(buf, "SQUE");
3080         if (l_ptr->first_out) {
3081                 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3082                 if (l_ptr->next_out)
3083                         tipc_printf(buf, "%u..",
3084                                     msg_seqno(buf_msg(l_ptr->next_out)));
3085                 tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3086                 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3087                          msg_seqno(buf_msg(l_ptr->first_out)))
3088                      != (l_ptr->out_queue_size - 1)) ||
3089                     (l_ptr->last_out->next != NULL)) {
3090                         tipc_printf(buf, "\nSend queue inconsistency\n");
3091                         tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3092                         tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3093                         tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3094                 }
3095         } else
3096                 tipc_printf(buf, "[]");
3097         tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3098         if (l_ptr->oldest_deferred_in) {
3099                 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3100                 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3101                 tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3102                 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3103                         tipc_printf(buf, ":RQSIZ(%u)",
3104                                     l_ptr->deferred_inqueue_sz);
3105                 }
3106         }
3107         if (link_working_unknown(l_ptr))
3108                 tipc_printf(buf, ":WU");
3109         if (link_reset_reset(l_ptr))
3110                 tipc_printf(buf, ":RR");
3111         if (link_reset_unknown(l_ptr))
3112                 tipc_printf(buf, ":RU");
3113         if (link_working_working(l_ptr))
3114                 tipc_printf(buf, ":WW");
3115         tipc_printf(buf, "\n");
3116 }
3117