tipc: Reject connection protocol message sent to unconnected port
[linux-2.6.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
/*
 * Connection management: probe interval and the two values a port's
 * probing_state field takes in this file.
 */
#define PROBING_INTERVAL (60 * 60 * 1000)	/* [ms] => 1 h */
#define CONFIRMED 0
#define PROBING 1

/* Maximum number of data bytes copied into a returned (rejected) message */
#define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /**
73  * tipc_multicast - send a multicast message to local and remote destinations
74  */
75
76 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
77                    u32 num_sect, struct iovec const *msg_sect,
78                    unsigned int total_len)
79 {
80         struct tipc_msg *hdr;
81         struct sk_buff *buf;
82         struct sk_buff *ibuf = NULL;
83         struct port_list dports = {0, NULL, };
84         struct tipc_port *oport = tipc_port_deref(ref);
85         int ext_targets;
86         int res;
87
88         if (unlikely(!oport))
89                 return -EINVAL;
90
91         /* Create multicast message */
92
93         hdr = &oport->phdr;
94         msg_set_type(hdr, TIPC_MCAST_MSG);
95         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
96         msg_set_destport(hdr, 0);
97         msg_set_destnode(hdr, 0);
98         msg_set_nametype(hdr, seq->type);
99         msg_set_namelower(hdr, seq->lower);
100         msg_set_nameupper(hdr, seq->upper);
101         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
102         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
103                         !oport->user_port, &buf);
104         if (unlikely(!buf))
105                 return res;
106
107         /* Figure out where to send multicast message */
108
109         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
110                                                 TIPC_NODE_SCOPE, &dports);
111
112         /* Send message to destinations (duplicate it only if necessary) */
113
114         if (ext_targets) {
115                 if (dports.count != 0) {
116                         ibuf = skb_copy(buf, GFP_ATOMIC);
117                         if (ibuf == NULL) {
118                                 tipc_port_list_free(&dports);
119                                 buf_discard(buf);
120                                 return -ENOMEM;
121                         }
122                 }
123                 res = tipc_bclink_send_msg(buf);
124                 if ((res < 0) && (dports.count != 0))
125                         buf_discard(ibuf);
126         } else {
127                 ibuf = buf;
128         }
129
130         if (res >= 0) {
131                 if (ibuf)
132                         tipc_port_recv_mcast(ibuf, &dports);
133         } else {
134                 tipc_port_list_free(&dports);
135         }
136         return res;
137 }
138
139 /**
140  * tipc_port_recv_mcast - deliver multicast message to all destination ports
141  *
142  * If there is no port list, perform a lookup to create one
143  */
144
145 void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
146 {
147         struct tipc_msg *msg;
148         struct port_list dports = {0, NULL, };
149         struct port_list *item = dp;
150         int cnt = 0;
151
152         msg = buf_msg(buf);
153
154         /* Create destination port list, if one wasn't supplied */
155
156         if (dp == NULL) {
157                 tipc_nametbl_mc_translate(msg_nametype(msg),
158                                      msg_namelower(msg),
159                                      msg_nameupper(msg),
160                                      TIPC_CLUSTER_SCOPE,
161                                      &dports);
162                 item = dp = &dports;
163         }
164
165         /* Deliver a copy of message to each destination port */
166
167         if (dp->count != 0) {
168                 msg_set_destnode(msg, tipc_own_addr);
169                 if (dp->count == 1) {
170                         msg_set_destport(msg, dp->ports[0]);
171                         tipc_port_recv_msg(buf);
172                         tipc_port_list_free(dp);
173                         return;
174                 }
175                 for (; cnt < dp->count; cnt++) {
176                         int index = cnt % PLSIZE;
177                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
178
179                         if (b == NULL) {
180                                 warn("Unable to deliver multicast message(s)\n");
181                                 goto exit;
182                         }
183                         if ((index == 0) && (cnt != 0))
184                                 item = item->next;
185                         msg_set_destport(buf_msg(b), item->ports[index]);
186                         tipc_port_recv_msg(b);
187                 }
188         }
189 exit:
190         buf_discard(buf);
191         tipc_port_list_free(dp);
192 }
193
194 /**
195  * tipc_createport_raw - create a generic TIPC port
196  *
197  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
198  */
199
200 struct tipc_port *tipc_createport_raw(void *usr_handle,
201                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
202                         void (*wakeup)(struct tipc_port *),
203                         const u32 importance)
204 {
205         struct tipc_port *p_ptr;
206         struct tipc_msg *msg;
207         u32 ref;
208
209         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
210         if (!p_ptr) {
211                 warn("Port creation failed, no memory\n");
212                 return NULL;
213         }
214         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
215         if (!ref) {
216                 warn("Port creation failed, reference table exhausted\n");
217                 kfree(p_ptr);
218                 return NULL;
219         }
220
221         p_ptr->usr_handle = usr_handle;
222         p_ptr->max_pkt = MAX_PKT_DEFAULT;
223         p_ptr->ref = ref;
224         msg = &p_ptr->phdr;
225         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
226         msg_set_origport(msg, ref);
227         INIT_LIST_HEAD(&p_ptr->wait_list);
228         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
229         p_ptr->dispatcher = dispatcher;
230         p_ptr->wakeup = wakeup;
231         p_ptr->user_port = NULL;
232         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
233         spin_lock_bh(&tipc_port_list_lock);
234         INIT_LIST_HEAD(&p_ptr->publications);
235         INIT_LIST_HEAD(&p_ptr->port_list);
236         list_add_tail(&p_ptr->port_list, &ports);
237         spin_unlock_bh(&tipc_port_list_lock);
238         return p_ptr;
239 }
240
241 int tipc_deleteport(u32 ref)
242 {
243         struct tipc_port *p_ptr;
244         struct sk_buff *buf = NULL;
245
246         tipc_withdraw(ref, 0, NULL);
247         p_ptr = tipc_port_lock(ref);
248         if (!p_ptr)
249                 return -EINVAL;
250
251         tipc_ref_discard(ref);
252         tipc_port_unlock(p_ptr);
253
254         k_cancel_timer(&p_ptr->timer);
255         if (p_ptr->connected) {
256                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
257                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
258         }
259         kfree(p_ptr->user_port);
260
261         spin_lock_bh(&tipc_port_list_lock);
262         list_del(&p_ptr->port_list);
263         list_del(&p_ptr->wait_list);
264         spin_unlock_bh(&tipc_port_list_lock);
265         k_term_timer(&p_ptr->timer);
266         kfree(p_ptr);
267         tipc_net_route_msg(buf);
268         return 0;
269 }
270
271 static int port_unreliable(struct tipc_port *p_ptr)
272 {
273         return msg_src_droppable(&p_ptr->phdr);
274 }
275
276 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
277 {
278         struct tipc_port *p_ptr;
279
280         p_ptr = tipc_port_lock(ref);
281         if (!p_ptr)
282                 return -EINVAL;
283         *isunreliable = port_unreliable(p_ptr);
284         tipc_port_unlock(p_ptr);
285         return 0;
286 }
287
288 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
289 {
290         struct tipc_port *p_ptr;
291
292         p_ptr = tipc_port_lock(ref);
293         if (!p_ptr)
294                 return -EINVAL;
295         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
296         tipc_port_unlock(p_ptr);
297         return 0;
298 }
299
300 static int port_unreturnable(struct tipc_port *p_ptr)
301 {
302         return msg_dest_droppable(&p_ptr->phdr);
303 }
304
305 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
306 {
307         struct tipc_port *p_ptr;
308
309         p_ptr = tipc_port_lock(ref);
310         if (!p_ptr)
311                 return -EINVAL;
312         *isunrejectable = port_unreturnable(p_ptr);
313         tipc_port_unlock(p_ptr);
314         return 0;
315 }
316
317 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
318 {
319         struct tipc_port *p_ptr;
320
321         p_ptr = tipc_port_lock(ref);
322         if (!p_ptr)
323                 return -EINVAL;
324         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
325         tipc_port_unlock(p_ptr);
326         return 0;
327 }
328
329 /*
330  * port_build_proto_msg(): build a port level protocol
331  * or a connection abortion message. Called with
332  * tipc_port lock on.
333  */
334 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
335                                             u32 origport, u32 orignode,
336                                             u32 usr, u32 type, u32 err,
337                                             u32 ack)
338 {
339         struct sk_buff *buf;
340         struct tipc_msg *msg;
341
342         buf = tipc_buf_acquire(INT_H_SIZE);
343         if (buf) {
344                 msg = buf_msg(buf);
345                 tipc_msg_init(msg, usr, type, INT_H_SIZE, destnode);
346                 msg_set_errcode(msg, err);
347                 msg_set_destport(msg, destport);
348                 msg_set_origport(msg, origport);
349                 msg_set_orignode(msg, orignode);
350                 msg_set_msgcnt(msg, ack);
351         }
352         return buf;
353 }
354
355 int tipc_reject_msg(struct sk_buff *buf, u32 err)
356 {
357         struct tipc_msg *msg = buf_msg(buf);
358         struct sk_buff *rbuf;
359         struct tipc_msg *rmsg;
360         int hdr_sz;
361         u32 imp;
362         u32 data_sz = msg_data_sz(msg);
363         u32 src_node;
364         u32 rmsg_sz;
365
366         /* discard rejected message if it shouldn't be returned to sender */
367
368         if (WARN(!msg_isdata(msg),
369                  "attempt to reject message with user=%u", msg_user(msg))) {
370                 dump_stack();
371                 goto exit;
372         }
373         if (msg_errcode(msg) || msg_dest_droppable(msg))
374                 goto exit;
375
376         /*
377          * construct returned message by copying rejected message header and
378          * data (or subset), then updating header fields that need adjusting
379          */
380
381         hdr_sz = msg_hdr_sz(msg);
382         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
383
384         rbuf = tipc_buf_acquire(rmsg_sz);
385         if (rbuf == NULL)
386                 goto exit;
387
388         rmsg = buf_msg(rbuf);
389         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
390
391         if (msg_connected(rmsg)) {
392                 imp = msg_importance(rmsg);
393                 if (imp < TIPC_CRITICAL_IMPORTANCE)
394                         msg_set_importance(rmsg, ++imp);
395         }
396         msg_set_non_seq(rmsg, 0);
397         msg_set_size(rmsg, rmsg_sz);
398         msg_set_errcode(rmsg, err);
399         msg_set_prevnode(rmsg, tipc_own_addr);
400         msg_swap_words(rmsg, 4, 5);
401         if (!msg_short(rmsg))
402                 msg_swap_words(rmsg, 6, 7);
403
404         /* send self-abort message when rejecting on a connected port */
405         if (msg_connected(msg)) {
406                 struct sk_buff *abuf = NULL;
407                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
408
409                 if (p_ptr) {
410                         if (p_ptr->connected)
411                                 abuf = port_build_self_abort_msg(p_ptr, err);
412                         tipc_port_unlock(p_ptr);
413                 }
414                 tipc_net_route_msg(abuf);
415         }
416
417         /* send returned message & dispose of rejected message */
418
419         src_node = msg_prevnode(msg);
420         if (src_node == tipc_own_addr)
421                 tipc_port_recv_msg(rbuf);
422         else
423                 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
424 exit:
425         buf_discard(buf);
426         return data_sz;
427 }
428
429 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
430                               struct iovec const *msg_sect, u32 num_sect,
431                               unsigned int total_len, int err)
432 {
433         struct sk_buff *buf;
434         int res;
435
436         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
437                         !p_ptr->user_port, &buf);
438         if (!buf)
439                 return res;
440
441         return tipc_reject_msg(buf, err);
442 }
443
444 static void port_timeout(unsigned long ref)
445 {
446         struct tipc_port *p_ptr = tipc_port_lock(ref);
447         struct sk_buff *buf = NULL;
448
449         if (!p_ptr)
450                 return;
451
452         if (!p_ptr->connected) {
453                 tipc_port_unlock(p_ptr);
454                 return;
455         }
456
457         /* Last probe answered ? */
458         if (p_ptr->probing_state == PROBING) {
459                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
460         } else {
461                 buf = port_build_proto_msg(port_peerport(p_ptr),
462                                            port_peernode(p_ptr),
463                                            p_ptr->ref,
464                                            tipc_own_addr,
465                                            CONN_MANAGER,
466                                            CONN_PROBE,
467                                            TIPC_OK,
468                                            0);
469                 p_ptr->probing_state = PROBING;
470                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
471         }
472         tipc_port_unlock(p_ptr);
473         tipc_net_route_msg(buf);
474 }
475
476
477 static void port_handle_node_down(unsigned long ref)
478 {
479         struct tipc_port *p_ptr = tipc_port_lock(ref);
480         struct sk_buff *buf = NULL;
481
482         if (!p_ptr)
483                 return;
484         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
485         tipc_port_unlock(p_ptr);
486         tipc_net_route_msg(buf);
487 }
488
489
490 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
491 {
492         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
493
494         if (buf) {
495                 struct tipc_msg *msg = buf_msg(buf);
496                 msg_swap_words(msg, 4, 5);
497                 msg_swap_words(msg, 6, 7);
498         }
499         return buf;
500 }
501
502
503 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
504 {
505         struct sk_buff *buf;
506         struct tipc_msg *msg;
507         u32 imp;
508
509         if (!p_ptr->connected)
510                 return NULL;
511
512         buf = tipc_buf_acquire(BASIC_H_SIZE);
513         if (buf) {
514                 msg = buf_msg(buf);
515                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
516                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
517                 msg_set_size(msg, BASIC_H_SIZE);
518                 imp = msg_importance(msg);
519                 if (imp < TIPC_CRITICAL_IMPORTANCE)
520                         msg_set_importance(msg, ++imp);
521                 msg_set_errcode(msg, err);
522         }
523         return buf;
524 }
525
526 void tipc_port_recv_proto_msg(struct sk_buff *buf)
527 {
528         struct tipc_msg *msg = buf_msg(buf);
529         struct tipc_port *p_ptr;
530         struct sk_buff *r_buf = NULL;
531         u32 orignode = msg_orignode(msg);
532         u32 origport = msg_origport(msg);
533         u32 destport = msg_destport(msg);
534         int wakeable;
535
536         /* Validate connection */
537
538         p_ptr = tipc_port_lock(destport);
539         if (!p_ptr || !p_ptr->connected ||
540             (port_peernode(p_ptr) != orignode) ||
541             (port_peerport(p_ptr) != origport)) {
542                 r_buf = port_build_proto_msg(origport,
543                                              orignode,
544                                              destport,
545                                              tipc_own_addr,
546                                              TIPC_HIGH_IMPORTANCE,
547                                              TIPC_CONN_MSG,
548                                              TIPC_ERR_NO_PORT,
549                                              0);
550                 if (p_ptr)
551                         tipc_port_unlock(p_ptr);
552                 goto exit;
553         }
554
555         /* Process protocol message sent by peer */
556
557         switch (msg_type(msg)) {
558         case CONN_ACK:
559                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
560                         p_ptr->wakeup;
561                 p_ptr->acked += msg_msgcnt(msg);
562                 if (!tipc_port_congested(p_ptr)) {
563                         p_ptr->congested = 0;
564                         if (wakeable)
565                                 p_ptr->wakeup(p_ptr);
566                 }
567                 break;
568         case CONN_PROBE:
569                 r_buf = port_build_proto_msg(origport,
570                                              orignode,
571                                              destport,
572                                              tipc_own_addr,
573                                              CONN_MANAGER,
574                                              CONN_PROBE_REPLY,
575                                              TIPC_OK,
576                                              0);
577                 break;
578         default:
579                 /* CONN_PROBE_REPLY or unrecognized - no action required */
580                 break;
581         }
582         p_ptr->probing_state = CONFIRMED;
583         tipc_port_unlock(p_ptr);
584 exit:
585         tipc_net_route_msg(r_buf);
586         buf_discard(buf);
587 }
588
589 static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
590 {
591         struct publication *publ;
592
593         if (full_id)
594                 tipc_printf(buf, "<%u.%u.%u:%u>:",
595                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
596                             tipc_node(tipc_own_addr), p_ptr->ref);
597         else
598                 tipc_printf(buf, "%-10u:", p_ptr->ref);
599
600         if (p_ptr->connected) {
601                 u32 dport = port_peerport(p_ptr);
602                 u32 destnode = port_peernode(p_ptr);
603
604                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
605                             tipc_zone(destnode), tipc_cluster(destnode),
606                             tipc_node(destnode), dport);
607                 if (p_ptr->conn_type != 0)
608                         tipc_printf(buf, " via {%u,%u}",
609                                     p_ptr->conn_type,
610                                     p_ptr->conn_instance);
611         } else if (p_ptr->published) {
612                 tipc_printf(buf, " bound to");
613                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
614                         if (publ->lower == publ->upper)
615                                 tipc_printf(buf, " {%u,%u}", publ->type,
616                                             publ->lower);
617                         else
618                                 tipc_printf(buf, " {%u,%u,%u}", publ->type,
619                                             publ->lower, publ->upper);
620                 }
621         }
622         tipc_printf(buf, "\n");
623 }
624
625 #define MAX_PORT_QUERY 32768
626
627 struct sk_buff *tipc_port_get_ports(void)
628 {
629         struct sk_buff *buf;
630         struct tlv_desc *rep_tlv;
631         struct print_buf pb;
632         struct tipc_port *p_ptr;
633         int str_len;
634
635         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
636         if (!buf)
637                 return NULL;
638         rep_tlv = (struct tlv_desc *)buf->data;
639
640         tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
641         spin_lock_bh(&tipc_port_list_lock);
642         list_for_each_entry(p_ptr, &ports, port_list) {
643                 spin_lock_bh(p_ptr->lock);
644                 port_print(p_ptr, &pb, 0);
645                 spin_unlock_bh(p_ptr->lock);
646         }
647         spin_unlock_bh(&tipc_port_list_lock);
648         str_len = tipc_printbuf_validate(&pb);
649
650         skb_put(buf, TLV_SPACE(str_len));
651         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
652
653         return buf;
654 }
655
656 void tipc_port_reinit(void)
657 {
658         struct tipc_port *p_ptr;
659         struct tipc_msg *msg;
660
661         spin_lock_bh(&tipc_port_list_lock);
662         list_for_each_entry(p_ptr, &ports, port_list) {
663                 msg = &p_ptr->phdr;
664                 if (msg_orignode(msg) == tipc_own_addr)
665                         break;
666                 msg_set_prevnode(msg, tipc_own_addr);
667                 msg_set_orignode(msg, tipc_own_addr);
668         }
669         spin_unlock_bh(&tipc_port_list_lock);
670 }
671
672
673 /*
674  *  port_dispatcher_sigh(): Signal handler for messages destinated
675  *                          to the tipc_port interface.
676  */
677
678 static void port_dispatcher_sigh(void *dummy)
679 {
680         struct sk_buff *buf;
681
682         spin_lock_bh(&queue_lock);
683         buf = msg_queue_head;
684         msg_queue_head = NULL;
685         spin_unlock_bh(&queue_lock);
686
687         while (buf) {
688                 struct tipc_port *p_ptr;
689                 struct user_port *up_ptr;
690                 struct tipc_portid orig;
691                 struct tipc_name_seq dseq;
692                 void *usr_handle;
693                 int connected;
694                 int published;
695                 u32 message_type;
696
697                 struct sk_buff *next = buf->next;
698                 struct tipc_msg *msg = buf_msg(buf);
699                 u32 dref = msg_destport(msg);
700
701                 message_type = msg_type(msg);
702                 if (message_type > TIPC_DIRECT_MSG)
703                         goto reject;    /* Unsupported message type */
704
705                 p_ptr = tipc_port_lock(dref);
706                 if (!p_ptr)
707                         goto reject;    /* Port deleted while msg in queue */
708
709                 orig.ref = msg_origport(msg);
710                 orig.node = msg_orignode(msg);
711                 up_ptr = p_ptr->user_port;
712                 usr_handle = up_ptr->usr_handle;
713                 connected = p_ptr->connected;
714                 published = p_ptr->published;
715
716                 if (unlikely(msg_errcode(msg)))
717                         goto err;
718
719                 switch (message_type) {
720
721                 case TIPC_CONN_MSG:{
722                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
723                                 u32 peer_port = port_peerport(p_ptr);
724                                 u32 peer_node = port_peernode(p_ptr);
725                                 u32 dsz;
726
727                                 tipc_port_unlock(p_ptr);
728                                 if (unlikely(!cb))
729                                         goto reject;
730                                 if (unlikely(!connected)) {
731                                         if (tipc_connect2port(dref, &orig))
732                                                 goto reject;
733                                 } else if ((msg_origport(msg) != peer_port) ||
734                                            (msg_orignode(msg) != peer_node))
735                                         goto reject;
736                                 dsz = msg_data_sz(msg);
737                                 if (unlikely(dsz &&
738                                              (++p_ptr->conn_unacked >=
739                                               TIPC_FLOW_CONTROL_WIN)))
740                                         tipc_acknowledge(dref,
741                                                          p_ptr->conn_unacked);
742                                 skb_pull(buf, msg_hdr_sz(msg));
743                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
744                                 break;
745                         }
746                 case TIPC_DIRECT_MSG:{
747                                 tipc_msg_event cb = up_ptr->msg_cb;
748
749                                 tipc_port_unlock(p_ptr);
750                                 if (unlikely(!cb || connected))
751                                         goto reject;
752                                 skb_pull(buf, msg_hdr_sz(msg));
753                                 cb(usr_handle, dref, &buf, msg_data(msg),
754                                    msg_data_sz(msg), msg_importance(msg),
755                                    &orig);
756                                 break;
757                         }
758                 case TIPC_MCAST_MSG:
759                 case TIPC_NAMED_MSG:{
760                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
761
762                                 tipc_port_unlock(p_ptr);
763                                 if (unlikely(!cb || connected || !published))
764                                         goto reject;
765                                 dseq.type =  msg_nametype(msg);
766                                 dseq.lower = msg_nameinst(msg);
767                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
768                                         ? dseq.lower : msg_nameupper(msg);
769                                 skb_pull(buf, msg_hdr_sz(msg));
770                                 cb(usr_handle, dref, &buf, msg_data(msg),
771                                    msg_data_sz(msg), msg_importance(msg),
772                                    &orig, &dseq);
773                                 break;
774                         }
775                 }
776                 if (buf)
777                         buf_discard(buf);
778                 buf = next;
779                 continue;
780 err:
781                 switch (message_type) {
782
783                 case TIPC_CONN_MSG:{
784                                 tipc_conn_shutdown_event cb =
785                                         up_ptr->conn_err_cb;
786                                 u32 peer_port = port_peerport(p_ptr);
787                                 u32 peer_node = port_peernode(p_ptr);
788
789                                 tipc_port_unlock(p_ptr);
790                                 if (!cb || !connected)
791                                         break;
792                                 if ((msg_origport(msg) != peer_port) ||
793                                     (msg_orignode(msg) != peer_node))
794                                         break;
795                                 tipc_disconnect(dref);
796                                 skb_pull(buf, msg_hdr_sz(msg));
797                                 cb(usr_handle, dref, &buf, msg_data(msg),
798                                    msg_data_sz(msg), msg_errcode(msg));
799                                 break;
800                         }
801                 case TIPC_DIRECT_MSG:{
802                                 tipc_msg_err_event cb = up_ptr->err_cb;
803
804                                 tipc_port_unlock(p_ptr);
805                                 if (!cb || connected)
806                                         break;
807                                 skb_pull(buf, msg_hdr_sz(msg));
808                                 cb(usr_handle, dref, &buf, msg_data(msg),
809                                    msg_data_sz(msg), msg_errcode(msg), &orig);
810                                 break;
811                         }
812                 case TIPC_MCAST_MSG:
813                 case TIPC_NAMED_MSG:{
814                                 tipc_named_msg_err_event cb =
815                                         up_ptr->named_err_cb;
816
817                                 tipc_port_unlock(p_ptr);
818                                 if (!cb || connected)
819                                         break;
820                                 dseq.type =  msg_nametype(msg);
821                                 dseq.lower = msg_nameinst(msg);
822                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
823                                         ? dseq.lower : msg_nameupper(msg);
824                                 skb_pull(buf, msg_hdr_sz(msg));
825                                 cb(usr_handle, dref, &buf, msg_data(msg),
826                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
827                                 break;
828                         }
829                 }
830                 if (buf)
831                         buf_discard(buf);
832                 buf = next;
833                 continue;
834 reject:
835                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
836                 buf = next;
837         }
838 }
839
840 /*
841  *  port_dispatcher(): Dispatcher for messages destinated
842  *  to the tipc_port interface. Called with port locked.
843  */
844
845 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
846 {
847         buf->next = NULL;
848         spin_lock_bh(&queue_lock);
849         if (msg_queue_head) {
850                 msg_queue_tail->next = buf;
851                 msg_queue_tail = buf;
852         } else {
853                 msg_queue_tail = msg_queue_head = buf;
854                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
855         }
856         spin_unlock_bh(&queue_lock);
857         return 0;
858 }
859
860 /*
861  * Wake up port after congestion: Called with port locked,
862  *
863  */
864
865 static void port_wakeup_sh(unsigned long ref)
866 {
867         struct tipc_port *p_ptr;
868         struct user_port *up_ptr;
869         tipc_continue_event cb = NULL;
870         void *uh = NULL;
871
872         p_ptr = tipc_port_lock(ref);
873         if (p_ptr) {
874                 up_ptr = p_ptr->user_port;
875                 if (up_ptr) {
876                         cb = up_ptr->continue_event_cb;
877                         uh = up_ptr->usr_handle;
878                 }
879                 tipc_port_unlock(p_ptr);
880         }
881         if (cb)
882                 cb(uh, ref);
883 }
884
885
886 static void port_wakeup(struct tipc_port *p_ptr)
887 {
888         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
889 }
890
891 void tipc_acknowledge(u32 ref, u32 ack)
892 {
893         struct tipc_port *p_ptr;
894         struct sk_buff *buf = NULL;
895
896         p_ptr = tipc_port_lock(ref);
897         if (!p_ptr)
898                 return;
899         if (p_ptr->connected) {
900                 p_ptr->conn_unacked -= ack;
901                 buf = port_build_proto_msg(port_peerport(p_ptr),
902                                            port_peernode(p_ptr),
903                                            ref,
904                                            tipc_own_addr,
905                                            CONN_MANAGER,
906                                            CONN_ACK,
907                                            TIPC_OK,
908                                            ack);
909         }
910         tipc_port_unlock(p_ptr);
911         tipc_net_route_msg(buf);
912 }
913
914 /*
915  * tipc_createport(): user level call.
916  */
917
918 int tipc_createport(void *usr_handle,
919                     unsigned int importance,
920                     tipc_msg_err_event error_cb,
921                     tipc_named_msg_err_event named_error_cb,
922                     tipc_conn_shutdown_event conn_error_cb,
923                     tipc_msg_event msg_cb,
924                     tipc_named_msg_event named_msg_cb,
925                     tipc_conn_msg_event conn_msg_cb,
926                     tipc_continue_event continue_event_cb,/* May be zero */
927                     u32 *portref)
928 {
929         struct user_port *up_ptr;
930         struct tipc_port *p_ptr;
931
932         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
933         if (!up_ptr) {
934                 warn("Port creation failed, no memory\n");
935                 return -ENOMEM;
936         }
937         p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
938                                                    port_wakeup, importance);
939         if (!p_ptr) {
940                 kfree(up_ptr);
941                 return -ENOMEM;
942         }
943
944         p_ptr->user_port = up_ptr;
945         up_ptr->usr_handle = usr_handle;
946         up_ptr->ref = p_ptr->ref;
947         up_ptr->err_cb = error_cb;
948         up_ptr->named_err_cb = named_error_cb;
949         up_ptr->conn_err_cb = conn_error_cb;
950         up_ptr->msg_cb = msg_cb;
951         up_ptr->named_msg_cb = named_msg_cb;
952         up_ptr->conn_msg_cb = conn_msg_cb;
953         up_ptr->continue_event_cb = continue_event_cb;
954         *portref = p_ptr->ref;
955         tipc_port_unlock(p_ptr);
956         return 0;
957 }
958
959 int tipc_portimportance(u32 ref, unsigned int *importance)
960 {
961         struct tipc_port *p_ptr;
962
963         p_ptr = tipc_port_lock(ref);
964         if (!p_ptr)
965                 return -EINVAL;
966         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
967         tipc_port_unlock(p_ptr);
968         return 0;
969 }
970
971 int tipc_set_portimportance(u32 ref, unsigned int imp)
972 {
973         struct tipc_port *p_ptr;
974
975         if (imp > TIPC_CRITICAL_IMPORTANCE)
976                 return -EINVAL;
977
978         p_ptr = tipc_port_lock(ref);
979         if (!p_ptr)
980                 return -EINVAL;
981         msg_set_importance(&p_ptr->phdr, (u32)imp);
982         tipc_port_unlock(p_ptr);
983         return 0;
984 }
985
986
987 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
988 {
989         struct tipc_port *p_ptr;
990         struct publication *publ;
991         u32 key;
992         int res = -EINVAL;
993
994         p_ptr = tipc_port_lock(ref);
995         if (!p_ptr)
996                 return -EINVAL;
997
998         if (p_ptr->connected)
999                 goto exit;
1000         if (seq->lower > seq->upper)
1001                 goto exit;
1002         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1003                 goto exit;
1004         key = ref + p_ptr->pub_count + 1;
1005         if (key == ref) {
1006                 res = -EADDRINUSE;
1007                 goto exit;
1008         }
1009         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1010                                     scope, p_ptr->ref, key);
1011         if (publ) {
1012                 list_add(&publ->pport_list, &p_ptr->publications);
1013                 p_ptr->pub_count++;
1014                 p_ptr->published = 1;
1015                 res = 0;
1016         }
1017 exit:
1018         tipc_port_unlock(p_ptr);
1019         return res;
1020 }
1021
1022 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1023 {
1024         struct tipc_port *p_ptr;
1025         struct publication *publ;
1026         struct publication *tpubl;
1027         int res = -EINVAL;
1028
1029         p_ptr = tipc_port_lock(ref);
1030         if (!p_ptr)
1031                 return -EINVAL;
1032         if (!seq) {
1033                 list_for_each_entry_safe(publ, tpubl,
1034                                          &p_ptr->publications, pport_list) {
1035                         tipc_nametbl_withdraw(publ->type, publ->lower,
1036                                               publ->ref, publ->key);
1037                 }
1038                 res = 0;
1039         } else {
1040                 list_for_each_entry_safe(publ, tpubl,
1041                                          &p_ptr->publications, pport_list) {
1042                         if (publ->scope != scope)
1043                                 continue;
1044                         if (publ->type != seq->type)
1045                                 continue;
1046                         if (publ->lower != seq->lower)
1047                                 continue;
1048                         if (publ->upper != seq->upper)
1049                                 break;
1050                         tipc_nametbl_withdraw(publ->type, publ->lower,
1051                                               publ->ref, publ->key);
1052                         res = 0;
1053                         break;
1054                 }
1055         }
1056         if (list_empty(&p_ptr->publications))
1057                 p_ptr->published = 0;
1058         tipc_port_unlock(p_ptr);
1059         return res;
1060 }
1061
1062 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1063 {
1064         struct tipc_port *p_ptr;
1065         struct tipc_msg *msg;
1066         int res = -EINVAL;
1067
1068         p_ptr = tipc_port_lock(ref);
1069         if (!p_ptr)
1070                 return -EINVAL;
1071         if (p_ptr->published || p_ptr->connected)
1072                 goto exit;
1073         if (!peer->ref)
1074                 goto exit;
1075
1076         msg = &p_ptr->phdr;
1077         msg_set_destnode(msg, peer->node);
1078         msg_set_destport(msg, peer->ref);
1079         msg_set_orignode(msg, tipc_own_addr);
1080         msg_set_origport(msg, p_ptr->ref);
1081         msg_set_type(msg, TIPC_CONN_MSG);
1082         msg_set_lookup_scope(msg, 0);
1083         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1084
1085         p_ptr->probing_interval = PROBING_INTERVAL;
1086         p_ptr->probing_state = CONFIRMED;
1087         p_ptr->connected = 1;
1088         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1089
1090         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1091                           (void *)(unsigned long)ref,
1092                           (net_ev_handler)port_handle_node_down);
1093         res = 0;
1094 exit:
1095         tipc_port_unlock(p_ptr);
1096         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1097         return res;
1098 }
1099
1100 /**
1101  * tipc_disconnect_port - disconnect port from peer
1102  *
1103  * Port must be locked.
1104  */
1105
1106 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1107 {
1108         int res;
1109
1110         if (tp_ptr->connected) {
1111                 tp_ptr->connected = 0;
1112                 /* let timer expire on it's own to avoid deadlock! */
1113                 tipc_nodesub_unsubscribe(
1114                         &((struct tipc_port *)tp_ptr)->subscription);
1115                 res = 0;
1116         } else {
1117                 res = -ENOTCONN;
1118         }
1119         return res;
1120 }
1121
1122 /*
1123  * tipc_disconnect(): Disconnect port form peer.
1124  *                    This is a node local operation.
1125  */
1126
1127 int tipc_disconnect(u32 ref)
1128 {
1129         struct tipc_port *p_ptr;
1130         int res;
1131
1132         p_ptr = tipc_port_lock(ref);
1133         if (!p_ptr)
1134                 return -EINVAL;
1135         res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1136         tipc_port_unlock(p_ptr);
1137         return res;
1138 }
1139
1140 /*
1141  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1142  */
1143 int tipc_shutdown(u32 ref)
1144 {
1145         struct tipc_port *p_ptr;
1146         struct sk_buff *buf = NULL;
1147
1148         p_ptr = tipc_port_lock(ref);
1149         if (!p_ptr)
1150                 return -EINVAL;
1151
1152         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1153         tipc_port_unlock(p_ptr);
1154         tipc_net_route_msg(buf);
1155         return tipc_disconnect(ref);
1156 }
1157
1158 /*
1159  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1160  *                        message for this node.
1161  */
1162
1163 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1164                                    struct iovec const *msg_sect,
1165                                    unsigned int total_len)
1166 {
1167         struct sk_buff *buf;
1168         int res;
1169
1170         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1171                         MAX_MSG_SIZE, !sender->user_port, &buf);
1172         if (likely(buf))
1173                 tipc_port_recv_msg(buf);
1174         return res;
1175 }
1176
1177 /**
1178  * tipc_send - send message sections on connection
1179  */
1180
1181 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1182               unsigned int total_len)
1183 {
1184         struct tipc_port *p_ptr;
1185         u32 destnode;
1186         int res;
1187
1188         p_ptr = tipc_port_deref(ref);
1189         if (!p_ptr || !p_ptr->connected)
1190                 return -EINVAL;
1191
1192         p_ptr->congested = 1;
1193         if (!tipc_port_congested(p_ptr)) {
1194                 destnode = port_peernode(p_ptr);
1195                 if (likely(destnode != tipc_own_addr))
1196                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1197                                                            total_len, destnode);
1198                 else
1199                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1200                                                       total_len);
1201
1202                 if (likely(res != -ELINKCONG)) {
1203                         p_ptr->congested = 0;
1204                         if (res > 0)
1205                                 p_ptr->sent++;
1206                         return res;
1207                 }
1208         }
1209         if (port_unreliable(p_ptr)) {
1210                 p_ptr->congested = 0;
1211                 return total_len;
1212         }
1213         return -ELINKCONG;
1214 }
1215
1216 /**
1217  * tipc_send2name - send message sections to port name
1218  */
1219
1220 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1221                    unsigned int num_sect, struct iovec const *msg_sect,
1222                    unsigned int total_len)
1223 {
1224         struct tipc_port *p_ptr;
1225         struct tipc_msg *msg;
1226         u32 destnode = domain;
1227         u32 destport;
1228         int res;
1229
1230         p_ptr = tipc_port_deref(ref);
1231         if (!p_ptr || p_ptr->connected)
1232                 return -EINVAL;
1233
1234         msg = &p_ptr->phdr;
1235         msg_set_type(msg, TIPC_NAMED_MSG);
1236         msg_set_orignode(msg, tipc_own_addr);
1237         msg_set_origport(msg, ref);
1238         msg_set_hdr_sz(msg, NAMED_H_SIZE);
1239         msg_set_nametype(msg, name->type);
1240         msg_set_nameinst(msg, name->instance);
1241         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1242         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1243         msg_set_destnode(msg, destnode);
1244         msg_set_destport(msg, destport);
1245
1246         if (likely(destport)) {
1247                 if (likely(destnode == tipc_own_addr))
1248                         res = tipc_port_recv_sections(p_ptr, num_sect,
1249                                                       msg_sect, total_len);
1250                 else
1251                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1252                                                            num_sect, total_len,
1253                                                            destnode);
1254                 if (likely(res != -ELINKCONG)) {
1255                         if (res > 0)
1256                                 p_ptr->sent++;
1257                         return res;
1258                 }
1259                 if (port_unreliable(p_ptr)) {
1260                         return total_len;
1261                 }
1262                 return -ELINKCONG;
1263         }
1264         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1265                                          total_len, TIPC_ERR_NO_NAME);
1266 }
1267
1268 /**
1269  * tipc_send2port - send message sections to port identity
1270  */
1271
1272 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1273                    unsigned int num_sect, struct iovec const *msg_sect,
1274                    unsigned int total_len)
1275 {
1276         struct tipc_port *p_ptr;
1277         struct tipc_msg *msg;
1278         int res;
1279
1280         p_ptr = tipc_port_deref(ref);
1281         if (!p_ptr || p_ptr->connected)
1282                 return -EINVAL;
1283
1284         msg = &p_ptr->phdr;
1285         msg_set_type(msg, TIPC_DIRECT_MSG);
1286         msg_set_lookup_scope(msg, 0);
1287         msg_set_orignode(msg, tipc_own_addr);
1288         msg_set_origport(msg, ref);
1289         msg_set_destnode(msg, dest->node);
1290         msg_set_destport(msg, dest->ref);
1291         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1292
1293         if (dest->node == tipc_own_addr)
1294                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1295                                                total_len);
1296         else
1297                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1298                                                    total_len, dest->node);
1299         if (likely(res != -ELINKCONG)) {
1300                 if (res > 0)
1301                         p_ptr->sent++;
1302                 return res;
1303         }
1304         if (port_unreliable(p_ptr)) {
1305                 return total_len;
1306         }
1307         return -ELINKCONG;
1308 }
1309
1310 /**
1311  * tipc_send_buf2port - send message buffer to port identity
1312  */
1313
1314 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1315                struct sk_buff *buf, unsigned int dsz)
1316 {
1317         struct tipc_port *p_ptr;
1318         struct tipc_msg *msg;
1319         int res;
1320
1321         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1322         if (!p_ptr || p_ptr->connected)
1323                 return -EINVAL;
1324
1325         msg = &p_ptr->phdr;
1326         msg_set_type(msg, TIPC_DIRECT_MSG);
1327         msg_set_orignode(msg, tipc_own_addr);
1328         msg_set_origport(msg, ref);
1329         msg_set_destnode(msg, dest->node);
1330         msg_set_destport(msg, dest->ref);
1331         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1332         msg_set_size(msg, BASIC_H_SIZE + dsz);
1333         if (skb_cow(buf, BASIC_H_SIZE))
1334                 return -ENOMEM;
1335
1336         skb_push(buf, BASIC_H_SIZE);
1337         skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1338
1339         if (dest->node == tipc_own_addr)
1340                 res = tipc_port_recv_msg(buf);
1341         else
1342                 res = tipc_send_buf_fast(buf, dest->node);
1343         if (likely(res != -ELINKCONG)) {
1344                 if (res > 0)
1345                         p_ptr->sent++;
1346                 return res;
1347         }
1348         if (port_unreliable(p_ptr))
1349                 return dsz;
1350         return -ELINKCONG;
1351 }
1352