48b16136305e64d9c9170c33c0c1e858e8369e5a
[linux-2.6.git] / drivers / misc / sgi-xp / xpc_channel.c
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9 /*
10  * Cross Partition Communication (XPC) channel support.
11  *
12  *      This is the part of XPC that manages the channels and
13  *      sends/receives messages across them to/from other partitions.
14  *
15  */
16
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
25 #include "xpc.h"
26
27 /*
28  * Guarantee that the kzalloc'd memory is cacheline aligned.
29  */
30 void *
31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
32 {
33         /* see if kzalloc will give us cachline aligned memory by default */
34         *base = kzalloc(size, flags);
35         if (*base == NULL)
36                 return NULL;
37
38         if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
39                 return *base;
40
41         kfree(*base);
42
43         /* nope, we'll have to do it ourselves */
44         *base = kzalloc(size + L1_CACHE_BYTES, flags);
45         if (*base == NULL)
46                 return NULL;
47
48         return (void *)L1_CACHE_ALIGN((u64)*base);
49 }
50
51 /*
52  * Allocate the local message queue and the notify queue.
53  */
54 static enum xp_retval
55 xpc_allocate_local_msgqueue(struct xpc_channel *ch)
56 {
57         unsigned long irq_flags;
58         int nentries;
59         size_t nbytes;
60
61         for (nentries = ch->local_nentries; nentries > 0; nentries--) {
62
63                 nbytes = nentries * ch->msg_size;
64                 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
65                                                                    GFP_KERNEL,
66                                                       &ch->local_msgqueue_base);
67                 if (ch->local_msgqueue == NULL)
68                         continue;
69
70                 nbytes = nentries * sizeof(struct xpc_notify);
71                 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
72                 if (ch->notify_queue == NULL) {
73                         kfree(ch->local_msgqueue_base);
74                         ch->local_msgqueue = NULL;
75                         continue;
76                 }
77
78                 spin_lock_irqsave(&ch->lock, irq_flags);
79                 if (nentries < ch->local_nentries) {
80                         dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
81                                 "partid=%d, channel=%d\n", nentries,
82                                 ch->local_nentries, ch->partid, ch->number);
83
84                         ch->local_nentries = nentries;
85                 }
86                 spin_unlock_irqrestore(&ch->lock, irq_flags);
87                 return xpSuccess;
88         }
89
90         dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
91                 "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
92         return xpNoMemory;
93 }
94
95 /*
96  * Allocate the cached remote message queue.
97  */
98 static enum xp_retval
99 xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
100 {
101         unsigned long irq_flags;
102         int nentries;
103         size_t nbytes;
104
105         DBUG_ON(ch->remote_nentries <= 0);
106
107         for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
108
109                 nbytes = nentries * ch->msg_size;
110                 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
111                                                                     GFP_KERNEL,
112                                                      &ch->remote_msgqueue_base);
113                 if (ch->remote_msgqueue == NULL)
114                         continue;
115
116                 spin_lock_irqsave(&ch->lock, irq_flags);
117                 if (nentries < ch->remote_nentries) {
118                         dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
119                                 "partid=%d, channel=%d\n", nentries,
120                                 ch->remote_nentries, ch->partid, ch->number);
121
122                         ch->remote_nentries = nentries;
123                 }
124                 spin_unlock_irqrestore(&ch->lock, irq_flags);
125                 return xpSuccess;
126         }
127
128         dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
129                 "partid=%d, channel=%d\n", ch->partid, ch->number);
130         return xpNoMemory;
131 }
132
133 /*
134  * Allocate message queues and other stuff associated with a channel.
135  *
136  * Note: Assumes all of the channel sizes are filled in.
137  */
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel *ch)
140 {
141         unsigned long irq_flags;
142         enum xp_retval ret;
143
144         DBUG_ON(ch->flags & XPC_C_SETUP);
145
146         ret = xpc_allocate_local_msgqueue(ch);
147         if (ret != xpSuccess)
148                 return ret;
149
150         ret = xpc_allocate_remote_msgqueue(ch);
151         if (ret != xpSuccess) {
152                 kfree(ch->local_msgqueue_base);
153                 ch->local_msgqueue = NULL;
154                 kfree(ch->notify_queue);
155                 ch->notify_queue = NULL;
156                 return ret;
157         }
158
159         spin_lock_irqsave(&ch->lock, irq_flags);
160         ch->flags |= XPC_C_SETUP;
161         spin_unlock_irqrestore(&ch->lock, irq_flags);
162
163         return xpSuccess;
164 }
165
166 /*
167  * Process a connect message from a remote partition.
168  *
169  * Note: xpc_process_connect() is expecting to be called with the
170  * spin_lock_irqsave held and will leave it locked upon return.
171  */
172 static void
173 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
174 {
175         enum xp_retval ret;
176
177         DBUG_ON(!spin_is_locked(&ch->lock));
178
179         if (!(ch->flags & XPC_C_OPENREQUEST) ||
180             !(ch->flags & XPC_C_ROPENREQUEST)) {
181                 /* nothing more to do for now */
182                 return;
183         }
184         DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
185
186         if (!(ch->flags & XPC_C_SETUP)) {
187                 spin_unlock_irqrestore(&ch->lock, *irq_flags);
188                 ret = xpc_allocate_msgqueues(ch);
189                 spin_lock_irqsave(&ch->lock, *irq_flags);
190
191                 if (ret != xpSuccess)
192                         XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
193
194                 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
195                         return;
196
197                 DBUG_ON(!(ch->flags & XPC_C_SETUP));
198                 DBUG_ON(ch->local_msgqueue == NULL);
199                 DBUG_ON(ch->remote_msgqueue == NULL);
200         }
201
202         if (!(ch->flags & XPC_C_OPENREPLY)) {
203                 ch->flags |= XPC_C_OPENREPLY;
204                 xpc_send_channel_openreply(ch, irq_flags);
205         }
206
207         if (!(ch->flags & XPC_C_ROPENREPLY))
208                 return;
209
210         DBUG_ON(ch->remote_msgqueue_pa == 0);
211
212         ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);    /* clear all else */
213
214         dev_info(xpc_chan, "channel %d to partition %d connected\n",
215                  ch->number, ch->partid);
216
217         spin_unlock_irqrestore(&ch->lock, *irq_flags);
218         xpc_create_kthreads(ch, 1, 0);
219         spin_lock_irqsave(&ch->lock, *irq_flags);
220 }
221
222 /*
223  * Free up message queues and other stuff that were allocated for the specified
224  * channel.
225  *
226  * Note: ch->reason and ch->reason_line are left set for debugging purposes,
227  * they're cleared when XPC_C_DISCONNECTED is cleared.
228  */
229 static void
230 xpc_free_msgqueues(struct xpc_channel *ch)
231 {
232         struct xpc_channel_sn2 *ch_sn2 = &ch->sn.sn2;
233
234         DBUG_ON(!spin_is_locked(&ch->lock));
235         DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
236
237         ch->remote_msgqueue_pa = 0;
238         ch->func = NULL;
239         ch->key = NULL;
240         ch->msg_size = 0;
241         ch->local_nentries = 0;
242         ch->remote_nentries = 0;
243         ch->kthreads_assigned_limit = 0;
244         ch->kthreads_idle_limit = 0;
245
246         ch_sn2->local_GP->get = 0;
247         ch_sn2->local_GP->put = 0;
248         ch_sn2->remote_GP.get = 0;
249         ch_sn2->remote_GP.put = 0;
250         ch_sn2->w_local_GP.get = 0;
251         ch_sn2->w_local_GP.put = 0;
252         ch_sn2->w_remote_GP.get = 0;
253         ch_sn2->w_remote_GP.put = 0;
254         ch_sn2->next_msg_to_pull = 0;
255
256         if (ch->flags & XPC_C_SETUP) {
257                 ch->flags &= ~XPC_C_SETUP;
258
259                 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
260                         ch->flags, ch->partid, ch->number);
261
262                 kfree(ch->local_msgqueue_base);
263                 ch->local_msgqueue = NULL;
264                 kfree(ch->remote_msgqueue_base);
265                 ch->remote_msgqueue = NULL;
266                 kfree(ch->notify_queue);
267                 ch->notify_queue = NULL;
268         }
269 }
270
271 /*
272  * spin_lock_irqsave() is expected to be held on entry.
273  */
274 static void
275 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
276 {
277         struct xpc_partition *part = &xpc_partitions[ch->partid];
278         u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
279
280         DBUG_ON(!spin_is_locked(&ch->lock));
281
282         if (!(ch->flags & XPC_C_DISCONNECTING))
283                 return;
284
285         DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
286
287         /* make sure all activity has settled down first */
288
289         if (atomic_read(&ch->kthreads_assigned) > 0 ||
290             atomic_read(&ch->references) > 0) {
291                 return;
292         }
293         DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
294                 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));
295
296         if (part->act_state == XPC_P_DEACTIVATING) {
297                 /* can't proceed until the other side disengages from us */
298                 if (xpc_partition_engaged(ch->partid))
299                         return;
300
301         } else {
302
303                 /* as long as the other side is up do the full protocol */
304
305                 if (!(ch->flags & XPC_C_RCLOSEREQUEST))
306                         return;
307
308                 if (!(ch->flags & XPC_C_CLOSEREPLY)) {
309                         ch->flags |= XPC_C_CLOSEREPLY;
310                         xpc_send_channel_closereply(ch, irq_flags);
311                 }
312
313                 if (!(ch->flags & XPC_C_RCLOSEREPLY))
314                         return;
315         }
316
317         /* wake those waiting for notify completion */
318         if (atomic_read(&ch->n_to_notify) > 0) {
319                 /* >>> we do callout while holding ch->lock */
320                 xpc_notify_senders_of_disconnect(ch);
321         }
322
323         /* both sides are disconnected now */
324
325         if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
326                 spin_unlock_irqrestore(&ch->lock, *irq_flags);
327                 xpc_disconnect_callout(ch, xpDisconnected);
328                 spin_lock_irqsave(&ch->lock, *irq_flags);
329         }
330
331         /* it's now safe to free the channel's message queues */
332         xpc_free_msgqueues(ch);
333
334         /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
335         ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
336
337         atomic_dec(&part->nchannels_active);
338
339         if (channel_was_connected) {
340                 dev_info(xpc_chan, "channel %d to partition %d disconnected, "
341                          "reason=%d\n", ch->number, ch->partid, ch->reason);
342         }
343
344         if (ch->flags & XPC_C_WDISCONNECT) {
345                 /* we won't lose the CPU since we're holding ch->lock */
346                 complete(&ch->wdisconnect_wait);
347         } else if (ch->delayed_IPI_flags) {
348                 if (part->act_state != XPC_P_DEACTIVATING) {
349                         /* time to take action on any delayed IPI flags */
350                         spin_lock(&part->IPI_lock);
351                         XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
352                                           ch->delayed_IPI_flags);
353                         spin_unlock(&part->IPI_lock);
354                 }
355                 ch->delayed_IPI_flags = 0;
356         }
357 }
358
359 /*
360  * Process a change in the channel's remote connection state.
361  */
362 static void
363 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
364                           u8 IPI_flags)
365 {
366         unsigned long irq_flags;
367         struct xpc_openclose_args *args =
368             &part->remote_openclose_args[ch_number];
369         struct xpc_channel *ch = &part->channels[ch_number];
370         enum xp_retval reason;
371
372         spin_lock_irqsave(&ch->lock, irq_flags);
373
374 again:
375
376         if ((ch->flags & XPC_C_DISCONNECTED) &&
377             (ch->flags & XPC_C_WDISCONNECT)) {
378                 /*
379                  * Delay processing IPI flags until thread waiting disconnect
380                  * has had a chance to see that the channel is disconnected.
381                  */
382                 ch->delayed_IPI_flags |= IPI_flags;
383                 spin_unlock_irqrestore(&ch->lock, irq_flags);
384                 return;
385         }
386
387         if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
388
389                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
390                         "from partid=%d, channel=%d\n", args->reason,
391                         ch->partid, ch->number);
392
393                 /*
394                  * If RCLOSEREQUEST is set, we're probably waiting for
395                  * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
396                  * with this RCLOSEREQUEST in the IPI_flags.
397                  */
398
399                 if (ch->flags & XPC_C_RCLOSEREQUEST) {
400                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
401                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
402                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
403                         DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
404
405                         DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
406                         IPI_flags &= ~XPC_IPI_CLOSEREPLY;
407                         ch->flags |= XPC_C_RCLOSEREPLY;
408
409                         /* both sides have finished disconnecting */
410                         xpc_process_disconnect(ch, &irq_flags);
411                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
412                         goto again;
413                 }
414
415                 if (ch->flags & XPC_C_DISCONNECTED) {
416                         if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
417                                 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
418                                                        ch_number) &
419                                      XPC_IPI_OPENREQUEST)) {
420
421                                         DBUG_ON(ch->delayed_IPI_flags != 0);
422                                         spin_lock(&part->IPI_lock);
423                                         XPC_SET_IPI_FLAGS(part->local_IPI_amo,
424                                                           ch_number,
425                                                           XPC_IPI_CLOSEREQUEST);
426                                         spin_unlock(&part->IPI_lock);
427                                 }
428                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
429                                 return;
430                         }
431
432                         XPC_SET_REASON(ch, 0, 0);
433                         ch->flags &= ~XPC_C_DISCONNECTED;
434
435                         atomic_inc(&part->nchannels_active);
436                         ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
437                 }
438
439                 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
440
441                 /*
442                  * The meaningful CLOSEREQUEST connection state fields are:
443                  *      reason = reason connection is to be closed
444                  */
445
446                 ch->flags |= XPC_C_RCLOSEREQUEST;
447
448                 if (!(ch->flags & XPC_C_DISCONNECTING)) {
449                         reason = args->reason;
450                         if (reason <= xpSuccess || reason > xpUnknownReason)
451                                 reason = xpUnknownReason;
452                         else if (reason == xpUnregistering)
453                                 reason = xpOtherUnregistering;
454
455                         XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
456
457                         DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
458                         spin_unlock_irqrestore(&ch->lock, irq_flags);
459                         return;
460                 }
461
462                 xpc_process_disconnect(ch, &irq_flags);
463         }
464
465         if (IPI_flags & XPC_IPI_CLOSEREPLY) {
466
467                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
468                         " channel=%d\n", ch->partid, ch->number);
469
470                 if (ch->flags & XPC_C_DISCONNECTED) {
471                         DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
472                         spin_unlock_irqrestore(&ch->lock, irq_flags);
473                         return;
474                 }
475
476                 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
477
478                 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
479                         if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
480                              & XPC_IPI_CLOSEREQUEST)) {
481
482                                 DBUG_ON(ch->delayed_IPI_flags != 0);
483                                 spin_lock(&part->IPI_lock);
484                                 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
485                                                   ch_number,
486                                                   XPC_IPI_CLOSEREPLY);
487                                 spin_unlock(&part->IPI_lock);
488                         }
489                         spin_unlock_irqrestore(&ch->lock, irq_flags);
490                         return;
491                 }
492
493                 ch->flags |= XPC_C_RCLOSEREPLY;
494
495                 if (ch->flags & XPC_C_CLOSEREPLY) {
496                         /* both sides have finished disconnecting */
497                         xpc_process_disconnect(ch, &irq_flags);
498                 }
499         }
500
501         if (IPI_flags & XPC_IPI_OPENREQUEST) {
502
503                 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
504                         "local_nentries=%d) received from partid=%d, "
505                         "channel=%d\n", args->msg_size, args->local_nentries,
506                         ch->partid, ch->number);
507
508                 if (part->act_state == XPC_P_DEACTIVATING ||
509                     (ch->flags & XPC_C_ROPENREQUEST)) {
510                         spin_unlock_irqrestore(&ch->lock, irq_flags);
511                         return;
512                 }
513
514                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
515                         ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
516                         spin_unlock_irqrestore(&ch->lock, irq_flags);
517                         return;
518                 }
519                 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
520                                        XPC_C_OPENREQUEST)));
521                 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
522                                      XPC_C_OPENREPLY | XPC_C_CONNECTED));
523
524                 /*
525                  * The meaningful OPENREQUEST connection state fields are:
526                  *      msg_size = size of channel's messages in bytes
527                  *      local_nentries = remote partition's local_nentries
528                  */
529                 if (args->msg_size == 0 || args->local_nentries == 0) {
530                         /* assume OPENREQUEST was delayed by mistake */
531                         spin_unlock_irqrestore(&ch->lock, irq_flags);
532                         return;
533                 }
534
535                 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
536                 ch->remote_nentries = args->local_nentries;
537
538                 if (ch->flags & XPC_C_OPENREQUEST) {
539                         if (args->msg_size != ch->msg_size) {
540                                 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
541                                                        &irq_flags);
542                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
543                                 return;
544                         }
545                 } else {
546                         ch->msg_size = args->msg_size;
547
548                         XPC_SET_REASON(ch, 0, 0);
549                         ch->flags &= ~XPC_C_DISCONNECTED;
550
551                         atomic_inc(&part->nchannels_active);
552                 }
553
554                 xpc_process_connect(ch, &irq_flags);
555         }
556
557         if (IPI_flags & XPC_IPI_OPENREPLY) {
558
559                 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
560                         "local_nentries=%d, remote_nentries=%d) received from "
561                         "partid=%d, channel=%d\n", args->local_msgqueue_pa,
562                         args->local_nentries, args->remote_nentries,
563                         ch->partid, ch->number);
564
565                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
566                         spin_unlock_irqrestore(&ch->lock, irq_flags);
567                         return;
568                 }
569                 if (!(ch->flags & XPC_C_OPENREQUEST)) {
570                         XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
571                                                &irq_flags);
572                         spin_unlock_irqrestore(&ch->lock, irq_flags);
573                         return;
574                 }
575
576                 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
577                 DBUG_ON(ch->flags & XPC_C_CONNECTED);
578
579                 /*
580                  * The meaningful OPENREPLY connection state fields are:
581                  *      local_msgqueue_pa = physical address of remote
582                  *                          partition's local_msgqueue
583                  *      local_nentries = remote partition's local_nentries
584                  *      remote_nentries = remote partition's remote_nentries
585                  */
586                 DBUG_ON(args->local_msgqueue_pa == 0);
587                 DBUG_ON(args->local_nentries == 0);
588                 DBUG_ON(args->remote_nentries == 0);
589
590                 ch->flags |= XPC_C_ROPENREPLY;
591                 ch->remote_msgqueue_pa = args->local_msgqueue_pa;
592
593                 if (args->local_nentries < ch->remote_nentries) {
594                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
595                                 "remote_nentries=%d, old remote_nentries=%d, "
596                                 "partid=%d, channel=%d\n",
597                                 args->local_nentries, ch->remote_nentries,
598                                 ch->partid, ch->number);
599
600                         ch->remote_nentries = args->local_nentries;
601                 }
602                 if (args->remote_nentries < ch->local_nentries) {
603                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
604                                 "local_nentries=%d, old local_nentries=%d, "
605                                 "partid=%d, channel=%d\n",
606                                 args->remote_nentries, ch->local_nentries,
607                                 ch->partid, ch->number);
608
609                         ch->local_nentries = args->remote_nentries;
610                 }
611
612                 xpc_process_connect(ch, &irq_flags);
613         }
614
615         spin_unlock_irqrestore(&ch->lock, irq_flags);
616 }
617
618 /*
619  * Attempt to establish a channel connection to a remote partition.
620  */
621 static enum xp_retval
622 xpc_connect_channel(struct xpc_channel *ch)
623 {
624         unsigned long irq_flags;
625         struct xpc_registration *registration = &xpc_registrations[ch->number];
626
627         if (mutex_trylock(&registration->mutex) == 0)
628                 return xpRetry;
629
630         if (!XPC_CHANNEL_REGISTERED(ch->number)) {
631                 mutex_unlock(&registration->mutex);
632                 return xpUnregistered;
633         }
634
635         spin_lock_irqsave(&ch->lock, irq_flags);
636
637         DBUG_ON(ch->flags & XPC_C_CONNECTED);
638         DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
639
640         if (ch->flags & XPC_C_DISCONNECTING) {
641                 spin_unlock_irqrestore(&ch->lock, irq_flags);
642                 mutex_unlock(&registration->mutex);
643                 return ch->reason;
644         }
645
646         /* add info from the channel connect registration to the channel */
647
648         ch->kthreads_assigned_limit = registration->assigned_limit;
649         ch->kthreads_idle_limit = registration->idle_limit;
650         DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
651         DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
652         DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
653
654         ch->func = registration->func;
655         DBUG_ON(registration->func == NULL);
656         ch->key = registration->key;
657
658         ch->local_nentries = registration->nentries;
659
660         if (ch->flags & XPC_C_ROPENREQUEST) {
661                 if (registration->msg_size != ch->msg_size) {
662                         /* the local and remote sides aren't the same */
663
664                         /*
665                          * Because XPC_DISCONNECT_CHANNEL() can block we're
666                          * forced to up the registration sema before we unlock
667                          * the channel lock. But that's okay here because we're
668                          * done with the part that required the registration
669                          * sema. XPC_DISCONNECT_CHANNEL() requires that the
670                          * channel lock be locked and will unlock and relock
671                          * the channel lock as needed.
672                          */
673                         mutex_unlock(&registration->mutex);
674                         XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
675                                                &irq_flags);
676                         spin_unlock_irqrestore(&ch->lock, irq_flags);
677                         return xpUnequalMsgSizes;
678                 }
679         } else {
680                 ch->msg_size = registration->msg_size;
681
682                 XPC_SET_REASON(ch, 0, 0);
683                 ch->flags &= ~XPC_C_DISCONNECTED;
684
685                 atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
686         }
687
688         mutex_unlock(&registration->mutex);
689
690         /* initiate the connection */
691
692         ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
693         xpc_send_channel_openrequest(ch, &irq_flags);
694
695         xpc_process_connect(ch, &irq_flags);
696
697         spin_unlock_irqrestore(&ch->lock, irq_flags);
698
699         return xpSuccess;
700 }
701
702 void
703 xpc_process_channel_activity(struct xpc_partition *part)
704 {
705         unsigned long irq_flags;
706         u64 IPI_amo, IPI_flags;
707         struct xpc_channel *ch;
708         int ch_number;
709         u32 ch_flags;
710
711         IPI_amo = xpc_get_IPI_flags(part);
712
713         /*
714          * Initiate channel connections for registered channels.
715          *
716          * For each connected channel that has pending messages activate idle
717          * kthreads and/or create new kthreads as needed.
718          */
719
720         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
721                 ch = &part->channels[ch_number];
722
723                 /*
724                  * Process any open or close related IPI flags, and then deal
725                  * with connecting or disconnecting the channel as required.
726                  */
727
728                 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);
729
730                 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
731                         xpc_process_openclose_IPI(part, ch_number, IPI_flags);
732
733                 ch_flags = ch->flags;   /* need an atomic snapshot of flags */
734
735                 if (ch_flags & XPC_C_DISCONNECTING) {
736                         spin_lock_irqsave(&ch->lock, irq_flags);
737                         xpc_process_disconnect(ch, &irq_flags);
738                         spin_unlock_irqrestore(&ch->lock, irq_flags);
739                         continue;
740                 }
741
742                 if (part->act_state == XPC_P_DEACTIVATING)
743                         continue;
744
745                 if (!(ch_flags & XPC_C_CONNECTED)) {
746                         if (!(ch_flags & XPC_C_OPENREQUEST)) {
747                                 DBUG_ON(ch_flags & XPC_C_SETUP);
748                                 (void)xpc_connect_channel(ch);
749                         } else {
750                                 spin_lock_irqsave(&ch->lock, irq_flags);
751                                 xpc_process_connect(ch, &irq_flags);
752                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
753                         }
754                         continue;
755                 }
756
757                 /*
758                  * Process any message related IPI flags, this may involve the
759                  * activation of kthreads to deliver any pending messages sent
760                  * from the other partition.
761                  */
762
763                 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
764                         xpc_process_msg_IPI(part, ch_number);
765         }
766 }
767
768 /*
769  * XPC's heartbeat code calls this function to inform XPC that a partition is
770  * going down.  XPC responds by tearing down the XPartition Communication
771  * infrastructure used for the just downed partition.
772  *
773  * XPC's heartbeat code will never call this function and xpc_partition_up()
774  * at the same time. Nor will it ever make multiple calls to either function
775  * at the same time.
776  */
777 void
778 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
779 {
780         unsigned long irq_flags;
781         int ch_number;
782         struct xpc_channel *ch;
783
784         dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
785                 XPC_PARTID(part), reason);
786
787         if (!xpc_part_ref(part)) {
788                 /* infrastructure for this partition isn't currently set up */
789                 return;
790         }
791
792         /* disconnect channels associated with the partition going down */
793
794         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
795                 ch = &part->channels[ch_number];
796
797                 xpc_msgqueue_ref(ch);
798                 spin_lock_irqsave(&ch->lock, irq_flags);
799
800                 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
801
802                 spin_unlock_irqrestore(&ch->lock, irq_flags);
803                 xpc_msgqueue_deref(ch);
804         }
805
806         xpc_wakeup_channel_mgr(part);
807
808         xpc_part_deref(part);
809 }
810
811 /*
812  * Called by XP at the time of channel connection registration to cause
813  * XPC to establish connections to all currently active partitions.
814  */
815 void
816 xpc_initiate_connect(int ch_number)
817 {
818         short partid;
819         struct xpc_partition *part;
820         struct xpc_channel *ch;
821
822         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
823
824         for (partid = 0; partid < xp_max_npartitions; partid++) {
825                 part = &xpc_partitions[partid];
826
827                 if (xpc_part_ref(part)) {
828                         ch = &part->channels[ch_number];
829
830                         /*
831                          * Initiate the establishment of a connection on the
832                          * newly registered channel to the remote partition.
833                          */
834                         xpc_wakeup_channel_mgr(part);
835                         xpc_part_deref(part);
836                 }
837         }
838 }
839
840 void
841 xpc_connected_callout(struct xpc_channel *ch)
842 {
843         /* let the registerer know that a connection has been established */
844
845         if (ch->func != NULL) {
846                 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
847                         "partid=%d, channel=%d\n", ch->partid, ch->number);
848
849                 ch->func(xpConnected, ch->partid, ch->number,
850                          (void *)(u64)ch->local_nentries, ch->key);
851
852                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
853                         "partid=%d, channel=%d\n", ch->partid, ch->number);
854         }
855 }
856
857 /*
858  * Called by XP at the time of channel connection unregistration to cause
859  * XPC to teardown all current connections for the specified channel.
860  *
861  * Before returning xpc_initiate_disconnect() will wait until all connections
862  * on the specified channel have been closed/torndown. So the caller can be
863  * assured that they will not be receiving any more callouts from XPC to the
864  * function they registered via xpc_connect().
865  *
866  * Arguments:
867  *
868  *      ch_number - channel # to unregister.
869  */
870 void
871 xpc_initiate_disconnect(int ch_number)
872 {
873         unsigned long irq_flags;
874         short partid;
875         struct xpc_partition *part;
876         struct xpc_channel *ch;
877
878         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
879
880         /* initiate the channel disconnect for every active partition */
881         for (partid = 0; partid < xp_max_npartitions; partid++) {
882                 part = &xpc_partitions[partid];
883
884                 if (xpc_part_ref(part)) {
885                         ch = &part->channels[ch_number];
886                         xpc_msgqueue_ref(ch);
887
888                         spin_lock_irqsave(&ch->lock, irq_flags);
889
890                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
891                                 ch->flags |= XPC_C_WDISCONNECT;
892
893                                 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
894                                                        &irq_flags);
895                         }
896
897                         spin_unlock_irqrestore(&ch->lock, irq_flags);
898
899                         xpc_msgqueue_deref(ch);
900                         xpc_part_deref(part);
901                 }
902         }
903
904         xpc_disconnect_wait(ch_number);
905 }
906
907 /*
908  * To disconnect a channel, and reflect it back to all who may be waiting.
909  *
910  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
911  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
912  * xpc_disconnect_wait().
913  *
914  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
915  */
916 void
917 xpc_disconnect_channel(const int line, struct xpc_channel *ch,
918                        enum xp_retval reason, unsigned long *irq_flags)
919 {
920         u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
921
922         DBUG_ON(!spin_is_locked(&ch->lock));
923
924         if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
925                 return;
926
927         DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));
928
929         dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
930                 reason, line, ch->partid, ch->number);
931
932         XPC_SET_REASON(ch, reason, line);
933
934         ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
935         /* some of these may not have been set */
936         ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
937                        XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
938                        XPC_C_CONNECTING | XPC_C_CONNECTED);
939
940         xpc_send_channel_closerequest(ch, irq_flags);
941
942         if (channel_was_connected)
943                 ch->flags |= XPC_C_WASCONNECTED;
944
945         spin_unlock_irqrestore(&ch->lock, *irq_flags);
946
947         /* wake all idle kthreads so they can exit */
948         if (atomic_read(&ch->kthreads_idle) > 0) {
949                 wake_up_all(&ch->idle_wq);
950
951         } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
952                    !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
953                 /* start a kthread that will do the xpDisconnecting callout */
954                 xpc_create_kthreads(ch, 1, 1);
955         }
956
957         /* wake those waiting to allocate an entry from the local msg queue */
958         if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
959                 wake_up(&ch->msg_allocate_wq);
960
961         spin_lock_irqsave(&ch->lock, *irq_flags);
962 }
963
964 void
965 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
966 {
967         /*
968          * Let the channel's registerer know that the channel is being
969          * disconnected. We don't want to do this if the registerer was never
970          * informed of a connection being made.
971          */
972
973         if (ch->func != NULL) {
974                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
975                         "channel=%d\n", reason, ch->partid, ch->number);
976
977                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
978
979                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
980                         "channel=%d\n", reason, ch->partid, ch->number);
981         }
982 }
983
984 /*
985  * Wait for a message entry to become available for the specified channel,
986  * but don't wait any longer than 1 jiffy.
987  */
988 enum xp_retval
989 xpc_allocate_msg_wait(struct xpc_channel *ch)
990 {
991         enum xp_retval ret;
992
993         if (ch->flags & XPC_C_DISCONNECTING) {
994                 DBUG_ON(ch->reason == xpInterrupted);
995                 return ch->reason;
996         }
997
998         atomic_inc(&ch->n_on_msg_allocate_wq);
999         ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
1000         atomic_dec(&ch->n_on_msg_allocate_wq);
1001
1002         if (ch->flags & XPC_C_DISCONNECTING) {
1003                 ret = ch->reason;
1004                 DBUG_ON(ch->reason == xpInterrupted);
1005         } else if (ret == 0) {
1006                 ret = xpTimeout;
1007         } else {
1008                 ret = xpInterrupted;
1009         }
1010
1011         return ret;
1012 }
1013
1014 /*
1015  * Send a message that contains the user's payload on the specified channel
1016  * connected to the specified partition.
1017  *
1018  * NOTE that this routine can sleep waiting for a message entry to become
1019  * available. To not sleep, pass in the XPC_NOWAIT flag.
1020  *
1021  * Once sent, this routine will not wait for the message to be received, nor
1022  * will notification be given when it does happen.
1023  *
1024  * Arguments:
1025  *
1026  *      partid - ID of partition to which the channel is connected.
1027  *      ch_number - channel # to send message on.
1028  *      flags - see xp.h for valid flags.
1029  *      payload - pointer to the payload which is to be sent.
1030  *      payload_size - size of the payload in bytes.
1031  */
1032 enum xp_retval
1033 xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
1034                   u16 payload_size)
1035 {
1036         struct xpc_partition *part = &xpc_partitions[partid];
1037         enum xp_retval ret = xpUnknownReason;
1038
1039         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
1040                 partid, ch_number);
1041
1042         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1043         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1044         DBUG_ON(payload == NULL);
1045
1046         if (xpc_part_ref(part)) {
1047                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
1048                                    payload_size, 0, NULL, NULL);
1049                 xpc_part_deref(part);
1050         }
1051
1052         return ret;
1053 }
1054
1055 /*
1056  * Send a message that contains the user's payload on the specified channel
1057  * connected to the specified partition.
1058  *
1059  * NOTE that this routine can sleep waiting for a message entry to become
1060  * available. To not sleep, pass in the XPC_NOWAIT flag.
1061  *
1062  * This routine will not wait for the message to be sent or received.
1063  *
1064  * Once the remote end of the channel has received the message, the function
1065  * passed as an argument to xpc_initiate_send_notify() will be called. This
1066  * allows the sender to free up or re-use any buffers referenced by the
1067  * message, but does NOT mean the message has been processed at the remote
1068  * end by a receiver.
1069  *
1070  * If this routine returns an error, the caller's function will NOT be called.
1071  *
1072  * Arguments:
1073  *
1074  *      partid - ID of partition to which the channel is connected.
1075  *      ch_number - channel # to send message on.
1076  *      flags - see xp.h for valid flags.
1077  *      payload - pointer to the payload which is to be sent.
1078  *      payload_size - size of the payload in bytes.
1079  *      func - function to call with asynchronous notification of message
1080  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1081  *      key - user-defined key to be passed to the function when it's called.
1082  */
1083 enum xp_retval
1084 xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
1085                          u16 payload_size, xpc_notify_func func, void *key)
1086 {
1087         struct xpc_partition *part = &xpc_partitions[partid];
1088         enum xp_retval ret = xpUnknownReason;
1089
1090         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
1091                 partid, ch_number);
1092
1093         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1094         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1095         DBUG_ON(payload == NULL);
1096         DBUG_ON(func == NULL);
1097
1098         if (xpc_part_ref(part)) {
1099                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
1100                                    payload_size, XPC_N_CALL, func, key);
1101                 xpc_part_deref(part);
1102         }
1103         return ret;
1104 }
1105
1106 /*
1107  * Deliver a message to its intended recipient.
1108  */
1109 void
1110 xpc_deliver_msg(struct xpc_channel *ch)
1111 {
1112         struct xpc_msg *msg;
1113
1114         msg = xpc_get_deliverable_msg(ch);
1115         if (msg != NULL) {
1116
1117                 /*
1118                  * This ref is taken to protect the payload itself from being
1119                  * freed before the user is finished with it, which the user
1120                  * indicates by calling xpc_initiate_received().
1121                  */
1122                 xpc_msgqueue_ref(ch);
1123
1124                 atomic_inc(&ch->kthreads_active);
1125
1126                 if (ch->func != NULL) {
1127                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
1128                                 "msg_number=%ld, partid=%d, channel=%d\n",
1129                                 (void *)msg, msg->number, ch->partid,
1130                                 ch->number);
1131
1132                         /* deliver the message to its intended recipient */
1133                         ch->func(xpMsgReceived, ch->partid, ch->number,
1134                                  &msg->payload, ch->key);
1135
1136                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
1137                                 "msg_number=%ld, partid=%d, channel=%d\n",
1138                                 (void *)msg, msg->number, ch->partid,
1139                                 ch->number);
1140                 }
1141
1142                 atomic_dec(&ch->kthreads_active);
1143         }
1144 }
1145
1146 /*
1147  * Acknowledge receipt of a delivered message.
1148  *
1149  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1150  * that sent the message.
1151  *
1152  * This function, although called by users, does not call xpc_part_ref() to
1153  * ensure that the partition infrastructure is in place. It relies on the
1154  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1155  *
1156  * Arguments:
1157  *
1158  *      partid - ID of partition to which the channel is connected.
1159  *      ch_number - channel # message received on.
1160  *      payload - pointer to the payload area allocated via
1161  *                      xpc_initiate_send() or xpc_initiate_send_notify().
1162  */
1163 void
1164 xpc_initiate_received(short partid, int ch_number, void *payload)
1165 {
1166         struct xpc_partition *part = &xpc_partitions[partid];
1167         struct xpc_channel *ch;
1168         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1169
1170         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1171         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1172
1173         ch = &part->channels[ch_number];
1174         xpc_received_msg(ch, msg);
1175
1176         /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
1177         xpc_msgqueue_deref(ch);
1178 }