c23f3380d0a781dd49e855350fa8d4505c386c2f
[linux-2.6.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
42 /* Connection management: */
43 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
44 #define CONFIRMED 0
45 #define PROBING 1
46
47 #define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->publ.phdr);
65 }
66
67 static u32 port_peerport(struct port *p_ptr)
68 {
69         return msg_destport(&p_ptr->publ.phdr);
70 }
71
72 static u32 port_out_seqno(struct port *p_ptr)
73 {
74         return msg_transp_seqno(&p_ptr->publ.phdr);
75 }
76
77 static void port_incr_out_seqno(struct port *p_ptr)
78 {
79         struct tipc_msg *m = &p_ptr->publ.phdr;
80
81         if (likely(!msg_routed(m)))
82                 return;
83         msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
84 }
85
86 /**
87  * tipc_multicast - send a multicast message to local and remote destinations
88  */
89
90 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
91                    u32 num_sect, struct iovec const *msg_sect)
92 {
93         struct tipc_msg *hdr;
94         struct sk_buff *buf;
95         struct sk_buff *ibuf = NULL;
96         struct port_list dports = {0, NULL, };
97         struct port *oport = tipc_port_deref(ref);
98         int ext_targets;
99         int res;
100
101         if (unlikely(!oport))
102                 return -EINVAL;
103
104         /* Create multicast message */
105
106         hdr = &oport->publ.phdr;
107         msg_set_type(hdr, TIPC_MCAST_MSG);
108         msg_set_nametype(hdr, seq->type);
109         msg_set_namelower(hdr, seq->lower);
110         msg_set_nameupper(hdr, seq->upper);
111         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
112         res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
113                         !oport->user_port, &buf);
114         if (unlikely(!buf))
115                 return res;
116
117         /* Figure out where to send multicast message */
118
119         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
120                                                 TIPC_NODE_SCOPE, &dports);
121
122         /* Send message to destinations (duplicate it only if necessary) */
123
124         if (ext_targets) {
125                 if (dports.count != 0) {
126                         ibuf = skb_copy(buf, GFP_ATOMIC);
127                         if (ibuf == NULL) {
128                                 tipc_port_list_free(&dports);
129                                 buf_discard(buf);
130                                 return -ENOMEM;
131                         }
132                 }
133                 res = tipc_bclink_send_msg(buf);
134                 if ((res < 0) && (dports.count != 0)) {
135                         buf_discard(ibuf);
136                 }
137         } else {
138                 ibuf = buf;
139         }
140
141         if (res >= 0) {
142                 if (ibuf)
143                         tipc_port_recv_mcast(ibuf, &dports);
144         } else {
145                 tipc_port_list_free(&dports);
146         }
147         return res;
148 }
149
150 /**
151  * tipc_port_recv_mcast - deliver multicast message to all destination ports
152  *
153  * If there is no port list, perform a lookup to create one
154  */
155
156 void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
157 {
158         struct tipc_msg *msg;
159         struct port_list dports = {0, NULL, };
160         struct port_list *item = dp;
161         int cnt = 0;
162
163         msg = buf_msg(buf);
164
165         /* Create destination port list, if one wasn't supplied */
166
167         if (dp == NULL) {
168                 tipc_nametbl_mc_translate(msg_nametype(msg),
169                                      msg_namelower(msg),
170                                      msg_nameupper(msg),
171                                      TIPC_CLUSTER_SCOPE,
172                                      &dports);
173                 item = dp = &dports;
174         }
175
176         /* Deliver a copy of message to each destination port */
177
178         if (dp->count != 0) {
179                 if (dp->count == 1) {
180                         msg_set_destport(msg, dp->ports[0]);
181                         tipc_port_recv_msg(buf);
182                         tipc_port_list_free(dp);
183                         return;
184                 }
185                 for (; cnt < dp->count; cnt++) {
186                         int index = cnt % PLSIZE;
187                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
188
189                         if (b == NULL) {
190                                 warn("Unable to deliver multicast message(s)\n");
191                                 goto exit;
192                         }
193                         if ((index == 0) && (cnt != 0)) {
194                                 item = item->next;
195                         }
196                         msg_set_destport(buf_msg(b), item->ports[index]);
197                         tipc_port_recv_msg(b);
198                 }
199         }
200 exit:
201         buf_discard(buf);
202         tipc_port_list_free(dp);
203 }
204
205 /**
206  * tipc_createport_raw - create a generic TIPC port
207  *
208  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
209  */
210
211 struct tipc_port *tipc_createport_raw(void *usr_handle,
212                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
213                         void (*wakeup)(struct tipc_port *),
214                         const u32 importance)
215 {
216         struct port *p_ptr;
217         struct tipc_msg *msg;
218         u32 ref;
219
220         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
221         if (!p_ptr) {
222                 warn("Port creation failed, no memory\n");
223                 return NULL;
224         }
225         ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
226         if (!ref) {
227                 warn("Port creation failed, reference table exhausted\n");
228                 kfree(p_ptr);
229                 return NULL;
230         }
231
232         p_ptr->publ.usr_handle = usr_handle;
233         p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
234         p_ptr->publ.ref = ref;
235         msg = &p_ptr->publ.phdr;
236         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
237         msg_set_origport(msg, ref);
238         p_ptr->last_in_seqno = 41;
239         p_ptr->sent = 1;
240         INIT_LIST_HEAD(&p_ptr->wait_list);
241         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
242         p_ptr->dispatcher = dispatcher;
243         p_ptr->wakeup = wakeup;
244         p_ptr->user_port = NULL;
245         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
246         spin_lock_bh(&tipc_port_list_lock);
247         INIT_LIST_HEAD(&p_ptr->publications);
248         INIT_LIST_HEAD(&p_ptr->port_list);
249         list_add_tail(&p_ptr->port_list, &ports);
250         spin_unlock_bh(&tipc_port_list_lock);
251         return &(p_ptr->publ);
252 }
253
254 int tipc_deleteport(u32 ref)
255 {
256         struct port *p_ptr;
257         struct sk_buff *buf = NULL;
258
259         tipc_withdraw(ref, 0, NULL);
260         p_ptr = tipc_port_lock(ref);
261         if (!p_ptr)
262                 return -EINVAL;
263
264         tipc_ref_discard(ref);
265         tipc_port_unlock(p_ptr);
266
267         k_cancel_timer(&p_ptr->timer);
268         if (p_ptr->publ.connected) {
269                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
270                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
271         }
272         kfree(p_ptr->user_port);
273
274         spin_lock_bh(&tipc_port_list_lock);
275         list_del(&p_ptr->port_list);
276         list_del(&p_ptr->wait_list);
277         spin_unlock_bh(&tipc_port_list_lock);
278         k_term_timer(&p_ptr->timer);
279         kfree(p_ptr);
280         tipc_net_route_msg(buf);
281         return 0;
282 }
283
284 static int port_unreliable(struct port *p_ptr)
285 {
286         return msg_src_droppable(&p_ptr->publ.phdr);
287 }
288
289 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
290 {
291         struct port *p_ptr;
292
293         p_ptr = tipc_port_lock(ref);
294         if (!p_ptr)
295                 return -EINVAL;
296         *isunreliable = port_unreliable(p_ptr);
297         tipc_port_unlock(p_ptr);
298         return 0;
299 }
300
301 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
302 {
303         struct port *p_ptr;
304
305         p_ptr = tipc_port_lock(ref);
306         if (!p_ptr)
307                 return -EINVAL;
308         msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
309         tipc_port_unlock(p_ptr);
310         return 0;
311 }
312
313 static int port_unreturnable(struct port *p_ptr)
314 {
315         return msg_dest_droppable(&p_ptr->publ.phdr);
316 }
317
318 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
319 {
320         struct port *p_ptr;
321
322         p_ptr = tipc_port_lock(ref);
323         if (!p_ptr)
324                 return -EINVAL;
325         *isunrejectable = port_unreturnable(p_ptr);
326         tipc_port_unlock(p_ptr);
327         return 0;
328 }
329
330 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
331 {
332         struct port *p_ptr;
333
334         p_ptr = tipc_port_lock(ref);
335         if (!p_ptr)
336                 return -EINVAL;
337         msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
338         tipc_port_unlock(p_ptr);
339         return 0;
340 }
341
342 /*
343  * port_build_proto_msg(): build a port level protocol
344  * or a connection abortion message. Called with
345  * tipc_port lock on.
346  */
347 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
348                                             u32 origport, u32 orignode,
349                                             u32 usr, u32 type, u32 err,
350                                             u32 seqno, u32 ack)
351 {
352         struct sk_buff *buf;
353         struct tipc_msg *msg;
354
355         buf = tipc_buf_acquire(LONG_H_SIZE);
356         if (buf) {
357                 msg = buf_msg(buf);
358                 tipc_msg_init(msg, usr, type, LONG_H_SIZE, destnode);
359                 msg_set_errcode(msg, err);
360                 msg_set_destport(msg, destport);
361                 msg_set_origport(msg, origport);
362                 msg_set_orignode(msg, orignode);
363                 msg_set_transp_seqno(msg, seqno);
364                 msg_set_msgcnt(msg, ack);
365         }
366         return buf;
367 }
368
369 int tipc_reject_msg(struct sk_buff *buf, u32 err)
370 {
371         struct tipc_msg *msg = buf_msg(buf);
372         struct sk_buff *rbuf;
373         struct tipc_msg *rmsg;
374         int hdr_sz;
375         u32 imp = msg_importance(msg);
376         u32 data_sz = msg_data_sz(msg);
377
378         if (data_sz > MAX_REJECT_SIZE)
379                 data_sz = MAX_REJECT_SIZE;
380         if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
381                 imp++;
382
383         /* discard rejected message if it shouldn't be returned to sender */
384         if (msg_errcode(msg) || msg_dest_droppable(msg)) {
385                 buf_discard(buf);
386                 return data_sz;
387         }
388
389         /* construct rejected message */
390         if (msg_mcast(msg))
391                 hdr_sz = MCAST_H_SIZE;
392         else
393                 hdr_sz = LONG_H_SIZE;
394         rbuf = tipc_buf_acquire(data_sz + hdr_sz);
395         if (rbuf == NULL) {
396                 buf_discard(buf);
397                 return data_sz;
398         }
399         rmsg = buf_msg(rbuf);
400         tipc_msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg));
401         msg_set_errcode(rmsg, err);
402         msg_set_destport(rmsg, msg_origport(msg));
403         msg_set_origport(rmsg, msg_destport(msg));
404         if (msg_short(msg)) {
405                 msg_set_orignode(rmsg, tipc_own_addr);
406                 /* leave name type & instance as zeroes */
407         } else {
408                 msg_set_orignode(rmsg, msg_destnode(msg));
409                 msg_set_nametype(rmsg, msg_nametype(msg));
410                 msg_set_nameinst(rmsg, msg_nameinst(msg));
411         }
412         msg_set_size(rmsg, data_sz + hdr_sz);
413         skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);
414
415         /* send self-abort message when rejecting on a connected port */
416         if (msg_connected(msg)) {
417                 struct sk_buff *abuf = NULL;
418                 struct port *p_ptr = tipc_port_lock(msg_destport(msg));
419
420                 if (p_ptr) {
421                         if (p_ptr->publ.connected)
422                                 abuf = port_build_self_abort_msg(p_ptr, err);
423                         tipc_port_unlock(p_ptr);
424                 }
425                 tipc_net_route_msg(abuf);
426         }
427
428         /* send rejected message */
429         buf_discard(buf);
430         tipc_net_route_msg(rbuf);
431         return data_sz;
432 }
433
434 int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
435                               struct iovec const *msg_sect, u32 num_sect,
436                               int err)
437 {
438         struct sk_buff *buf;
439         int res;
440
441         res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
442                         !p_ptr->user_port, &buf);
443         if (!buf)
444                 return res;
445
446         return tipc_reject_msg(buf, err);
447 }
448
449 static void port_timeout(unsigned long ref)
450 {
451         struct port *p_ptr = tipc_port_lock(ref);
452         struct sk_buff *buf = NULL;
453
454         if (!p_ptr)
455                 return;
456
457         if (!p_ptr->publ.connected) {
458                 tipc_port_unlock(p_ptr);
459                 return;
460         }
461
462         /* Last probe answered ? */
463         if (p_ptr->probing_state == PROBING) {
464                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
465         } else {
466                 buf = port_build_proto_msg(port_peerport(p_ptr),
467                                            port_peernode(p_ptr),
468                                            p_ptr->publ.ref,
469                                            tipc_own_addr,
470                                            CONN_MANAGER,
471                                            CONN_PROBE,
472                                            TIPC_OK,
473                                            port_out_seqno(p_ptr),
474                                            0);
475                 port_incr_out_seqno(p_ptr);
476                 p_ptr->probing_state = PROBING;
477                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
478         }
479         tipc_port_unlock(p_ptr);
480         tipc_net_route_msg(buf);
481 }
482
483
484 static void port_handle_node_down(unsigned long ref)
485 {
486         struct port *p_ptr = tipc_port_lock(ref);
487         struct sk_buff *buf = NULL;
488
489         if (!p_ptr)
490                 return;
491         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
492         tipc_port_unlock(p_ptr);
493         tipc_net_route_msg(buf);
494 }
495
496
497 static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
498 {
499         u32 imp = msg_importance(&p_ptr->publ.phdr);
500
501         if (!p_ptr->publ.connected)
502                 return NULL;
503         if (imp < TIPC_CRITICAL_IMPORTANCE)
504                 imp++;
505         return port_build_proto_msg(p_ptr->publ.ref,
506                                     tipc_own_addr,
507                                     port_peerport(p_ptr),
508                                     port_peernode(p_ptr),
509                                     imp,
510                                     TIPC_CONN_MSG,
511                                     err,
512                                     p_ptr->last_in_seqno + 1,
513                                     0);
514 }
515
516
517 static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
518 {
519         u32 imp = msg_importance(&p_ptr->publ.phdr);
520
521         if (!p_ptr->publ.connected)
522                 return NULL;
523         if (imp < TIPC_CRITICAL_IMPORTANCE)
524                 imp++;
525         return port_build_proto_msg(port_peerport(p_ptr),
526                                     port_peernode(p_ptr),
527                                     p_ptr->publ.ref,
528                                     tipc_own_addr,
529                                     imp,
530                                     TIPC_CONN_MSG,
531                                     err,
532                                     port_out_seqno(p_ptr),
533                                     0);
534 }
535
536 void tipc_port_recv_proto_msg(struct sk_buff *buf)
537 {
538         struct tipc_msg *msg = buf_msg(buf);
539         struct port *p_ptr = tipc_port_lock(msg_destport(msg));
540         u32 err = TIPC_OK;
541         struct sk_buff *r_buf = NULL;
542         struct sk_buff *abort_buf = NULL;
543
544         if (!p_ptr) {
545                 err = TIPC_ERR_NO_PORT;
546         } else if (p_ptr->publ.connected) {
547                 if ((port_peernode(p_ptr) != msg_orignode(msg)) ||
548                     (port_peerport(p_ptr) != msg_origport(msg))) {
549                         err = TIPC_ERR_NO_PORT;
550                 } else if (msg_type(msg) == CONN_ACK) {
551                         int wakeup = tipc_port_congested(p_ptr) &&
552                                      p_ptr->publ.congested &&
553                                      p_ptr->wakeup;
554                         p_ptr->acked += msg_msgcnt(msg);
555                         if (tipc_port_congested(p_ptr))
556                                 goto exit;
557                         p_ptr->publ.congested = 0;
558                         if (!wakeup)
559                                 goto exit;
560                         p_ptr->wakeup(&p_ptr->publ);
561                         goto exit;
562                 }
563         } else if (p_ptr->publ.published) {
564                 err = TIPC_ERR_NO_PORT;
565         }
566         if (err) {
567                 r_buf = port_build_proto_msg(msg_origport(msg),
568                                              msg_orignode(msg),
569                                              msg_destport(msg),
570                                              tipc_own_addr,
571                                              TIPC_HIGH_IMPORTANCE,
572                                              TIPC_CONN_MSG,
573                                              err,
574                                              0,
575                                              0);
576                 goto exit;
577         }
578
579         /* All is fine */
580         if (msg_type(msg) == CONN_PROBE) {
581                 r_buf = port_build_proto_msg(msg_origport(msg),
582                                              msg_orignode(msg),
583                                              msg_destport(msg),
584                                              tipc_own_addr,
585                                              CONN_MANAGER,
586                                              CONN_PROBE_REPLY,
587                                              TIPC_OK,
588                                              port_out_seqno(p_ptr),
589                                              0);
590         }
591         p_ptr->probing_state = CONFIRMED;
592         port_incr_out_seqno(p_ptr);
593 exit:
594         if (p_ptr)
595                 tipc_port_unlock(p_ptr);
596         tipc_net_route_msg(r_buf);
597         tipc_net_route_msg(abort_buf);
598         buf_discard(buf);
599 }
600
601 static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
602 {
603         struct publication *publ;
604
605         if (full_id)
606                 tipc_printf(buf, "<%u.%u.%u:%u>:",
607                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
608                             tipc_node(tipc_own_addr), p_ptr->publ.ref);
609         else
610                 tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
611
612         if (p_ptr->publ.connected) {
613                 u32 dport = port_peerport(p_ptr);
614                 u32 destnode = port_peernode(p_ptr);
615
616                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
617                             tipc_zone(destnode), tipc_cluster(destnode),
618                             tipc_node(destnode), dport);
619                 if (p_ptr->publ.conn_type != 0)
620                         tipc_printf(buf, " via {%u,%u}",
621                                     p_ptr->publ.conn_type,
622                                     p_ptr->publ.conn_instance);
623         } else if (p_ptr->publ.published) {
624                 tipc_printf(buf, " bound to");
625                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
626                         if (publ->lower == publ->upper)
627                                 tipc_printf(buf, " {%u,%u}", publ->type,
628                                             publ->lower);
629                         else
630                                 tipc_printf(buf, " {%u,%u,%u}", publ->type,
631                                             publ->lower, publ->upper);
632                 }
633         }
634         tipc_printf(buf, "\n");
635 }
636
637 #define MAX_PORT_QUERY 32768
638
639 struct sk_buff *tipc_port_get_ports(void)
640 {
641         struct sk_buff *buf;
642         struct tlv_desc *rep_tlv;
643         struct print_buf pb;
644         struct port *p_ptr;
645         int str_len;
646
647         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
648         if (!buf)
649                 return NULL;
650         rep_tlv = (struct tlv_desc *)buf->data;
651
652         tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
653         spin_lock_bh(&tipc_port_list_lock);
654         list_for_each_entry(p_ptr, &ports, port_list) {
655                 spin_lock_bh(p_ptr->publ.lock);
656                 port_print(p_ptr, &pb, 0);
657                 spin_unlock_bh(p_ptr->publ.lock);
658         }
659         spin_unlock_bh(&tipc_port_list_lock);
660         str_len = tipc_printbuf_validate(&pb);
661
662         skb_put(buf, TLV_SPACE(str_len));
663         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
664
665         return buf;
666 }
667
668 void tipc_port_reinit(void)
669 {
670         struct port *p_ptr;
671         struct tipc_msg *msg;
672
673         spin_lock_bh(&tipc_port_list_lock);
674         list_for_each_entry(p_ptr, &ports, port_list) {
675                 msg = &p_ptr->publ.phdr;
676                 if (msg_orignode(msg) == tipc_own_addr)
677                         break;
678                 msg_set_prevnode(msg, tipc_own_addr);
679                 msg_set_orignode(msg, tipc_own_addr);
680         }
681         spin_unlock_bh(&tipc_port_list_lock);
682 }
683
684
685 /*
686  *  port_dispatcher_sigh(): Signal handler for messages destinated
687  *                          to the tipc_port interface.
688  */
689
690 static void port_dispatcher_sigh(void *dummy)
691 {
692         struct sk_buff *buf;
693
694         spin_lock_bh(&queue_lock);
695         buf = msg_queue_head;
696         msg_queue_head = NULL;
697         spin_unlock_bh(&queue_lock);
698
699         while (buf) {
700                 struct port *p_ptr;
701                 struct user_port *up_ptr;
702                 struct tipc_portid orig;
703                 struct tipc_name_seq dseq;
704                 void *usr_handle;
705                 int connected;
706                 int published;
707                 u32 message_type;
708
709                 struct sk_buff *next = buf->next;
710                 struct tipc_msg *msg = buf_msg(buf);
711                 u32 dref = msg_destport(msg);
712
713                 message_type = msg_type(msg);
714                 if (message_type > TIPC_DIRECT_MSG)
715                         goto reject;    /* Unsupported message type */
716
717                 p_ptr = tipc_port_lock(dref);
718                 if (!p_ptr)
719                         goto reject;    /* Port deleted while msg in queue */
720
721                 orig.ref = msg_origport(msg);
722                 orig.node = msg_orignode(msg);
723                 up_ptr = p_ptr->user_port;
724                 usr_handle = up_ptr->usr_handle;
725                 connected = p_ptr->publ.connected;
726                 published = p_ptr->publ.published;
727
728                 if (unlikely(msg_errcode(msg)))
729                         goto err;
730
731                 switch (message_type) {
732
733                 case TIPC_CONN_MSG:{
734                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
735                                 u32 peer_port = port_peerport(p_ptr);
736                                 u32 peer_node = port_peernode(p_ptr);
737
738                                 tipc_port_unlock(p_ptr);
739                                 if (unlikely(!cb))
740                                         goto reject;
741                                 if (unlikely(!connected)) {
742                                         if (tipc_connect2port(dref, &orig))
743                                                 goto reject;
744                                 } else if ((msg_origport(msg) != peer_port) ||
745                                            (msg_orignode(msg) != peer_node))
746                                         goto reject;
747                                 if (unlikely(++p_ptr->publ.conn_unacked >=
748                                              TIPC_FLOW_CONTROL_WIN))
749                                         tipc_acknowledge(dref,
750                                                          p_ptr->publ.conn_unacked);
751                                 skb_pull(buf, msg_hdr_sz(msg));
752                                 cb(usr_handle, dref, &buf, msg_data(msg),
753                                    msg_data_sz(msg));
754                                 break;
755                         }
756                 case TIPC_DIRECT_MSG:{
757                                 tipc_msg_event cb = up_ptr->msg_cb;
758
759                                 tipc_port_unlock(p_ptr);
760                                 if (unlikely(!cb || connected))
761                                         goto reject;
762                                 skb_pull(buf, msg_hdr_sz(msg));
763                                 cb(usr_handle, dref, &buf, msg_data(msg),
764                                    msg_data_sz(msg), msg_importance(msg),
765                                    &orig);
766                                 break;
767                         }
768                 case TIPC_MCAST_MSG:
769                 case TIPC_NAMED_MSG:{
770                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
771
772                                 tipc_port_unlock(p_ptr);
773                                 if (unlikely(!cb || connected || !published))
774                                         goto reject;
775                                 dseq.type =  msg_nametype(msg);
776                                 dseq.lower = msg_nameinst(msg);
777                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
778                                         ? dseq.lower : msg_nameupper(msg);
779                                 skb_pull(buf, msg_hdr_sz(msg));
780                                 cb(usr_handle, dref, &buf, msg_data(msg),
781                                    msg_data_sz(msg), msg_importance(msg),
782                                    &orig, &dseq);
783                                 break;
784                         }
785                 }
786                 if (buf)
787                         buf_discard(buf);
788                 buf = next;
789                 continue;
790 err:
791                 switch (message_type) {
792
793                 case TIPC_CONN_MSG:{
794                                 tipc_conn_shutdown_event cb =
795                                         up_ptr->conn_err_cb;
796                                 u32 peer_port = port_peerport(p_ptr);
797                                 u32 peer_node = port_peernode(p_ptr);
798
799                                 tipc_port_unlock(p_ptr);
800                                 if (!cb || !connected)
801                                         break;
802                                 if ((msg_origport(msg) != peer_port) ||
803                                     (msg_orignode(msg) != peer_node))
804                                         break;
805                                 tipc_disconnect(dref);
806                                 skb_pull(buf, msg_hdr_sz(msg));
807                                 cb(usr_handle, dref, &buf, msg_data(msg),
808                                    msg_data_sz(msg), msg_errcode(msg));
809                                 break;
810                         }
811                 case TIPC_DIRECT_MSG:{
812                                 tipc_msg_err_event cb = up_ptr->err_cb;
813
814                                 tipc_port_unlock(p_ptr);
815                                 if (!cb || connected)
816                                         break;
817                                 skb_pull(buf, msg_hdr_sz(msg));
818                                 cb(usr_handle, dref, &buf, msg_data(msg),
819                                    msg_data_sz(msg), msg_errcode(msg), &orig);
820                                 break;
821                         }
822                 case TIPC_MCAST_MSG:
823                 case TIPC_NAMED_MSG:{
824                                 tipc_named_msg_err_event cb =
825                                         up_ptr->named_err_cb;
826
827                                 tipc_port_unlock(p_ptr);
828                                 if (!cb || connected)
829                                         break;
830                                 dseq.type =  msg_nametype(msg);
831                                 dseq.lower = msg_nameinst(msg);
832                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
833                                         ? dseq.lower : msg_nameupper(msg);
834                                 skb_pull(buf, msg_hdr_sz(msg));
835                                 cb(usr_handle, dref, &buf, msg_data(msg),
836                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
837                                 break;
838                         }
839                 }
840                 if (buf)
841                         buf_discard(buf);
842                 buf = next;
843                 continue;
844 reject:
845                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
846                 buf = next;
847         }
848 }
849
850 /*
851  *  port_dispatcher(): Dispatcher for messages destinated
852  *  to the tipc_port interface. Called with port locked.
853  */
854
855 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
856 {
857         buf->next = NULL;
858         spin_lock_bh(&queue_lock);
859         if (msg_queue_head) {
860                 msg_queue_tail->next = buf;
861                 msg_queue_tail = buf;
862         } else {
863                 msg_queue_tail = msg_queue_head = buf;
864                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
865         }
866         spin_unlock_bh(&queue_lock);
867         return 0;
868 }
869
870 /*
871  * Wake up port after congestion: Called with port locked,
872  *
873  */
874
875 static void port_wakeup_sh(unsigned long ref)
876 {
877         struct port *p_ptr;
878         struct user_port *up_ptr;
879         tipc_continue_event cb = NULL;
880         void *uh = NULL;
881
882         p_ptr = tipc_port_lock(ref);
883         if (p_ptr) {
884                 up_ptr = p_ptr->user_port;
885                 if (up_ptr) {
886                         cb = up_ptr->continue_event_cb;
887                         uh = up_ptr->usr_handle;
888                 }
889                 tipc_port_unlock(p_ptr);
890         }
891         if (cb)
892                 cb(uh, ref);
893 }
894
895
896 static void port_wakeup(struct tipc_port *p_ptr)
897 {
898         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
899 }
900
901 void tipc_acknowledge(u32 ref, u32 ack)
902 {
903         struct port *p_ptr;
904         struct sk_buff *buf = NULL;
905
906         p_ptr = tipc_port_lock(ref);
907         if (!p_ptr)
908                 return;
909         if (p_ptr->publ.connected) {
910                 p_ptr->publ.conn_unacked -= ack;
911                 buf = port_build_proto_msg(port_peerport(p_ptr),
912                                            port_peernode(p_ptr),
913                                            ref,
914                                            tipc_own_addr,
915                                            CONN_MANAGER,
916                                            CONN_ACK,
917                                            TIPC_OK,
918                                            port_out_seqno(p_ptr),
919                                            ack);
920         }
921         tipc_port_unlock(p_ptr);
922         tipc_net_route_msg(buf);
923 }
924
925 /*
926  * tipc_createport(): user level call.
927  */
928
929 int tipc_createport(void *usr_handle,
930                     unsigned int importance,
931                     tipc_msg_err_event error_cb,
932                     tipc_named_msg_err_event named_error_cb,
933                     tipc_conn_shutdown_event conn_error_cb,
934                     tipc_msg_event msg_cb,
935                     tipc_named_msg_event named_msg_cb,
936                     tipc_conn_msg_event conn_msg_cb,
937                     tipc_continue_event continue_event_cb,/* May be zero */
938                     u32 *portref)
939 {
940         struct user_port *up_ptr;
941         struct port *p_ptr;
942
943         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
944         if (!up_ptr) {
945                 warn("Port creation failed, no memory\n");
946                 return -ENOMEM;
947         }
948         p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher,
949                                                    port_wakeup, importance);
950         if (!p_ptr) {
951                 kfree(up_ptr);
952                 return -ENOMEM;
953         }
954
955         p_ptr->user_port = up_ptr;
956         up_ptr->usr_handle = usr_handle;
957         up_ptr->ref = p_ptr->publ.ref;
958         up_ptr->err_cb = error_cb;
959         up_ptr->named_err_cb = named_error_cb;
960         up_ptr->conn_err_cb = conn_error_cb;
961         up_ptr->msg_cb = msg_cb;
962         up_ptr->named_msg_cb = named_msg_cb;
963         up_ptr->conn_msg_cb = conn_msg_cb;
964         up_ptr->continue_event_cb = continue_event_cb;
965         *portref = p_ptr->publ.ref;
966         tipc_port_unlock(p_ptr);
967         return 0;
968 }
969
970 int tipc_portimportance(u32 ref, unsigned int *importance)
971 {
972         struct port *p_ptr;
973
974         p_ptr = tipc_port_lock(ref);
975         if (!p_ptr)
976                 return -EINVAL;
977         *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
978         tipc_port_unlock(p_ptr);
979         return 0;
980 }
981
982 int tipc_set_portimportance(u32 ref, unsigned int imp)
983 {
984         struct port *p_ptr;
985
986         if (imp > TIPC_CRITICAL_IMPORTANCE)
987                 return -EINVAL;
988
989         p_ptr = tipc_port_lock(ref);
990         if (!p_ptr)
991                 return -EINVAL;
992         msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
993         tipc_port_unlock(p_ptr);
994         return 0;
995 }
996
997
998 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
999 {
1000         struct port *p_ptr;
1001         struct publication *publ;
1002         u32 key;
1003         int res = -EINVAL;
1004
1005         p_ptr = tipc_port_lock(ref);
1006         if (!p_ptr)
1007                 return -EINVAL;
1008
1009         if (p_ptr->publ.connected)
1010                 goto exit;
1011         if (seq->lower > seq->upper)
1012                 goto exit;
1013         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1014                 goto exit;
1015         key = ref + p_ptr->pub_count + 1;
1016         if (key == ref) {
1017                 res = -EADDRINUSE;
1018                 goto exit;
1019         }
1020         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1021                                     scope, p_ptr->publ.ref, key);
1022         if (publ) {
1023                 list_add(&publ->pport_list, &p_ptr->publications);
1024                 p_ptr->pub_count++;
1025                 p_ptr->publ.published = 1;
1026                 res = 0;
1027         }
1028 exit:
1029         tipc_port_unlock(p_ptr);
1030         return res;
1031 }
1032
1033 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1034 {
1035         struct port *p_ptr;
1036         struct publication *publ;
1037         struct publication *tpubl;
1038         int res = -EINVAL;
1039
1040         p_ptr = tipc_port_lock(ref);
1041         if (!p_ptr)
1042                 return -EINVAL;
1043         if (!seq) {
1044                 list_for_each_entry_safe(publ, tpubl,
1045                                          &p_ptr->publications, pport_list) {
1046                         tipc_nametbl_withdraw(publ->type, publ->lower,
1047                                               publ->ref, publ->key);
1048                 }
1049                 res = 0;
1050         } else {
1051                 list_for_each_entry_safe(publ, tpubl,
1052                                          &p_ptr->publications, pport_list) {
1053                         if (publ->scope != scope)
1054                                 continue;
1055                         if (publ->type != seq->type)
1056                                 continue;
1057                         if (publ->lower != seq->lower)
1058                                 continue;
1059                         if (publ->upper != seq->upper)
1060                                 break;
1061                         tipc_nametbl_withdraw(publ->type, publ->lower,
1062                                               publ->ref, publ->key);
1063                         res = 0;
1064                         break;
1065                 }
1066         }
1067         if (list_empty(&p_ptr->publications))
1068                 p_ptr->publ.published = 0;
1069         tipc_port_unlock(p_ptr);
1070         return res;
1071 }
1072
1073 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1074 {
1075         struct port *p_ptr;
1076         struct tipc_msg *msg;
1077         int res = -EINVAL;
1078
1079         p_ptr = tipc_port_lock(ref);
1080         if (!p_ptr)
1081                 return -EINVAL;
1082         if (p_ptr->publ.published || p_ptr->publ.connected)
1083                 goto exit;
1084         if (!peer->ref)
1085                 goto exit;
1086
1087         msg = &p_ptr->publ.phdr;
1088         msg_set_destnode(msg, peer->node);
1089         msg_set_destport(msg, peer->ref);
1090         msg_set_orignode(msg, tipc_own_addr);
1091         msg_set_origport(msg, p_ptr->publ.ref);
1092         msg_set_transp_seqno(msg, 42);
1093         msg_set_type(msg, TIPC_CONN_MSG);
1094         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1095
1096         p_ptr->probing_interval = PROBING_INTERVAL;
1097         p_ptr->probing_state = CONFIRMED;
1098         p_ptr->publ.connected = 1;
1099         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1100
1101         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1102                           (void *)(unsigned long)ref,
1103                           (net_ev_handler)port_handle_node_down);
1104         res = 0;
1105 exit:
1106         tipc_port_unlock(p_ptr);
1107         p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1108         return res;
1109 }
1110
1111 /**
1112  * tipc_disconnect_port - disconnect port from peer
1113  *
1114  * Port must be locked.
1115  */
1116
1117 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1118 {
1119         int res;
1120
1121         if (tp_ptr->connected) {
1122                 tp_ptr->connected = 0;
1123                 /* let timer expire on it's own to avoid deadlock! */
1124                 tipc_nodesub_unsubscribe(
1125                         &((struct port *)tp_ptr)->subscription);
1126                 res = 0;
1127         } else {
1128                 res = -ENOTCONN;
1129         }
1130         return res;
1131 }
1132
1133 /*
1134  * tipc_disconnect(): Disconnect port form peer.
1135  *                    This is a node local operation.
1136  */
1137
1138 int tipc_disconnect(u32 ref)
1139 {
1140         struct port *p_ptr;
1141         int res;
1142
1143         p_ptr = tipc_port_lock(ref);
1144         if (!p_ptr)
1145                 return -EINVAL;
1146         res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1147         tipc_port_unlock(p_ptr);
1148         return res;
1149 }
1150
1151 /*
1152  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1153  */
1154 int tipc_shutdown(u32 ref)
1155 {
1156         struct port *p_ptr;
1157         struct sk_buff *buf = NULL;
1158
1159         p_ptr = tipc_port_lock(ref);
1160         if (!p_ptr)
1161                 return -EINVAL;
1162
1163         if (p_ptr->publ.connected) {
1164                 u32 imp = msg_importance(&p_ptr->publ.phdr);
1165                 if (imp < TIPC_CRITICAL_IMPORTANCE)
1166                         imp++;
1167                 buf = port_build_proto_msg(port_peerport(p_ptr),
1168                                            port_peernode(p_ptr),
1169                                            ref,
1170                                            tipc_own_addr,
1171                                            imp,
1172                                            TIPC_CONN_MSG,
1173                                            TIPC_CONN_SHUTDOWN,
1174                                            port_out_seqno(p_ptr),
1175                                            0);
1176         }
1177         tipc_port_unlock(p_ptr);
1178         tipc_net_route_msg(buf);
1179         return tipc_disconnect(ref);
1180 }
1181
1182 /*
1183  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1184  *                        message for this node.
1185  */
1186
1187 static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
1188                                    struct iovec const *msg_sect)
1189 {
1190         struct sk_buff *buf;
1191         int res;
1192
1193         res = tipc_msg_build(&sender->publ.phdr, msg_sect, num_sect,
1194                         MAX_MSG_SIZE, !sender->user_port, &buf);
1195         if (likely(buf))
1196                 tipc_port_recv_msg(buf);
1197         return res;
1198 }
1199
1200 /**
1201  * tipc_send - send message sections on connection
1202  */
1203
1204 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1205 {
1206         struct port *p_ptr;
1207         u32 destnode;
1208         int res;
1209
1210         p_ptr = tipc_port_deref(ref);
1211         if (!p_ptr || !p_ptr->publ.connected)
1212                 return -EINVAL;
1213
1214         p_ptr->publ.congested = 1;
1215         if (!tipc_port_congested(p_ptr)) {
1216                 destnode = port_peernode(p_ptr);
1217                 if (likely(destnode != tipc_own_addr))
1218                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1219                                                            destnode);
1220                 else
1221                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1222
1223                 if (likely(res != -ELINKCONG)) {
1224                         port_incr_out_seqno(p_ptr);
1225                         p_ptr->publ.congested = 0;
1226                         p_ptr->sent++;
1227                         return res;
1228                 }
1229         }
1230         if (port_unreliable(p_ptr)) {
1231                 p_ptr->publ.congested = 0;
1232                 /* Just calculate msg length and return */
1233                 return tipc_msg_calc_data_size(msg_sect, num_sect);
1234         }
1235         return -ELINKCONG;
1236 }
1237
1238 /**
1239  * tipc_send2name - send message sections to port name
1240  */
1241
1242 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1243            unsigned int num_sect, struct iovec const *msg_sect)
1244 {
1245         struct port *p_ptr;
1246         struct tipc_msg *msg;
1247         u32 destnode = domain;
1248         u32 destport;
1249         int res;
1250
1251         p_ptr = tipc_port_deref(ref);
1252         if (!p_ptr || p_ptr->publ.connected)
1253                 return -EINVAL;
1254
1255         msg = &p_ptr->publ.phdr;
1256         msg_set_type(msg, TIPC_NAMED_MSG);
1257         msg_set_orignode(msg, tipc_own_addr);
1258         msg_set_origport(msg, ref);
1259         msg_set_hdr_sz(msg, LONG_H_SIZE);
1260         msg_set_nametype(msg, name->type);
1261         msg_set_nameinst(msg, name->instance);
1262         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1263         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1264         msg_set_destnode(msg, destnode);
1265         msg_set_destport(msg, destport);
1266
1267         if (likely(destport)) {
1268                 p_ptr->sent++;
1269                 if (likely(destnode == tipc_own_addr))
1270                         return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1271                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1272                                                    destnode);
1273                 if (likely(res != -ELINKCONG))
1274                         return res;
1275                 if (port_unreliable(p_ptr)) {
1276                         /* Just calculate msg length and return */
1277                         return tipc_msg_calc_data_size(msg_sect, num_sect);
1278                 }
1279                 return -ELINKCONG;
1280         }
1281         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1282                                          TIPC_ERR_NO_NAME);
1283 }
1284
1285 /**
1286  * tipc_send2port - send message sections to port identity
1287  */
1288
1289 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1290            unsigned int num_sect, struct iovec const *msg_sect)
1291 {
1292         struct port *p_ptr;
1293         struct tipc_msg *msg;
1294         int res;
1295
1296         p_ptr = tipc_port_deref(ref);
1297         if (!p_ptr || p_ptr->publ.connected)
1298                 return -EINVAL;
1299
1300         msg = &p_ptr->publ.phdr;
1301         msg_set_type(msg, TIPC_DIRECT_MSG);
1302         msg_set_orignode(msg, tipc_own_addr);
1303         msg_set_origport(msg, ref);
1304         msg_set_destnode(msg, dest->node);
1305         msg_set_destport(msg, dest->ref);
1306         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1307         p_ptr->sent++;
1308         if (dest->node == tipc_own_addr)
1309                 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1310         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1311         if (likely(res != -ELINKCONG))
1312                 return res;
1313         if (port_unreliable(p_ptr)) {
1314                 /* Just calculate msg length and return */
1315                 return tipc_msg_calc_data_size(msg_sect, num_sect);
1316         }
1317         return -ELINKCONG;
1318 }
1319
1320 /**
1321  * tipc_send_buf2port - send message buffer to port identity
1322  */
1323
1324 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1325                struct sk_buff *buf, unsigned int dsz)
1326 {
1327         struct port *p_ptr;
1328         struct tipc_msg *msg;
1329         int res;
1330
1331         p_ptr = (struct port *)tipc_ref_deref(ref);
1332         if (!p_ptr || p_ptr->publ.connected)
1333                 return -EINVAL;
1334
1335         msg = &p_ptr->publ.phdr;
1336         msg_set_type(msg, TIPC_DIRECT_MSG);
1337         msg_set_orignode(msg, tipc_own_addr);
1338         msg_set_origport(msg, ref);
1339         msg_set_destnode(msg, dest->node);
1340         msg_set_destport(msg, dest->ref);
1341         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1342         msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1343         if (skb_cow(buf, DIR_MSG_H_SIZE))
1344                 return -ENOMEM;
1345
1346         skb_push(buf, DIR_MSG_H_SIZE);
1347         skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1348         p_ptr->sent++;
1349         if (dest->node == tipc_own_addr)
1350                 return tipc_port_recv_msg(buf);
1351         res = tipc_send_buf_fast(buf, dest->node);
1352         if (likely(res != -ELINKCONG))
1353                 return res;
1354         if (port_unreliable(p_ptr))
1355                 return dsz;
1356         return -ELINKCONG;
1357 }
1358