e69e8b42a190a2aae6fca252379f4566402226f5
[linux-3.10.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  * 
4  * Copyright (c) 2003-2005, Ericsson Research Canada
5  * Copyright (c) 2004-2005, Wind River Systems
6  * Copyright (c) 2005-2006, Ericsson AB
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  * 3. Neither the names of the copyright holders nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *
21  * Alternatively, this software may be distributed under the terms of the
22  * GNU General Public License ("GPL") version 2 as published by the Free
23  * Software Foundation.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35  * POSSIBILITY OF SUCH DAMAGE.
36  */
37
38 #include "core.h"
39 #include "config.h"
40 #include "dbg.h"
41 #include "port.h"
42 #include "addr.h"
43 #include "link.h"
44 #include "node.h"
45 #include "port.h"
46 #include "name_table.h"
47 #include "user_reg.h"
48 #include "msg.h"
49 #include "bcast.h"
50
51 /* Connection management: */
52 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
53 #define CONFIRMED 0
54 #define PROBING 1
55
56 #define MAX_REJECT_SIZE 1024
57
58 static struct sk_buff *msg_queue_head = 0;
59 static struct sk_buff *msg_queue_tail = 0;
60
61 spinlock_t port_list_lock = SPIN_LOCK_UNLOCKED;
62 static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;
63
64 LIST_HEAD(ports);
65 static void port_handle_node_down(unsigned long ref);
66 static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
67 static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
68 static void port_timeout(unsigned long ref);
69
70
71 static inline u32 port_peernode(struct port *p_ptr)
72 {
73         return msg_destnode(&p_ptr->publ.phdr);
74 }
75
76 static inline u32 port_peerport(struct port *p_ptr)
77 {
78         return msg_destport(&p_ptr->publ.phdr);
79 }
80
81 static inline u32 port_out_seqno(struct port *p_ptr)
82 {
83         return msg_transp_seqno(&p_ptr->publ.phdr);
84 }
85
86 static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno) 
87 {
88         msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
89 }
90
91 static inline void port_incr_out_seqno(struct port *p_ptr)
92 {
93         struct tipc_msg *m = &p_ptr->publ.phdr;
94
95         if (likely(!msg_routed(m)))
96                 return;
97         msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
98 }
99
100 /**
101  * tipc_multicast - send a multicast message to local and remote destinations
102  */
103
104 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
105                    u32 num_sect, struct iovec const *msg_sect)
106 {
107         struct tipc_msg *hdr;
108         struct sk_buff *buf;
109         struct sk_buff *ibuf = NULL;
110         struct port_list dports = {0, NULL, };
111         struct port *oport = port_deref(ref);
112         int ext_targets;
113         int res;
114
115         if (unlikely(!oport))
116                 return -EINVAL;
117
118         /* Create multicast message */
119
120         hdr = &oport->publ.phdr;
121         msg_set_type(hdr, TIPC_MCAST_MSG);
122         msg_set_nametype(hdr, seq->type);
123         msg_set_namelower(hdr, seq->lower);
124         msg_set_nameupper(hdr, seq->upper);
125         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
126         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
127                         !oport->user_port, &buf);
128         if (unlikely(!buf))
129                 return res;
130
131         /* Figure out where to send multicast message */
132
133         ext_targets = nametbl_mc_translate(seq->type, seq->lower, seq->upper,
134                                            TIPC_NODE_SCOPE, &dports);
135         
136         /* Send message to destinations (duplicate it only if necessary) */ 
137
138         if (ext_targets) {
139                 if (dports.count != 0) {
140                         ibuf = skb_copy(buf, GFP_ATOMIC);
141                         if (ibuf == NULL) {
142                                 port_list_free(&dports);
143                                 buf_discard(buf);
144                                 return -ENOMEM;
145                         }
146                 }
147                 res = bclink_send_msg(buf);
148                 if ((res < 0) && (dports.count != 0)) {
149                         buf_discard(ibuf);
150                 }
151         } else {
152                 ibuf = buf;
153         }
154
155         if (res >= 0) {
156                 if (ibuf)
157                         port_recv_mcast(ibuf, &dports);
158         } else {
159                 port_list_free(&dports);
160         }
161         return res;
162 }
163
164 /**
165  * port_recv_mcast - deliver multicast message to all destination ports
166  * 
167  * If there is no port list, perform a lookup to create one
168  */
169
170 void port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
171 {
172         struct tipc_msg* msg;
173         struct port_list dports = {0, NULL, };
174         struct port_list *item = dp;
175         int cnt = 0;
176
177         assert(buf);
178         msg = buf_msg(buf);
179
180         /* Create destination port list, if one wasn't supplied */
181
182         if (dp == NULL) {
183                 nametbl_mc_translate(msg_nametype(msg),
184                                      msg_namelower(msg),
185                                      msg_nameupper(msg),
186                                      TIPC_CLUSTER_SCOPE,
187                                      &dports);
188                 item = dp = &dports;
189         }
190
191         /* Deliver a copy of message to each destination port */
192
193         if (dp->count != 0) {
194                 if (dp->count == 1) {
195                         msg_set_destport(msg, dp->ports[0]);
196                         port_recv_msg(buf);
197                         port_list_free(dp);
198                         return;
199                 }
200                 for (; cnt < dp->count; cnt++) {
201                         int index = cnt % PLSIZE;
202                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
203
204                         if (b == NULL) {
205                                 warn("Buffer allocation failure\n");
206                                 msg_dbg(msg, "LOST:");
207                                 goto exit;
208                         }
209                         if ((index == 0) && (cnt != 0)) {
210                                 item = item->next;
211                         }
212                         msg_set_destport(buf_msg(b),item->ports[index]);
213                         port_recv_msg(b);
214                 }
215         }
216 exit:
217         buf_discard(buf);
218         port_list_free(dp);
219 }
220
221 /**
222  * tipc_createport_raw - create a native TIPC port
223  * 
224  * Returns local port reference
225  */
226
227 u32 tipc_createport_raw(void *usr_handle,
228                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
229                         void (*wakeup)(struct tipc_port *),
230                         const u32 importance)
231 {
232         struct port *p_ptr;
233         struct tipc_msg *msg;
234         u32 ref;
235
236         p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
237         if (p_ptr == NULL) {
238                 warn("Memory squeeze; failed to create port\n");
239                 return 0;
240         }
241         memset(p_ptr, 0, sizeof(*p_ptr));
242         ref = ref_acquire(p_ptr, &p_ptr->publ.lock);
243         if (!ref) {
244                 warn("Reference Table Exhausted\n");
245                 kfree(p_ptr);
246                 return 0;
247         }
248
249         port_lock(ref);
250         p_ptr->publ.ref = ref;
251         msg = &p_ptr->publ.phdr;
252         msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
253         msg_set_orignode(msg, tipc_own_addr);
254         msg_set_prevnode(msg, tipc_own_addr);
255         msg_set_origport(msg, ref);
256         msg_set_importance(msg,importance);
257         p_ptr->last_in_seqno = 41;
258         p_ptr->sent = 1;
259         p_ptr->publ.usr_handle = usr_handle;
260         INIT_LIST_HEAD(&p_ptr->wait_list);
261         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
262         p_ptr->congested_link = 0;
263         p_ptr->max_pkt = MAX_PKT_DEFAULT;
264         p_ptr->dispatcher = dispatcher;
265         p_ptr->wakeup = wakeup;
266         p_ptr->user_port = 0;
267         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
268         spin_lock_bh(&port_list_lock);
269         INIT_LIST_HEAD(&p_ptr->publications);
270         INIT_LIST_HEAD(&p_ptr->port_list);
271         list_add_tail(&p_ptr->port_list, &ports);
272         spin_unlock_bh(&port_list_lock);
273         port_unlock(p_ptr);
274         return ref;
275 }
276
277 int tipc_deleteport(u32 ref)
278 {
279         struct port *p_ptr;
280         struct sk_buff *buf = 0;
281
282         tipc_withdraw(ref, 0, 0);
283         p_ptr = port_lock(ref);
284         if (!p_ptr) 
285                 return -EINVAL;
286
287         ref_discard(ref);
288         port_unlock(p_ptr);
289
290         k_cancel_timer(&p_ptr->timer);
291         if (p_ptr->publ.connected) {
292                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
293                 nodesub_unsubscribe(&p_ptr->subscription);
294         }
295         if (p_ptr->user_port) {
296                 reg_remove_port(p_ptr->user_port);
297                 kfree(p_ptr->user_port);
298         }
299
300         spin_lock_bh(&port_list_lock);
301         list_del(&p_ptr->port_list);
302         list_del(&p_ptr->wait_list);
303         spin_unlock_bh(&port_list_lock);
304         k_term_timer(&p_ptr->timer);
305         kfree(p_ptr);
306         dbg("Deleted port %u\n", ref);
307         net_route_msg(buf);
308         return TIPC_OK;
309 }
310
311 /**
312  * tipc_get_port() - return port associated with 'ref'
313  * 
314  * Note: Port is not locked.
315  */
316
317 struct tipc_port *tipc_get_port(const u32 ref)
318 {
319         return (struct tipc_port *)ref_deref(ref);
320 }
321
322 /**
323  * tipc_get_handle - return user handle associated to port 'ref'
324  */
325
326 void *tipc_get_handle(const u32 ref)
327 {
328         struct port *p_ptr;
329         void * handle;
330
331         p_ptr = port_lock(ref);
332         if (!p_ptr)
333                 return 0;
334         handle = p_ptr->publ.usr_handle;
335         port_unlock(p_ptr);
336         return handle;
337 }
338
339 static inline int port_unreliable(struct port *p_ptr)
340 {
341         return msg_src_droppable(&p_ptr->publ.phdr);
342 }
343
344 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
345 {
346         struct port *p_ptr;
347         
348         p_ptr = port_lock(ref);
349         if (!p_ptr)
350                 return -EINVAL;
351         *isunreliable = port_unreliable(p_ptr);
352         spin_unlock_bh(p_ptr->publ.lock);
353         return TIPC_OK;
354 }
355
356 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
357 {
358         struct port *p_ptr;
359         
360         p_ptr = port_lock(ref);
361         if (!p_ptr)
362                 return -EINVAL;
363         msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
364         port_unlock(p_ptr);
365         return TIPC_OK;
366 }
367
368 static inline int port_unreturnable(struct port *p_ptr)
369 {
370         return msg_dest_droppable(&p_ptr->publ.phdr);
371 }
372
373 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
374 {
375         struct port *p_ptr;
376         
377         p_ptr = port_lock(ref);
378         if (!p_ptr)
379                 return -EINVAL;
380         *isunrejectable = port_unreturnable(p_ptr);
381         spin_unlock_bh(p_ptr->publ.lock);
382         return TIPC_OK;
383 }
384
385 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
386 {
387         struct port *p_ptr;
388         
389         p_ptr = port_lock(ref);
390         if (!p_ptr)
391                 return -EINVAL;
392         msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
393         port_unlock(p_ptr);
394         return TIPC_OK;
395 }
396
397 /* 
398  * port_build_proto_msg(): build a port level protocol 
399  * or a connection abortion message. Called with 
400  * tipc_port lock on.
401  */
402 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
403                                             u32 origport, u32 orignode,
404                                             u32 usr, u32 type, u32 err, 
405                                             u32 seqno, u32 ack)
406 {
407         struct sk_buff *buf;
408         struct tipc_msg *msg;
409         
410         buf = buf_acquire(LONG_H_SIZE);
411         if (buf) {
412                 msg = buf_msg(buf);
413                 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode);
414                 msg_set_destport(msg, destport);
415                 msg_set_origport(msg, origport);
416                 msg_set_destnode(msg, destnode);
417                 msg_set_orignode(msg, orignode);
418                 msg_set_transp_seqno(msg, seqno);
419                 msg_set_msgcnt(msg, ack);
420                 msg_dbg(msg, "PORT>SEND>:");
421         }
422         return buf;
423 }
424
425 int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
426 {
427         msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
428         msg_set_options(&tp_ptr->phdr, opt, sz);
429         return TIPC_OK;
430 }
431
432 int tipc_reject_msg(struct sk_buff *buf, u32 err)
433 {
434         struct tipc_msg *msg = buf_msg(buf);
435         struct sk_buff *rbuf;
436         struct tipc_msg *rmsg;
437         int hdr_sz;
438         u32 imp = msg_importance(msg);
439         u32 data_sz = msg_data_sz(msg);
440
441         if (data_sz > MAX_REJECT_SIZE)
442                 data_sz = MAX_REJECT_SIZE;
443         if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
444                 imp++;
445         msg_dbg(msg, "port->rej: ");
446
447         /* discard rejected message if it shouldn't be returned to sender */
448         if (msg_errcode(msg) || msg_dest_droppable(msg)) {
449                 buf_discard(buf);
450                 return data_sz;
451         }
452
453         /* construct rejected message */
454         if (msg_mcast(msg))
455                 hdr_sz = MCAST_H_SIZE;
456         else
457                 hdr_sz = LONG_H_SIZE;
458         rbuf = buf_acquire(data_sz + hdr_sz);
459         if (rbuf == NULL) {
460                 buf_discard(buf);
461                 return data_sz;
462         }
463         rmsg = buf_msg(rbuf);
464         msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg));
465         msg_set_destport(rmsg, msg_origport(msg));
466         msg_set_prevnode(rmsg, tipc_own_addr);
467         msg_set_origport(rmsg, msg_destport(msg));
468         if (msg_short(msg))
469                 msg_set_orignode(rmsg, tipc_own_addr);
470         else
471                 msg_set_orignode(rmsg, msg_destnode(msg));
472         msg_set_size(rmsg, data_sz + hdr_sz); 
473         msg_set_nametype(rmsg, msg_nametype(msg));
474         msg_set_nameinst(rmsg, msg_nameinst(msg));
475         memcpy(rbuf->data + hdr_sz, msg_data(msg), data_sz);
476
477         /* send self-abort message when rejecting on a connected port */
478         if (msg_connected(msg)) {
479                 struct sk_buff *abuf = 0;
480                 struct port *p_ptr = port_lock(msg_destport(msg));
481
482                 if (p_ptr) {
483                         if (p_ptr->publ.connected)
484                                 abuf = port_build_self_abort_msg(p_ptr, err);
485                         port_unlock(p_ptr);
486                 }
487                 net_route_msg(abuf);
488         }
489
490         /* send rejected message */
491         buf_discard(buf);
492         net_route_msg(rbuf);
493         return data_sz;
494 }
495
496 int port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
497                          struct iovec const *msg_sect, u32 num_sect,
498                          int err)
499 {
500         struct sk_buff *buf;
501         int res;
502
503         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 
504                         !p_ptr->user_port, &buf);
505         if (!buf)
506                 return res;
507
508         return tipc_reject_msg(buf, err);
509 }
510
511 static void port_timeout(unsigned long ref)
512 {
513         struct port *p_ptr = port_lock(ref);
514         struct sk_buff *buf = 0;
515
516         if (!p_ptr || !p_ptr->publ.connected)
517                 return;
518
519         /* Last probe answered ? */
520         if (p_ptr->probing_state == PROBING) {
521                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
522         } else {
523                 buf = port_build_proto_msg(port_peerport(p_ptr),
524                                            port_peernode(p_ptr),
525                                            p_ptr->publ.ref,
526                                            tipc_own_addr,
527                                            CONN_MANAGER,
528                                            CONN_PROBE,
529                                            TIPC_OK, 
530                                            port_out_seqno(p_ptr),
531                                            0);
532                 port_incr_out_seqno(p_ptr);
533                 p_ptr->probing_state = PROBING;
534                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
535         }
536         port_unlock(p_ptr);
537         net_route_msg(buf);
538 }
539
540
541 static void port_handle_node_down(unsigned long ref)
542 {
543         struct port *p_ptr = port_lock(ref);
544         struct sk_buff* buf = 0;
545
546         if (!p_ptr)
547                 return;
548         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
549         port_unlock(p_ptr);
550         net_route_msg(buf);
551 }
552
553
554 static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
555 {
556         u32 imp = msg_importance(&p_ptr->publ.phdr);
557
558         if (!p_ptr->publ.connected)
559                 return 0;
560         if (imp < TIPC_CRITICAL_IMPORTANCE)
561                 imp++;
562         return port_build_proto_msg(p_ptr->publ.ref,
563                                     tipc_own_addr,
564                                     port_peerport(p_ptr),
565                                     port_peernode(p_ptr),
566                                     imp,
567                                     TIPC_CONN_MSG,
568                                     err, 
569                                     p_ptr->last_in_seqno + 1,
570                                     0);
571 }
572
573
574 static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
575 {
576         u32 imp = msg_importance(&p_ptr->publ.phdr);
577
578         if (!p_ptr->publ.connected)
579                 return 0;
580         if (imp < TIPC_CRITICAL_IMPORTANCE)
581                 imp++;
582         return port_build_proto_msg(port_peerport(p_ptr),
583                                     port_peernode(p_ptr),
584                                     p_ptr->publ.ref,
585                                     tipc_own_addr,
586                                     imp,
587                                     TIPC_CONN_MSG,
588                                     err, 
589                                     port_out_seqno(p_ptr),
590                                     0);
591 }
592
593 void port_recv_proto_msg(struct sk_buff *buf)
594 {
595         struct tipc_msg *msg = buf_msg(buf);
596         struct port *p_ptr = port_lock(msg_destport(msg));
597         u32 err = TIPC_OK;
598         struct sk_buff *r_buf = 0;
599         struct sk_buff *abort_buf = 0;
600
601         msg_dbg(msg, "PORT<RECV<:");
602
603         if (!p_ptr) {
604                 err = TIPC_ERR_NO_PORT;
605         } else if (p_ptr->publ.connected) {
606                 if (port_peernode(p_ptr) != msg_orignode(msg))
607                         err = TIPC_ERR_NO_PORT;
608                 if (port_peerport(p_ptr) != msg_origport(msg))
609                         err = TIPC_ERR_NO_PORT;
610                 if (!err && msg_routed(msg)) {
611                         u32 seqno = msg_transp_seqno(msg);
612                         u32 myno =  ++p_ptr->last_in_seqno;
613                         if (seqno != myno) {
614                                 err = TIPC_ERR_NO_PORT;
615                                 abort_buf = port_build_self_abort_msg(p_ptr, err);
616                         }
617                 }
618                 if (msg_type(msg) == CONN_ACK) {
619                         int wakeup = port_congested(p_ptr) && 
620                                      p_ptr->publ.congested &&
621                                      p_ptr->wakeup;
622                         p_ptr->acked += msg_msgcnt(msg);
623                         if (port_congested(p_ptr))
624                                 goto exit;
625                         p_ptr->publ.congested = 0;
626                         if (!wakeup)
627                                 goto exit;
628                         p_ptr->wakeup(&p_ptr->publ);
629                         goto exit;
630                 }
631         } else if (p_ptr->publ.published) {
632                 err = TIPC_ERR_NO_PORT;
633         }
634         if (err) {
635                 r_buf = port_build_proto_msg(msg_origport(msg),
636                                              msg_orignode(msg), 
637                                              msg_destport(msg), 
638                                              tipc_own_addr,
639                                              DATA_HIGH,
640                                              TIPC_CONN_MSG,
641                                              err,
642                                              0,
643                                              0);
644                 goto exit;
645         }
646
647         /* All is fine */
648         if (msg_type(msg) == CONN_PROBE) {
649                 r_buf = port_build_proto_msg(msg_origport(msg), 
650                                              msg_orignode(msg), 
651                                              msg_destport(msg), 
652                                              tipc_own_addr, 
653                                              CONN_MANAGER,
654                                              CONN_PROBE_REPLY,
655                                              TIPC_OK,
656                                              port_out_seqno(p_ptr),
657                                              0);
658         }
659         p_ptr->probing_state = CONFIRMED;
660         port_incr_out_seqno(p_ptr);
661 exit:
662         if (p_ptr)
663                 port_unlock(p_ptr);
664         net_route_msg(r_buf);
665         net_route_msg(abort_buf);
666         buf_discard(buf);
667 }
668
669 static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
670 {
671         struct publication *publ;
672
673         if (full_id)
674                 tipc_printf(buf, "<%u.%u.%u:%u>:", 
675                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
676                             tipc_node(tipc_own_addr), p_ptr->publ.ref);
677         else
678                 tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
679
680         if (p_ptr->publ.connected) {
681                 u32 dport = port_peerport(p_ptr);
682                 u32 destnode = port_peernode(p_ptr);
683
684                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
685                             tipc_zone(destnode), tipc_cluster(destnode),
686                             tipc_node(destnode), dport);
687                 if (p_ptr->publ.conn_type != 0)
688                         tipc_printf(buf, " via {%u,%u}",
689                                     p_ptr->publ.conn_type,
690                                     p_ptr->publ.conn_instance);
691         }
692         else if (p_ptr->publ.published) {
693                 tipc_printf(buf, " bound to");
694                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
695                         if (publ->lower == publ->upper)
696                                 tipc_printf(buf, " {%u,%u}", publ->type,
697                                             publ->lower);
698                         else
699                                 tipc_printf(buf, " {%u,%u,%u}", publ->type, 
700                                             publ->lower, publ->upper);
701                 }
702         }
703         tipc_printf(buf, "\n");
704 }
705
706 #define MAX_PORT_QUERY 32768
707
708 struct sk_buff *port_get_ports(void)
709 {
710         struct sk_buff *buf;
711         struct tlv_desc *rep_tlv;
712         struct print_buf pb;
713         struct port *p_ptr;
714         int str_len;
715
716         buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
717         if (!buf)
718                 return NULL;
719         rep_tlv = (struct tlv_desc *)buf->data;
720
721         printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
722         spin_lock_bh(&port_list_lock);
723         list_for_each_entry(p_ptr, &ports, port_list) {
724                 spin_lock_bh(p_ptr->publ.lock);
725                 port_print(p_ptr, &pb, 0);
726                 spin_unlock_bh(p_ptr->publ.lock);
727         }
728         spin_unlock_bh(&port_list_lock);
729         str_len = printbuf_validate(&pb);
730
731         skb_put(buf, TLV_SPACE(str_len));
732         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
733
734         return buf;
735 }
736
737 #if 0
738
739 #define MAX_PORT_STATS 2000
740
741 struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
742 {
743         u32 ref;
744         struct port *p_ptr;
745         struct sk_buff *buf;
746         struct tlv_desc *rep_tlv;
747         struct print_buf pb;
748         int str_len;
749
750         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
751                 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
752
753         ref = *(u32 *)TLV_DATA(req_tlv_area);
754         ref = ntohl(ref);
755
756         p_ptr = port_lock(ref);
757         if (!p_ptr)
758                 return cfg_reply_error_string("port not found");
759
760         buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
761         if (!buf) {
762                 port_unlock(p_ptr);
763                 return NULL;
764         }
765         rep_tlv = (struct tlv_desc *)buf->data;
766
767         printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
768         port_print(p_ptr, &pb, 1);
769         /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
770         port_unlock(p_ptr);
771         str_len = printbuf_validate(&pb);
772
773         skb_put(buf, TLV_SPACE(str_len));
774         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
775
776         return buf;
777 }
778
779 #endif
780
781 void port_reinit(void)
782 {
783         struct port *p_ptr;
784         struct tipc_msg *msg;
785
786         spin_lock_bh(&port_list_lock);
787         list_for_each_entry(p_ptr, &ports, port_list) {
788                 msg = &p_ptr->publ.phdr;
789                 if (msg_orignode(msg) == tipc_own_addr)
790                         break;
791                 msg_set_orignode(msg, tipc_own_addr);
792         }
793         spin_unlock_bh(&port_list_lock);
794 }
795
796
/*
 *  port_dispatcher_sigh(): Signal handler for messages destined
 *                          for the tipc_port interface.
 */
801
802 static void port_dispatcher_sigh(void *dummy)
803 {
804         struct sk_buff *buf;
805
806         spin_lock_bh(&queue_lock);
807         buf = msg_queue_head;
808         msg_queue_head = 0;
809         spin_unlock_bh(&queue_lock);
810
811         while (buf) {
812                 struct port *p_ptr;
813                 struct user_port *up_ptr;
814                 struct tipc_portid orig;
815                 struct tipc_name_seq dseq;
816                 void *usr_handle;
817                 int connected;
818                 int published;
819
820                 struct sk_buff *next = buf->next;
821                 struct tipc_msg *msg = buf_msg(buf);
822                 u32 dref = msg_destport(msg);
823                 
824                 p_ptr = port_lock(dref);
825                 if (!p_ptr) {
826                         /* Port deleted while msg in queue */
827                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
828                         buf = next;
829                         continue;
830                 }
831                 orig.ref = msg_origport(msg);
832                 orig.node = msg_orignode(msg);
833                 up_ptr = p_ptr->user_port;
834                 usr_handle = up_ptr->usr_handle;
835                 connected = p_ptr->publ.connected;
836                 published = p_ptr->publ.published;
837
838                 if (unlikely(msg_errcode(msg)))
839                         goto err;
840
841                 switch (msg_type(msg)) {
842                 
843                 case TIPC_CONN_MSG:{
844                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
845                                 u32 peer_port = port_peerport(p_ptr);
846                                 u32 peer_node = port_peernode(p_ptr);
847
848                                 spin_unlock_bh(p_ptr->publ.lock);
849                                 if (unlikely(!connected)) {
850                                         if (unlikely(published))
851                                                 goto reject;
852                                         tipc_connect2port(dref,&orig);
853                                 }
854                                 if (unlikely(msg_origport(msg) != peer_port))
855                                         goto reject;
856                                 if (unlikely(msg_orignode(msg) != peer_node))
857                                         goto reject;
858                                 if (unlikely(!cb))
859                                         goto reject;
860                                 if (unlikely(++p_ptr->publ.conn_unacked >= 
861                                              TIPC_FLOW_CONTROL_WIN))
862                                         tipc_acknowledge(dref, 
863                                                          p_ptr->publ.conn_unacked);
864                                 skb_pull(buf, msg_hdr_sz(msg));
865                                 cb(usr_handle, dref, &buf, msg_data(msg),
866                                    msg_data_sz(msg));
867                                 break;
868                         }
869                 case TIPC_DIRECT_MSG:{
870                                 tipc_msg_event cb = up_ptr->msg_cb;
871
872                                 spin_unlock_bh(p_ptr->publ.lock);
873                                 if (unlikely(connected))
874                                         goto reject;
875                                 if (unlikely(!cb))
876                                         goto reject;
877                                 skb_pull(buf, msg_hdr_sz(msg));
878                                 cb(usr_handle, dref, &buf, msg_data(msg), 
879                                    msg_data_sz(msg), msg_importance(msg),
880                                    &orig);
881                                 break;
882                         }
883                 case TIPC_NAMED_MSG:{
884                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
885
886                                 spin_unlock_bh(p_ptr->publ.lock);
887                                 if (unlikely(connected))
888                                         goto reject;
889                                 if (unlikely(!cb))
890                                         goto reject;
891                                 if (unlikely(!published))
892                                         goto reject;
893                                 dseq.type =  msg_nametype(msg);
894                                 dseq.lower = msg_nameinst(msg);
895                                 dseq.upper = dseq.lower;
896                                 skb_pull(buf, msg_hdr_sz(msg));
897                                 cb(usr_handle, dref, &buf, msg_data(msg), 
898                                    msg_data_sz(msg), msg_importance(msg),
899                                    &orig, &dseq);
900                                 break;
901                         }
902                 }
903                 if (buf)
904                         buf_discard(buf);
905                 buf = next;
906                 continue;
907 err:
908                 switch (msg_type(msg)) {
909                 
910                 case TIPC_CONN_MSG:{
911                                 tipc_conn_shutdown_event cb = 
912                                         up_ptr->conn_err_cb;
913                                 u32 peer_port = port_peerport(p_ptr);
914                                 u32 peer_node = port_peernode(p_ptr);
915
916                                 spin_unlock_bh(p_ptr->publ.lock);
917                                 if (!connected || !cb)
918                                         break;
919                                 if (msg_origport(msg) != peer_port)
920                                         break;
921                                 if (msg_orignode(msg) != peer_node)
922                                         break;
923                                 tipc_disconnect(dref);
924                                 skb_pull(buf, msg_hdr_sz(msg));
925                                 cb(usr_handle, dref, &buf, msg_data(msg),
926                                    msg_data_sz(msg), msg_errcode(msg));
927                                 break;
928                         }
929                 case TIPC_DIRECT_MSG:{
930                                 tipc_msg_err_event cb = up_ptr->err_cb;
931
932                                 spin_unlock_bh(p_ptr->publ.lock);
933                                 if (connected || !cb)
934                                         break;
935                                 skb_pull(buf, msg_hdr_sz(msg));
936                                 cb(usr_handle, dref, &buf, msg_data(msg),
937                                    msg_data_sz(msg), msg_errcode(msg), &orig);
938                                 break;
939                         }
940                 case TIPC_NAMED_MSG:{
941                                 tipc_named_msg_err_event cb = 
942                                         up_ptr->named_err_cb;
943
944                                 spin_unlock_bh(p_ptr->publ.lock);
945                                 if (connected || !cb)
946                                         break;
947                                 dseq.type =  msg_nametype(msg);
948                                 dseq.lower = msg_nameinst(msg);
949                                 dseq.upper = dseq.lower;
950                                 skb_pull(buf, msg_hdr_sz(msg));
951                                 cb(usr_handle, dref, &buf, msg_data(msg), 
952                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
953                                 break;
954                         }
955                 }
956                 if (buf)
957                         buf_discard(buf);
958                 buf = next;
959                 continue;
960 reject:
961                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
962                 buf = next;
963         }
964 }
965
966 /*
967  *  port_dispatcher(): Dispatcher for messages destinated
968  *  to the tipc_port interface. Called with port locked.
969  */
970
971 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
972 {
973         buf->next = NULL;
974         spin_lock_bh(&queue_lock);
975         if (msg_queue_head) {
976                 msg_queue_tail->next = buf;
977                 msg_queue_tail = buf;
978         } else {
979                 msg_queue_tail = msg_queue_head = buf;
980                 k_signal((Handler)port_dispatcher_sigh, 0);
981         }
982         spin_unlock_bh(&queue_lock);
983         return TIPC_OK;
984 }
985
986 /* 
987  * Wake up port after congestion: Called with port locked,
988  *                                
989  */
990
991 static void port_wakeup_sh(unsigned long ref)
992 {
993         struct port *p_ptr;
994         struct user_port *up_ptr;
995         tipc_continue_event cb = 0;
996         void *uh = 0;
997
998         p_ptr = port_lock(ref);
999         if (p_ptr) {
1000                 up_ptr = p_ptr->user_port;
1001                 if (up_ptr) {
1002                         cb = up_ptr->continue_event_cb;
1003                         uh = up_ptr->usr_handle;
1004                 }
1005                 port_unlock(p_ptr);
1006         }
1007         if (cb)
1008                 cb(uh, ref);
1009 }
1010
1011
1012 static void port_wakeup(struct tipc_port *p_ptr)
1013 {
1014         k_signal((Handler)port_wakeup_sh, p_ptr->ref);
1015 }
1016
1017 void tipc_acknowledge(u32 ref, u32 ack)
1018 {
1019         struct port *p_ptr;
1020         struct sk_buff *buf = 0;
1021
1022         p_ptr = port_lock(ref);
1023         if (!p_ptr)
1024                 return;
1025         if (p_ptr->publ.connected) {
1026                 p_ptr->publ.conn_unacked -= ack;
1027                 buf = port_build_proto_msg(port_peerport(p_ptr),
1028                                            port_peernode(p_ptr),
1029                                            ref,
1030                                            tipc_own_addr,
1031                                            CONN_MANAGER,
1032                                            CONN_ACK,
1033                                            TIPC_OK, 
1034                                            port_out_seqno(p_ptr),
1035                                            ack);
1036         }
1037         port_unlock(p_ptr);
1038         net_route_msg(buf);
1039 }
1040
1041 /*
1042  * tipc_createport(): user level call. Will add port to
1043  *                    registry if non-zero user_ref.
1044  */
1045
1046 int tipc_createport(u32 user_ref, 
1047                     void *usr_handle, 
1048                     unsigned int importance, 
1049                     tipc_msg_err_event error_cb, 
1050                     tipc_named_msg_err_event named_error_cb, 
1051                     tipc_conn_shutdown_event conn_error_cb, 
1052                     tipc_msg_event msg_cb, 
1053                     tipc_named_msg_event named_msg_cb, 
1054                     tipc_conn_msg_event conn_msg_cb, 
1055                     tipc_continue_event continue_event_cb,/* May be zero */
1056                     u32 *portref)
1057 {
1058         struct user_port *up_ptr;
1059         struct port *p_ptr; 
1060         u32 ref;
1061
1062         up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1063         if (up_ptr == NULL) {
1064                 return -ENOMEM;
1065         }
1066         ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
1067         p_ptr = port_lock(ref);
1068         if (!p_ptr) {
1069                 kfree(up_ptr);
1070                 return -ENOMEM;
1071         }
1072
1073         p_ptr->user_port = up_ptr;
1074         up_ptr->user_ref = user_ref;
1075         up_ptr->usr_handle = usr_handle;
1076         up_ptr->ref = p_ptr->publ.ref;
1077         up_ptr->err_cb = error_cb;
1078         up_ptr->named_err_cb = named_error_cb;
1079         up_ptr->conn_err_cb = conn_error_cb;
1080         up_ptr->msg_cb = msg_cb;
1081         up_ptr->named_msg_cb = named_msg_cb;
1082         up_ptr->conn_msg_cb = conn_msg_cb;
1083         up_ptr->continue_event_cb = continue_event_cb;
1084         INIT_LIST_HEAD(&up_ptr->uport_list);
1085         reg_add_port(up_ptr);
1086         *portref = p_ptr->publ.ref;
1087         dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref);        
1088         port_unlock(p_ptr);
1089         return TIPC_OK;
1090 }
1091
1092 int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1093 {
1094         id->ref = ref;
1095         id->node = tipc_own_addr;
1096         return TIPC_OK;
1097 }
1098
1099 int tipc_portimportance(u32 ref, unsigned int *importance)
1100 {
1101         struct port *p_ptr;
1102         
1103         p_ptr = port_lock(ref);
1104         if (!p_ptr)
1105                 return -EINVAL;
1106         *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1107         spin_unlock_bh(p_ptr->publ.lock);
1108         return TIPC_OK;
1109 }
1110
1111 int tipc_set_portimportance(u32 ref, unsigned int imp)
1112 {
1113         struct port *p_ptr;
1114
1115         if (imp > TIPC_CRITICAL_IMPORTANCE)
1116                 return -EINVAL;
1117
1118         p_ptr = port_lock(ref);
1119         if (!p_ptr)
1120                 return -EINVAL;
1121         msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1122         spin_unlock_bh(p_ptr->publ.lock);
1123         return TIPC_OK;
1124 }
1125
1126
1127 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1128 {
1129         struct port *p_ptr;
1130         struct publication *publ;
1131         u32 key;
1132         int res = -EINVAL;
1133
1134         p_ptr = port_lock(ref);
1135         dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1136             "lower = %u, upper = %u\n",
1137             ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1138         if (!p_ptr)
1139                 return -EINVAL;
1140         if (p_ptr->publ.connected)
1141                 goto exit;
1142         if (seq->lower > seq->upper)
1143                 goto exit;
1144         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1145                 goto exit;
1146         key = ref + p_ptr->pub_count + 1;
1147         if (key == ref) {
1148                 res = -EADDRINUSE;
1149                 goto exit;
1150         }
1151         publ = nametbl_publish(seq->type, seq->lower, seq->upper,
1152                                scope, p_ptr->publ.ref, key);
1153         if (publ) {
1154                 list_add(&publ->pport_list, &p_ptr->publications);
1155                 p_ptr->pub_count++;
1156                 p_ptr->publ.published = 1;
1157                 res = TIPC_OK;
1158         }
1159 exit:
1160         port_unlock(p_ptr);
1161         return res;
1162 }
1163
1164 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1165 {
1166         struct port *p_ptr;
1167         struct publication *publ;
1168         struct publication *tpubl;
1169         int res = -EINVAL;
1170         
1171         p_ptr = port_lock(ref);
1172         if (!p_ptr)
1173                 return -EINVAL;
1174         if (!p_ptr->publ.published)
1175                 goto exit;
1176         if (!seq) {
1177                 list_for_each_entry_safe(publ, tpubl, 
1178                                          &p_ptr->publications, pport_list) {
1179                         nametbl_withdraw(publ->type, publ->lower, 
1180                                          publ->ref, publ->key);
1181                 }
1182                 res = TIPC_OK;
1183         } else {
1184                 list_for_each_entry_safe(publ, tpubl, 
1185                                          &p_ptr->publications, pport_list) {
1186                         if (publ->scope != scope)
1187                                 continue;
1188                         if (publ->type != seq->type)
1189                                 continue;
1190                         if (publ->lower != seq->lower)
1191                                 continue;
1192                         if (publ->upper != seq->upper)
1193                                 break;
1194                         nametbl_withdraw(publ->type, publ->lower, 
1195                                          publ->ref, publ->key);
1196                         res = TIPC_OK;
1197                         break;
1198                 }
1199         }
1200         if (list_empty(&p_ptr->publications))
1201                 p_ptr->publ.published = 0;
1202 exit:
1203         port_unlock(p_ptr);
1204         return res;
1205 }
1206
1207 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1208 {
1209         struct port *p_ptr;
1210         struct tipc_msg *msg;
1211         int res = -EINVAL;
1212
1213         p_ptr = port_lock(ref);
1214         if (!p_ptr)
1215                 return -EINVAL;
1216         if (p_ptr->publ.published || p_ptr->publ.connected)
1217                 goto exit;
1218         if (!peer->ref)
1219                 goto exit;
1220
1221         msg = &p_ptr->publ.phdr;
1222         msg_set_destnode(msg, peer->node);
1223         msg_set_destport(msg, peer->ref);
1224         msg_set_orignode(msg, tipc_own_addr);
1225         msg_set_origport(msg, p_ptr->publ.ref);
1226         msg_set_transp_seqno(msg, 42);
1227         msg_set_type(msg, TIPC_CONN_MSG);
1228         if (!may_route(peer->node))
1229                 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1230         else
1231                 msg_set_hdr_sz(msg, LONG_H_SIZE);
1232
1233         p_ptr->probing_interval = PROBING_INTERVAL;
1234         p_ptr->probing_state = CONFIRMED;
1235         p_ptr->publ.connected = 1;
1236         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1237
1238         nodesub_subscribe(&p_ptr->subscription,peer->node, (void *)ref,
1239                           (net_ev_handler)port_handle_node_down);
1240         res = TIPC_OK;
1241 exit:
1242         port_unlock(p_ptr);
1243         p_ptr->max_pkt = link_get_max_pkt(peer->node, ref);
1244         return res;
1245 }
1246
1247 /*
1248  * tipc_disconnect(): Disconnect port form peer.
1249  *                    This is a node local operation.
1250  */
1251
1252 int tipc_disconnect(u32 ref)
1253 {
1254         struct port *p_ptr;
1255         int res = -ENOTCONN;
1256
1257         p_ptr = port_lock(ref);
1258         if (!p_ptr)
1259                 return -EINVAL;
1260         if (p_ptr->publ.connected) {
1261                 p_ptr->publ.connected = 0;
1262                 /* let timer expire on it's own to avoid deadlock! */
1263                 nodesub_unsubscribe(&p_ptr->subscription);
1264                 res = TIPC_OK;
1265         }
1266         port_unlock(p_ptr);
1267         return res;
1268 }
1269
1270 /*
1271  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1272  */
1273 int tipc_shutdown(u32 ref)
1274 {
1275         struct port *p_ptr;
1276         struct sk_buff *buf = 0;
1277
1278         p_ptr = port_lock(ref);
1279         if (!p_ptr)
1280                 return -EINVAL;
1281
1282         if (p_ptr->publ.connected) {
1283                 u32 imp = msg_importance(&p_ptr->publ.phdr);
1284                 if (imp < TIPC_CRITICAL_IMPORTANCE)
1285                         imp++;
1286                 buf = port_build_proto_msg(port_peerport(p_ptr),
1287                                            port_peernode(p_ptr),
1288                                            ref,
1289                                            tipc_own_addr,
1290                                            imp,
1291                                            TIPC_CONN_MSG,
1292                                            TIPC_CONN_SHUTDOWN, 
1293                                            port_out_seqno(p_ptr),
1294                                            0);
1295         }
1296         port_unlock(p_ptr);
1297         net_route_msg(buf);
1298         return tipc_disconnect(ref);
1299 }
1300
1301 int tipc_isconnected(u32 ref, int *isconnected)
1302 {
1303         struct port *p_ptr;
1304         
1305         p_ptr = port_lock(ref);
1306         if (!p_ptr)
1307                 return -EINVAL;
1308         *isconnected = p_ptr->publ.connected;
1309         port_unlock(p_ptr);
1310         return TIPC_OK;
1311 }
1312
1313 int tipc_peer(u32 ref, struct tipc_portid *peer)
1314 {
1315         struct port *p_ptr;
1316         int res;
1317          
1318         p_ptr = port_lock(ref);
1319         if (!p_ptr)
1320                 return -EINVAL;
1321         if (p_ptr->publ.connected) {
1322                 peer->ref = port_peerport(p_ptr);
1323                 peer->node = port_peernode(p_ptr);
1324                 res = TIPC_OK;
1325         } else
1326                 res = -ENOTCONN;
1327         port_unlock(p_ptr);
1328         return res;
1329 }
1330
1331 int tipc_ref_valid(u32 ref)
1332 {
1333         /* Works irrespective of type */
1334         return !!ref_deref(ref);
1335 }
1336
1337
1338 /*
1339  *  port_recv_sections(): Concatenate and deliver sectioned
1340  *                        message for this node.
1341  */
1342
1343 int port_recv_sections(struct port *sender, unsigned int num_sect,
1344                        struct iovec const *msg_sect)
1345 {
1346         struct sk_buff *buf;
1347         int res;
1348          
1349         res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1350                         MAX_MSG_SIZE, !sender->user_port, &buf);
1351         if (likely(buf))
1352                 port_recv_msg(buf);
1353         return res;
1354 }
1355
1356 /**
1357  * tipc_send - send message sections on connection
1358  */
1359
1360 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1361 {
1362         struct port *p_ptr;
1363         u32 destnode;
1364         int res;
1365
1366         p_ptr = port_deref(ref);
1367         if (!p_ptr || !p_ptr->publ.connected)
1368                 return -EINVAL;
1369
1370         p_ptr->publ.congested = 1;
1371         if (!port_congested(p_ptr)) {
1372                 destnode = port_peernode(p_ptr);
1373                 if (likely(destnode != tipc_own_addr))
1374                         res = link_send_sections_fast(p_ptr, msg_sect, num_sect,
1375                                                       destnode);
1376                 else
1377                         res = port_recv_sections(p_ptr, num_sect, msg_sect);
1378
1379                 if (likely(res != -ELINKCONG)) {
1380                         port_incr_out_seqno(p_ptr);
1381                         p_ptr->publ.congested = 0;
1382                         p_ptr->sent++;
1383                         return res;
1384                 }
1385         }
1386         if (port_unreliable(p_ptr)) {
1387                 p_ptr->publ.congested = 0;
1388                 /* Just calculate msg length and return */
1389                 return msg_calc_data_size(msg_sect, num_sect);
1390         }
1391         return -ELINKCONG;
1392 }
1393
1394 /** 
1395  * tipc_send_buf - send message buffer on connection
1396  */
1397
1398 int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1399 {
1400         struct port *p_ptr;
1401         struct tipc_msg *msg;
1402         u32 destnode;
1403         u32 hsz;
1404         u32 sz;
1405         u32 res;
1406          
1407         p_ptr = port_deref(ref);
1408         if (!p_ptr || !p_ptr->publ.connected)
1409                 return -EINVAL;
1410
1411         msg = &p_ptr->publ.phdr;
1412         hsz = msg_hdr_sz(msg);
1413         sz = hsz + dsz;
1414         msg_set_size(msg, sz);
1415         if (skb_cow(buf, hsz))
1416                 return -ENOMEM;
1417
1418         skb_push(buf, hsz);
1419         memcpy(buf->data, (unchar *)msg, hsz);
1420         destnode = msg_destnode(msg);
1421         p_ptr->publ.congested = 1;
1422         if (!port_congested(p_ptr)) {
1423                 if (likely(destnode != tipc_own_addr))
1424                         res = tipc_send_buf_fast(buf, destnode);
1425                 else {
1426                         port_recv_msg(buf);
1427                         res = sz;
1428                 }
1429                 if (likely(res != -ELINKCONG)) {
1430                         port_incr_out_seqno(p_ptr);
1431                         p_ptr->sent++;
1432                         p_ptr->publ.congested = 0;
1433                         return res;
1434                 }
1435         }
1436         if (port_unreliable(p_ptr)) {
1437                 p_ptr->publ.congested = 0;
1438                 return dsz;
1439         }
1440         return -ELINKCONG;
1441 }
1442
1443 /**
1444  * tipc_forward2name - forward message sections to port name
1445  */
1446
1447 int tipc_forward2name(u32 ref, 
1448                       struct tipc_name const *name, 
1449                       u32 domain,
1450                       u32 num_sect, 
1451                       struct iovec const *msg_sect,
1452                       struct tipc_portid const *orig, 
1453                       unsigned int importance)
1454 {
1455         struct port *p_ptr;
1456         struct tipc_msg *msg;
1457         u32 destnode = domain;
1458         u32 destport = 0;
1459         int res;
1460
1461         p_ptr = port_deref(ref);
1462         if (!p_ptr || p_ptr->publ.connected)
1463                 return -EINVAL;
1464
1465         msg = &p_ptr->publ.phdr;
1466         msg_set_type(msg, TIPC_NAMED_MSG);
1467         msg_set_orignode(msg, orig->node);
1468         msg_set_origport(msg, orig->ref);
1469         msg_set_hdr_sz(msg, LONG_H_SIZE);
1470         msg_set_nametype(msg, name->type);
1471         msg_set_nameinst(msg, name->instance);
1472         msg_set_lookup_scope(msg, addr_scope(domain));
1473         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1474                 msg_set_importance(msg,importance);
1475         destport = nametbl_translate(name->type, name->instance, &destnode);
1476         msg_set_destnode(msg, destnode);
1477         msg_set_destport(msg, destport);
1478
1479         if (likely(destport || destnode)) {
1480                 p_ptr->sent++;
1481                 if (likely(destnode == tipc_own_addr))
1482                         return port_recv_sections(p_ptr, num_sect, msg_sect);
1483                 res = link_send_sections_fast(p_ptr, msg_sect, num_sect, 
1484                                               destnode);
1485                 if (likely(res != -ELINKCONG))
1486                         return res;
1487                 if (port_unreliable(p_ptr)) {
1488                         /* Just calculate msg length and return */
1489                         return msg_calc_data_size(msg_sect, num_sect);
1490                 }
1491                 return -ELINKCONG;
1492         }
1493         return port_reject_sections(p_ptr, msg, msg_sect, num_sect, 
1494                                     TIPC_ERR_NO_NAME);
1495 }
1496
1497 /**
1498  * tipc_send2name - send message sections to port name
1499  */
1500
1501 int tipc_send2name(u32 ref, 
1502                    struct tipc_name const *name,
1503                    unsigned int domain, 
1504                    unsigned int num_sect, 
1505                    struct iovec const *msg_sect)
1506 {
1507         struct tipc_portid orig;
1508
1509         orig.ref = ref;
1510         orig.node = tipc_own_addr;
1511         return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1512                                  TIPC_PORT_IMPORTANCE);
1513 }
1514
1515 /** 
1516  * tipc_forward_buf2name - forward message buffer to port name
1517  */
1518
1519 int tipc_forward_buf2name(u32 ref,
1520                           struct tipc_name const *name,
1521                           u32 domain,
1522                           struct sk_buff *buf,
1523                           unsigned int dsz,
1524                           struct tipc_portid const *orig,
1525                           unsigned int importance)
1526 {
1527         struct port *p_ptr;
1528         struct tipc_msg *msg;
1529         u32 destnode = domain;
1530         u32 destport = 0;
1531         int res;
1532
1533         p_ptr = (struct port *)ref_deref(ref);
1534         if (!p_ptr || p_ptr->publ.connected)
1535                 return -EINVAL;
1536
1537         msg = &p_ptr->publ.phdr;
1538         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1539                 msg_set_importance(msg, importance);
1540         msg_set_type(msg, TIPC_NAMED_MSG);
1541         msg_set_orignode(msg, orig->node);
1542         msg_set_origport(msg, orig->ref);
1543         msg_set_nametype(msg, name->type);
1544         msg_set_nameinst(msg, name->instance);
1545         msg_set_lookup_scope(msg, addr_scope(domain));
1546         msg_set_hdr_sz(msg, LONG_H_SIZE);
1547         msg_set_size(msg, LONG_H_SIZE + dsz);
1548         destport = nametbl_translate(name->type, name->instance, &destnode);
1549         msg_set_destnode(msg, destnode);
1550         msg_set_destport(msg, destport);
1551         msg_dbg(msg, "forw2name ==> ");
1552         if (skb_cow(buf, LONG_H_SIZE))
1553                 return -ENOMEM;
1554         skb_push(buf, LONG_H_SIZE);
1555         memcpy(buf->data, (unchar *)msg, LONG_H_SIZE);
1556         msg_dbg(buf_msg(buf),"PREP:");
1557         if (likely(destport || destnode)) {
1558                 p_ptr->sent++;
1559                 if (destnode == tipc_own_addr)
1560                         return port_recv_msg(buf);
1561                 res = tipc_send_buf_fast(buf, destnode);
1562                 if (likely(res != -ELINKCONG))
1563                         return res;
1564                 if (port_unreliable(p_ptr))
1565                         return dsz;
1566                 return -ELINKCONG;
1567         }
1568         return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1569 }
1570
1571 /** 
1572  * tipc_send_buf2name - send message buffer to port name
1573  */
1574
1575 int tipc_send_buf2name(u32 ref, 
1576                        struct tipc_name const *dest, 
1577                        u32 domain,
1578                        struct sk_buff *buf, 
1579                        unsigned int dsz)
1580 {
1581         struct tipc_portid orig;
1582
1583         orig.ref = ref;
1584         orig.node = tipc_own_addr;
1585         return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1586                                      TIPC_PORT_IMPORTANCE);
1587 }
1588
1589 /** 
1590  * tipc_forward2port - forward message sections to port identity
1591  */
1592
1593 int tipc_forward2port(u32 ref,
1594                       struct tipc_portid const *dest,
1595                       unsigned int num_sect, 
1596                       struct iovec const *msg_sect,
1597                       struct tipc_portid const *orig, 
1598                       unsigned int importance)
1599 {
1600         struct port *p_ptr;
1601         struct tipc_msg *msg;
1602         int res;
1603
1604         p_ptr = port_deref(ref);
1605         if (!p_ptr || p_ptr->publ.connected)
1606                 return -EINVAL;
1607
1608         msg = &p_ptr->publ.phdr;
1609         msg_set_type(msg, TIPC_DIRECT_MSG);
1610         msg_set_orignode(msg, orig->node);
1611         msg_set_origport(msg, orig->ref);
1612         msg_set_destnode(msg, dest->node);
1613         msg_set_destport(msg, dest->ref);
1614         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1615         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1616                 msg_set_importance(msg, importance);
1617         p_ptr->sent++;
1618         if (dest->node == tipc_own_addr)
1619                 return port_recv_sections(p_ptr, num_sect, msg_sect);
1620         res = link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1621         if (likely(res != -ELINKCONG))
1622                 return res;
1623         if (port_unreliable(p_ptr)) {
1624                 /* Just calculate msg length and return */
1625                 return msg_calc_data_size(msg_sect, num_sect);
1626         }
1627         return -ELINKCONG;
1628 }
1629
1630 /** 
1631  * tipc_send2port - send message sections to port identity 
1632  */
1633
1634 int tipc_send2port(u32 ref, 
1635                    struct tipc_portid const *dest,
1636                    unsigned int num_sect, 
1637                    struct iovec const *msg_sect)
1638 {
1639         struct tipc_portid orig;
1640
1641         orig.ref = ref;
1642         orig.node = tipc_own_addr;
1643         return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 
1644                                  TIPC_PORT_IMPORTANCE);
1645 }
1646
1647 /** 
1648  * tipc_forward_buf2port - forward message buffer to port identity
1649  */
1650 int tipc_forward_buf2port(u32 ref,
1651                           struct tipc_portid const *dest,
1652                           struct sk_buff *buf,
1653                           unsigned int dsz,
1654                           struct tipc_portid const *orig,
1655                           unsigned int importance)
1656 {
1657         struct port *p_ptr;
1658         struct tipc_msg *msg;
1659         int res;
1660
1661         p_ptr = (struct port *)ref_deref(ref);
1662         if (!p_ptr || p_ptr->publ.connected)
1663                 return -EINVAL;
1664
1665         msg = &p_ptr->publ.phdr;
1666         msg_set_type(msg, TIPC_DIRECT_MSG);
1667         msg_set_orignode(msg, orig->node);
1668         msg_set_origport(msg, orig->ref);
1669         msg_set_destnode(msg, dest->node);
1670         msg_set_destport(msg, dest->ref);
1671         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1672         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1673                 msg_set_importance(msg, importance);
1674         msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1675         if (skb_cow(buf, DIR_MSG_H_SIZE))
1676                 return -ENOMEM;
1677
1678         skb_push(buf, DIR_MSG_H_SIZE);
1679         memcpy(buf->data, (unchar *)msg, DIR_MSG_H_SIZE);
1680         msg_dbg(msg, "buf2port: ");
1681         p_ptr->sent++;
1682         if (dest->node == tipc_own_addr)
1683                 return port_recv_msg(buf);
1684         res = tipc_send_buf_fast(buf, dest->node);
1685         if (likely(res != -ELINKCONG))
1686                 return res;
1687         if (port_unreliable(p_ptr))
1688                 return dsz;
1689         return -ELINKCONG;
1690 }
1691
1692 /** 
1693  * tipc_send_buf2port - send message buffer to port identity
1694  */
1695
1696 int tipc_send_buf2port(u32 ref, 
1697                        struct tipc_portid const *dest,
1698                        struct sk_buff *buf, 
1699                        unsigned int dsz)
1700 {
1701         struct tipc_portid orig;
1702
1703         orig.ref = ref;
1704         orig.node = tipc_own_addr;
1705         return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 
1706                                      TIPC_PORT_IMPORTANCE);
1707 }
1708