ceph: handle errors during osd client init
fs/ceph/mds_client.c
1 #include "ceph_debug.h"
2
3 #include <linux/wait.h>
4 #include <linux/sched.h>
5
6 #include "mds_client.h"
7 #include "mon_client.h"
8 #include "super.h"
9 #include "messenger.h"
10 #include "decode.h"
11
12 /*
13  * A cluster of MDS (metadata server) daemons is responsible for
14  * managing the file system namespace (the directory hierarchy and
15  * inodes) and for coordinating shared access to storage.  Metadata is
16  * partitioned hierarchically across a number of servers, and that
17  * partition varies over time as the cluster adjusts the distribution
18  * in order to balance load.
19  *
20  * The MDS client is primarily responsible for managing
21  * metadata requests for operations like open, unlink, and so forth.
22  * If there is an MDS failure, we find out about it when we (possibly
23  * request and) receive a new MDS map, and can resubmit affected
24  * requests.
25  *
26  * For the most part, though, we take advantage of a lossless
27  * communications channel to the MDS, and do not need to worry about
28  * timing out or resubmitting requests.
29  *
30  * We maintain a stateful "session" with each MDS we interact with.
31  * Within each session, we send periodic heartbeat messages to ensure
32  * any capabilities or leases we have been issued remain valid.  If
33  * the session times out and goes stale, our leases and capabilities
34  * are no longer valid.
35  */
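/*
 * Illustrative sketch (not a helper in this file): "stale" is purely
 * a time comparison against the session's cap TTL, roughly
 *
 *	static bool session_caps_stale(struct ceph_mds_session *s)
 *	{
 *		return s->s_cap_ttl == 0 ||
 *		       time_after_eq(jiffies, s->s_cap_ttl);
 *	}
 *
 * send_renew_caps() and renewed_caps() below open-code this check.
 */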
36
37 static void __wake_requests(struct ceph_mds_client *mdsc,
38                             struct list_head *head);
39
40 static const struct ceph_connection_operations mds_con_ops;
41
42
43 /*
44  * mds reply parsing
45  */
46
47 /*
48  * parse individual inode info
49  */
50 static int parse_reply_info_in(void **p, void *end,
51                                struct ceph_mds_reply_info_in *info)
52 {
53         int err = -EIO;
54
55         info->in = *p;
56         *p += sizeof(struct ceph_mds_reply_inode) +
57                 sizeof(*info->in->fragtree.splits) *
58                 le32_to_cpu(info->in->fragtree.nsplits);
59
60         ceph_decode_32_safe(p, end, info->symlink_len, bad);
61         ceph_decode_need(p, end, info->symlink_len, bad);
62         info->symlink = *p;
63         *p += info->symlink_len;
64
65         ceph_decode_32_safe(p, end, info->xattr_len, bad);
66         ceph_decode_need(p, end, info->xattr_len, bad);
67         info->xattr_data = *p;
68         *p += info->xattr_len;
69         return 0;
70 bad:
71         return err;
72 }
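/*
 * Sketch of the bounds-checking idiom used above (the real macros
 * live in decode.h; this is an approximation of their shape, not
 * their exact text):
 *
 *	#define ceph_decode_need(p, end, n, bad)		\
 *		do {						\
 *			if (unlikely(*(p) + (n) > (end)))	\
 *				goto bad;			\
 *		} while (0)
 *
 * ceph_decode_32_safe() likewise verifies that four bytes remain
 * before copying out a little-endian u32 and advancing *p, so a
 * short or corrupt reply fails with -EIO rather than overrunning
 * the message buffer.
 */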
73
74 /*
75  * parse a normal reply, which may contain a (dir+)dentry and/or a
76  * target inode.
77  */
78 static int parse_reply_info_trace(void **p, void *end,
79                                   struct ceph_mds_reply_info_parsed *info)
80 {
81         int err;
82
83         if (info->head->is_dentry) {
84                 err = parse_reply_info_in(p, end, &info->diri);
85                 if (err < 0)
86                         goto out_bad;
87
88                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
89                         goto bad;
90                 info->dirfrag = *p;
91                 *p += sizeof(*info->dirfrag) +
92                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
93                 if (unlikely(*p > end))
94                         goto bad;
95
96                 ceph_decode_32_safe(p, end, info->dname_len, bad);
97                 ceph_decode_need(p, end, info->dname_len, bad);
98                 info->dname = *p;
99                 *p += info->dname_len;
100                 info->dlease = *p;
101                 *p += sizeof(*info->dlease);
102         }
103
104         if (info->head->is_target) {
105                 err = parse_reply_info_in(p, end, &info->targeti);
106                 if (err < 0)
107                         goto out_bad;
108         }
109
110         if (unlikely(*p != end))
111                 goto bad;
112         return 0;
113
114 bad:
115         err = -EIO;
116 out_bad:
117         pr_err("problem parsing mds trace %d\n", err);
118         return err;
119 }
120
121 /*
122  * parse readdir results
123  */
124 static int parse_reply_info_dir(void **p, void *end,
125                                 struct ceph_mds_reply_info_parsed *info)
126 {
127         u32 num, i = 0;
128         int err;
129
130         info->dir_dir = *p;
131         if (*p + sizeof(*info->dir_dir) > end)
132                 goto bad;
133         *p += sizeof(*info->dir_dir) +
134                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
135         if (*p > end)
136                 goto bad;
137
138         ceph_decode_need(p, end, sizeof(num) + 2, bad);
139         num = ceph_decode_32(p);
140         info->dir_end = ceph_decode_8(p);
141         info->dir_complete = ceph_decode_8(p);
142         if (num == 0)
143                 goto done;
144
145         /* alloc large array */
146         info->dir_nr = num;
147         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
148                                sizeof(*info->dir_dname) +
149                                sizeof(*info->dir_dname_len) +
150                                sizeof(*info->dir_dlease),
151                                GFP_NOFS);
152         if (info->dir_in == NULL) {
153                 err = -ENOMEM;
154                 goto out_bad;
155         }
156         info->dir_dname = (void *)(info->dir_in + num);
157         info->dir_dname_len = (void *)(info->dir_dname + num);
158         info->dir_dlease = (void *)(info->dir_dname_len + num);
159
160         while (num) {
161                 /* dentry */
162                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
163                 info->dir_dname_len[i] = ceph_decode_32(p);
164                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
165                 info->dir_dname[i] = *p;
166                 *p += info->dir_dname_len[i];
167                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
168                      info->dir_dname[i]);
169                 info->dir_dlease[i] = *p;
170                 *p += sizeof(struct ceph_mds_reply_lease);
171
172                 /* inode */
173                 err = parse_reply_info_in(p, end, &info->dir_in[i]);
174                 if (err < 0)
175                         goto out_bad;
176                 i++;
177                 num--;
178         }
179
180 done:
181         if (*p != end)
182                 goto bad;
183         return 0;
184
185 bad:
186         err = -EIO;
187 out_bad:
188         pr_err("problem parsing dir contents %d\n", err);
189         return err;
190 }
191
192 /*
193  * parse entire mds reply
194  */
195 static int parse_reply_info(struct ceph_msg *msg,
196                             struct ceph_mds_reply_info_parsed *info)
197 {
198         void *p, *end;
199         u32 len;
200         int err;
201
202         info->head = msg->front.iov_base;
203         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
204         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
205
206         /* trace */
207         ceph_decode_32_safe(&p, end, len, bad);
208         if (len > 0) {
209                 err = parse_reply_info_trace(&p, p+len, info);
210                 if (err < 0)
211                         goto out_bad;
212         }
213
214         /* dir content */
215         ceph_decode_32_safe(&p, end, len, bad);
216         if (len > 0) {
217                 err = parse_reply_info_dir(&p, p+len, info);
218                 if (err < 0)
219                         goto out_bad;
220         }
221
222         /* snap blob */
223         ceph_decode_32_safe(&p, end, len, bad);
224         info->snapblob_len = len;
225         info->snapblob = p;
226         p += len;
227
228         if (p != end)
229                 goto bad;
230         return 0;
231
232 bad:
233         err = -EIO;
234 out_bad:
235         pr_err("mds parse_reply err %d\n", err);
236         return err;
237 }
238
239 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
240 {
241         kfree(info->dir_in);
242 }
243
244
245 /*
246  * sessions
247  */
248 static const char *session_state_name(int s)
249 {
250         switch (s) {
251         case CEPH_MDS_SESSION_NEW: return "new";
252         case CEPH_MDS_SESSION_OPENING: return "opening";
253         case CEPH_MDS_SESSION_OPEN: return "open";
254         case CEPH_MDS_SESSION_HUNG: return "hung";
255         case CEPH_MDS_SESSION_CLOSING: return "closing";
256         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
257         default: return "???";
258         }
259 }
260
261 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
262 {
263         if (atomic_inc_not_zero(&s->s_ref)) {
264                 dout("mdsc get_session %p %d -> %d\n", s,
265                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
266                 return s;
267         } else {
268                 dout("mdsc get_session %p 0 -- FAIL\n", s);
269                 return NULL;
270         }
271 }
272
273 void ceph_put_mds_session(struct ceph_mds_session *s)
274 {
275         dout("mdsc put_session %p %d -> %d\n", s,
276              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
277         if (atomic_dec_and_test(&s->s_ref))
278                 kfree(s);
279 }
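/*
 * Note: get_session() uses atomic_inc_not_zero() so a lookup racing
 * with the final ceph_put_mds_session() cannot revive a session whose
 * refcount has already hit zero; callers must tolerate a NULL return
 * and treat the session as gone.
 */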
280
281 /*
282  * called under mdsc->mutex
283  */
284 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
285                                                    int mds)
286 {
287         struct ceph_mds_session *session;
288
289         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
290                 return NULL;
291         session = mdsc->sessions[mds];
292         dout("lookup_mds_session %p %d\n", session,
293              atomic_read(&session->s_ref));
294         get_session(session);
295         return session;
296 }
297
298 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
299 {
300         if (mds >= mdsc->max_sessions)
301                 return false;
302         return mdsc->sessions[mds];
303 }
304
305 /*
306  * create+register a new session for given mds.
307  * called under mdsc->mutex.
308  */
309 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
310                                                  int mds)
311 {
312         struct ceph_mds_session *s;
313
314         s = kzalloc(sizeof(*s), GFP_NOFS);
            if (!s)
                    return ERR_PTR(-ENOMEM);
315         s->s_mdsc = mdsc;
316         s->s_mds = mds;
317         s->s_state = CEPH_MDS_SESSION_NEW;
318         s->s_ttl = 0;
319         s->s_seq = 0;
320         mutex_init(&s->s_mutex);
321
322         ceph_con_init(mdsc->client->msgr, &s->s_con);
323         s->s_con.private = s;
324         s->s_con.ops = &mds_con_ops;
325         s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
326         s->s_con.peer_name.num = cpu_to_le64(mds);
327
328         spin_lock_init(&s->s_cap_lock);
329         s->s_cap_gen = 0;
330         s->s_cap_ttl = 0;
331         s->s_renew_requested = 0;
332         s->s_renew_seq = 0;
333         INIT_LIST_HEAD(&s->s_caps);
334         s->s_nr_caps = 0;
335         atomic_set(&s->s_ref, 1);
336         INIT_LIST_HEAD(&s->s_waiting);
337         INIT_LIST_HEAD(&s->s_unsafe);
338         s->s_num_cap_releases = 0;
339         INIT_LIST_HEAD(&s->s_cap_releases);
340         INIT_LIST_HEAD(&s->s_cap_releases_done);
341         INIT_LIST_HEAD(&s->s_cap_flushing);
342         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
343
344         dout("register_session mds%d\n", mds);
345         if (mds >= mdsc->max_sessions) {
346                 int newmax = 1 << get_count_order(mds+1);
347                 struct ceph_mds_session **sa;
348
349                 dout("register_session realloc to %d\n", newmax);
350                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
351                 if (sa == NULL)
352                         goto fail_realloc;
353                 if (mdsc->sessions) {
354                         memcpy(sa, mdsc->sessions,
355                                mdsc->max_sessions * sizeof(void *));
356                         kfree(mdsc->sessions);
357                 }
358                 mdsc->sessions = sa;
359                 mdsc->max_sessions = newmax;
360         }
361         mdsc->sessions[mds] = s;
362         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
363
364         ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
365
366         return s;
367
368 fail_realloc:
369         kfree(s);
370         return ERR_PTR(-ENOMEM);
371 }
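/*
 * Worked example: with max_sessions == 4, registering mds5 reallocates
 * the table to 1 << get_count_order(5 + 1) == 8 slots, so the sessions
 * array always holds a power-of-two number of pointers.
 */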
372
373 /*
374  * called under mdsc->mutex
375  */
376 static void unregister_session(struct ceph_mds_client *mdsc,
377                                struct ceph_mds_session *s)
378 {
379         dout("unregister_session mds%d %p\n", s->s_mds, s);
380         mdsc->sessions[s->s_mds] = NULL;
381         ceph_con_close(&s->s_con);
382         ceph_put_mds_session(s);
383 }
384
385 /*
386  * drop session refs in request.
387  *
388  * should be last request ref, or hold mdsc->mutex
389  */
390 static void put_request_session(struct ceph_mds_request *req)
391 {
392         if (req->r_session) {
393                 ceph_put_mds_session(req->r_session);
394                 req->r_session = NULL;
395         }
396 }
397
398 void ceph_mdsc_put_request(struct ceph_mds_request *req)
399 {
400         dout("mdsc put_request %p %d -> %d\n", req,
401              atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
402         if (atomic_dec_and_test(&req->r_ref)) {
403                 if (req->r_request)
404                         ceph_msg_put(req->r_request);
405                 if (req->r_reply) {
406                         ceph_msg_put(req->r_reply);
407                         destroy_reply_info(&req->r_reply_info);
408                 }
409                 if (req->r_inode) {
410                         ceph_put_cap_refs(ceph_inode(req->r_inode),
411                                           CEPH_CAP_PIN);
412                         iput(req->r_inode);
413                 }
414                 if (req->r_locked_dir)
415                         ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
416                                           CEPH_CAP_PIN);
417                 if (req->r_target_inode)
418                         iput(req->r_target_inode);
419                 if (req->r_dentry)
420                         dput(req->r_dentry);
421                 if (req->r_old_dentry) {
422                         ceph_put_cap_refs(
423                              ceph_inode(req->r_old_dentry->d_parent->d_inode),
424                              CEPH_CAP_PIN);
425                         dput(req->r_old_dentry);
426                 }
427                 kfree(req->r_path1);
428                 kfree(req->r_path2);
429                 put_request_session(req);
430                 ceph_unreserve_caps(&req->r_caps_reservation);
431                 kfree(req);
432         }
433 }
434
435 /*
436  * lookup request, bump ref if found.
437  *
438  * called under mdsc->mutex.
439  */
440 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
441                                              u64 tid)
442 {
443         struct ceph_mds_request *req;
444         req = radix_tree_lookup(&mdsc->request_tree, tid);
445         if (req)
446                 ceph_mdsc_get_request(req);
447         return req;
448 }
449
450 /*
451  * Register an in-flight request, and assign a tid.  Link to the
452  * directory we are modifying (if any).
453  *
454  * Called under mdsc->mutex.
455  */
456 static void __register_request(struct ceph_mds_client *mdsc,
457                                struct ceph_mds_request *req,
458                                struct inode *dir)
459 {
460         req->r_tid = ++mdsc->last_tid;
461         if (req->r_num_caps)
462                 ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
463         dout("__register_request %p tid %lld\n", req, req->r_tid);
464         ceph_mdsc_get_request(req);
465         radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
466
467         if (dir) {
468                 struct ceph_inode_info *ci = ceph_inode(dir);
469
470                 spin_lock(&ci->i_unsafe_lock);
471                 req->r_unsafe_dir = dir;
472                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
473                 spin_unlock(&ci->i_unsafe_lock);
474         }
475 }
476
477 static void __unregister_request(struct ceph_mds_client *mdsc,
478                                  struct ceph_mds_request *req)
479 {
480         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
481         radix_tree_delete(&mdsc->request_tree, req->r_tid);
482         ceph_mdsc_put_request(req);
483
484         if (req->r_unsafe_dir) {
485                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
486
487                 spin_lock(&ci->i_unsafe_lock);
488                 list_del_init(&req->r_unsafe_dir_item);
489                 spin_unlock(&ci->i_unsafe_lock);
490         }
491 }
492
493 /*
494  * Choose mds to send request to next.  If there is a hint set in the
495  * request (e.g., due to a prior forward hint from the mds), use that.
496  * Otherwise, consult frag tree and/or caps to identify the
497  * appropriate mds.  If all else fails, choose randomly.
498  *
499  * Called under mdsc->mutex.
500  */
501 static int __choose_mds(struct ceph_mds_client *mdsc,
502                         struct ceph_mds_request *req)
503 {
504         struct inode *inode;
505         struct ceph_inode_info *ci;
506         struct ceph_cap *cap;
507         int mode = req->r_direct_mode;
508         int mds = -1;
509         u32 hash = req->r_direct_hash;
510         bool is_hash = req->r_direct_is_hash;
511
512         /*
513          * is there a specific mds we should try?  ignore hint if we have
514          * no session and the mds is not up (active or recovering).
515          */
516         if (req->r_resend_mds >= 0 &&
517             (__have_session(mdsc, req->r_resend_mds) ||
518              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
519                 dout("choose_mds using resend_mds mds%d\n",
520                      req->r_resend_mds);
521                 return req->r_resend_mds;
522         }
523
524         if (mode == USE_RANDOM_MDS)
525                 goto random;
526
527         inode = NULL;
528         if (req->r_inode) {
529                 inode = req->r_inode;
530         } else if (req->r_dentry) {
531                 if (req->r_dentry->d_inode) {
532                         inode = req->r_dentry->d_inode;
533                 } else {
534                         inode = req->r_dentry->d_parent->d_inode;
535                         hash = req->r_dentry->d_name.hash;
536                         is_hash = true;
537                 }
538         }
539         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
540              (int)hash, mode);
541         if (!inode)
542                 goto random;
543         ci = ceph_inode(inode);
544
545         if (is_hash && S_ISDIR(inode->i_mode)) {
546                 struct ceph_inode_frag frag;
547                 int found;
548
549                 ceph_choose_frag(ci, hash, &frag, &found);
550                 if (found) {
551                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
552                                 u8 r;
553
554                                 /* choose a random replica */
555                                 get_random_bytes(&r, 1);
556                                 r %= frag.ndist;
557                                 mds = frag.dist[r];
558                                 dout("choose_mds %p %llx.%llx "
559                                      "frag %u mds%d (%d/%d)\n",
560                                      inode, ceph_vinop(inode),
561                                      frag.frag, frag.mds,
562                                      (int)r, frag.ndist);
563                                 return mds;
564                         }
565
566                         /* since this file/dir wasn't known to be
567                          * replicated, then we want to look for the
568                          * authoritative mds. */
569                         mode = USE_AUTH_MDS;
570                         if (frag.mds >= 0) {
571                                 /* choose auth mds */
572                                 mds = frag.mds;
573                                 dout("choose_mds %p %llx.%llx "
574                                      "frag %u mds%d (auth)\n",
575                                      inode, ceph_vinop(inode), frag.frag, mds);
576                                 return mds;
577                         }
578                 }
579         }
580
581         spin_lock(&inode->i_lock);
582         cap = NULL;
583         if (mode == USE_AUTH_MDS)
584                 cap = ci->i_auth_cap;
585         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
586                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
587         if (!cap) {
588                 spin_unlock(&inode->i_lock);
589                 goto random;
590         }
591         mds = cap->session->s_mds;
592         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
593              inode, ceph_vinop(inode), mds,
594              cap == ci->i_auth_cap ? "auth " : "", cap);
595         spin_unlock(&inode->i_lock);
596         return mds;
597
598 random:
599         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
600         dout("choose_mds chose random mds%d\n", mds);
601         return mds;
602 }
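/*
 * Summary of the cascade above: an explicit resend hint wins; next,
 * for a hashed name in a directory whose frag is known, a random
 * replica (USE_ANY_MDS) or the frag's auth mds; next the inode's auth
 * cap (USE_AUTH_MDS) or the first cap's session; only if all of that
 * fails do we pick a random mds from the map.
 */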
603
604
605 /*
606  * session messages
607  */
608 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
609 {
610         struct ceph_msg *msg;
611         struct ceph_mds_session_head *h;
612
613         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
614         if (IS_ERR(msg)) {
615                 pr_err("create_session_msg ENOMEM creating msg\n");
616                 return ERR_PTR(PTR_ERR(msg));
617         }
618         h = msg->front.iov_base;
619         h->op = cpu_to_le32(op);
620         h->seq = cpu_to_le64(seq);
621         return msg;
622 }
623
624 /*
625  * send session open request.
626  *
627  * called under mdsc->mutex
628  */
629 static int __open_session(struct ceph_mds_client *mdsc,
630                           struct ceph_mds_session *session)
631 {
632         struct ceph_msg *msg;
633         int mstate;
634         int mds = session->s_mds;
635         int err = 0;
636
637         /* wait for mds to go active? */
638         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
639         dout("open_session to mds%d (%s)\n", mds,
640              ceph_mds_state_name(mstate));
641         session->s_state = CEPH_MDS_SESSION_OPENING;
642         session->s_renew_requested = jiffies;
643
644         /* send connect message */
645         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
646         if (IS_ERR(msg)) {
647                 err = PTR_ERR(msg);
648                 goto out;
649         }
650         ceph_con_send(&session->s_con, msg);
651
652 out:
653         return err;
654 }
655
656 /*
657  * session caps
658  */
659
660 /*
661  * Free preallocated cap messages assigned to this session
662  */
663 static void cleanup_cap_releases(struct ceph_mds_session *session)
664 {
665         struct ceph_msg *msg;
666
667         spin_lock(&session->s_cap_lock);
668         while (!list_empty(&session->s_cap_releases)) {
669                 msg = list_first_entry(&session->s_cap_releases,
670                                        struct ceph_msg, list_head);
671                 list_del_init(&msg->list_head);
672                 ceph_msg_put(msg);
673         }
674         while (!list_empty(&session->s_cap_releases_done)) {
675                 msg = list_first_entry(&session->s_cap_releases_done,
676                                        struct ceph_msg, list_head);
677                 list_del_init(&msg->list_head);
678                 ceph_msg_put(msg);
679         }
680         spin_unlock(&session->s_cap_lock);
681 }
682
683 /*
684  * Helper to safely iterate over all caps associated with a session.
685  *
686  * caller must hold session s_mutex
687  */
688 static int iterate_session_caps(struct ceph_mds_session *session,
689                                  int (*cb)(struct inode *, struct ceph_cap *,
690                                             void *), void *arg)
691 {
692         struct ceph_cap *cap, *ncap;
693         struct inode *inode;
694         int ret;
695
696         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
697         spin_lock(&session->s_cap_lock);
698         list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
699                 inode = igrab(&cap->ci->vfs_inode);
700                 if (!inode)
701                         continue;
702                 spin_unlock(&session->s_cap_lock);
703                 ret = cb(inode, cap, arg);
704                 iput(inode);
705                 if (ret < 0)
706                         return ret;
707                 spin_lock(&session->s_cap_lock);
708         }
709         spin_unlock(&session->s_cap_lock);
710
711         return 0;
712 }
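/*
 * Note that iterate_session_caps() drops s_cap_lock around each
 * callback, so cb may block or take inode->i_lock; the igrab()/iput()
 * pair pins the inode (and with it the cap's ceph_inode_info) across
 * the unlocked region.
 */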
713
714 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
715                                    void *arg)
716 {
717         struct ceph_inode_info *ci = ceph_inode(inode);
718         dout("removing cap %p, ci is %p, inode is %p\n",
719              cap, ci, &ci->vfs_inode);
720         ceph_remove_cap(cap);
721         return 0;
722 }
723
724 /*
725  * caller must hold session s_mutex
726  */
727 static void remove_session_caps(struct ceph_mds_session *session)
728 {
729         dout("remove_session_caps on %p\n", session);
730         iterate_session_caps(session, remove_session_caps_cb, NULL);
731         BUG_ON(session->s_nr_caps > 0);
732         cleanup_cap_releases(session);
733 }
734
735 /*
736  * wake up any threads waiting on this session's caps.  if the cap is
737  * old (didn't get renewed on the client reconnect), remove it now.
738  *
739  * caller must hold s_mutex.
740  */
741 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
742                               void *arg)
743 {
744         wake_up(&ceph_inode(inode)->i_cap_wq);
745         return 0;
746 }
747
748 static void wake_up_session_caps(struct ceph_mds_session *session)
749 {
750         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
751         iterate_session_caps(session, wake_up_session_cb, NULL);
752 }
753
754 /*
755  * Send periodic message to MDS renewing all currently held caps.  The
756  * ack will reset the expiration for all caps from this session.
757  *
758  * caller holds s_mutex
759  */
760 static int send_renew_caps(struct ceph_mds_client *mdsc,
761                            struct ceph_mds_session *session)
762 {
763         struct ceph_msg *msg;
764         int state;
765
766         if (time_after_eq(jiffies, session->s_cap_ttl) &&
767             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
768                 pr_info("mds%d caps stale\n", session->s_mds);
769
770         /* do not try to renew caps until a recovering mds has reconnected
771          * with its clients. */
772         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
773         if (state < CEPH_MDS_STATE_RECONNECT) {
774                 dout("send_renew_caps ignoring mds%d (%s)\n",
775                      session->s_mds, ceph_mds_state_name(state));
776                 return 0;
777         }
778
779         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
780                 ceph_mds_state_name(state));
781         session->s_renew_requested = jiffies;
782         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
783                                  ++session->s_renew_seq);
784         if (IS_ERR(msg))
785                 return PTR_ERR(msg);
786         ceph_con_send(&session->s_con, msg);
787         return 0;
788 }
789
790 /*
791  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
792  */
793 static void renewed_caps(struct ceph_mds_client *mdsc,
794                          struct ceph_mds_session *session, int is_renew)
795 {
796         int was_stale;
797         int wake = 0;
798
799         spin_lock(&session->s_cap_lock);
800         was_stale = is_renew && (session->s_cap_ttl == 0 ||
801                                  time_after_eq(jiffies, session->s_cap_ttl));
802
803         session->s_cap_ttl = session->s_renew_requested +
804                 mdsc->mdsmap->m_session_timeout*HZ;
805
806         if (was_stale) {
807                 if (time_before(jiffies, session->s_cap_ttl)) {
808                         pr_info("mds%d caps renewed\n", session->s_mds);
809                         wake = 1;
810                 } else {
811                         pr_info("mds%d caps still stale\n", session->s_mds);
812                 }
813         }
814         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
815              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
816              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
817         spin_unlock(&session->s_cap_lock);
818
819         if (wake)
820                 wake_up_session_caps(session);
821 }
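/*
 * Example: with m_session_timeout of 60 (seconds), caps renewed at
 * jiffies T remain valid until T + 60*HZ.  The TTL is computed from
 * s_renew_requested (when we asked), not from when the ack arrived,
 * which keeps the estimate conservative.
 */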
822
823 /*
824  * send a session close request
825  */
826 static int request_close_session(struct ceph_mds_client *mdsc,
827                                  struct ceph_mds_session *session)
828 {
829         struct ceph_msg *msg;
830         int err = 0;
831
832         dout("request_close_session mds%d state %s seq %lld\n",
833              session->s_mds, session_state_name(session->s_state),
834              session->s_seq);
835         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
836         if (IS_ERR(msg))
837                 err = PTR_ERR(msg);
838         else
839                 ceph_con_send(&session->s_con, msg);
840         return err;
841 }
842
843 /*
844  * Called with s_mutex held.
845  */
846 static int __close_session(struct ceph_mds_client *mdsc,
847                          struct ceph_mds_session *session)
848 {
849         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
850                 return 0;
851         session->s_state = CEPH_MDS_SESSION_CLOSING;
852         return request_close_session(mdsc, session);
853 }
854
855 /*
856  * Trim old(er) caps.
857  *
858  * Because we can't cache an inode without one or more caps, we do
859  * this indirectly: if a cap is unused, we prune its aliases, at which
860  * point the inode will hopefully get dropped too.
861  *
862  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
863  * memory pressure from the MDS, though, so it needn't be perfect.
864  */
865 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
866 {
867         struct ceph_mds_session *session = arg;
868         struct ceph_inode_info *ci = ceph_inode(inode);
869         int used, oissued, mine;
870
871         if (session->s_trim_caps <= 0)
872                 return -1;
873
874         spin_lock(&inode->i_lock);
875         mine = cap->issued | cap->implemented;
876         used = __ceph_caps_used(ci);
877         oissued = __ceph_caps_issued_other(ci, cap);
878
879         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
880              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
881              ceph_cap_string(used));
882         if (ci->i_dirty_caps)
883                 goto out;   /* dirty caps */
884         if ((used & ~oissued) & mine)
885                 goto out;   /* we need these caps */
886
887         session->s_trim_caps--;
888         if (oissued) {
889                 /* we aren't the only cap.. just remove us */
890                 __ceph_remove_cap(cap, NULL);
891         } else {
892                 /* try to drop referring dentries */
893                 spin_unlock(&inode->i_lock);
894                 d_prune_aliases(inode);
895                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
896                      inode, cap, atomic_read(&inode->i_count));
897                 return 0;
898         }
899
900 out:
901         spin_unlock(&inode->i_lock);
902         return 0;
903 }
904
905 /*
906  * Trim session cap count down to some max number.
907  */
908 static int trim_caps(struct ceph_mds_client *mdsc,
909                      struct ceph_mds_session *session,
910                      int max_caps)
911 {
912         int trim_caps = session->s_nr_caps - max_caps;
913
914         dout("trim_caps mds%d start: %d / %d, trim %d\n",
915              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
916         if (trim_caps > 0) {
917                 session->s_trim_caps = trim_caps;
918                 iterate_session_caps(session, trim_caps_cb, session);
919                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
920                      session->s_mds, session->s_nr_caps, max_caps,
921                         trim_caps - session->s_trim_caps);
922         }
923         return 0;
924 }
925
926 /*
927  * Allocate cap_release messages.  If there is a partially full message
928  * in the queue, try to allocate enough to cover its remainder, so that
929  * we can send it immediately.
930  *
931  * Called under s_mutex.
932  */
933 static int add_cap_releases(struct ceph_mds_client *mdsc,
934                             struct ceph_mds_session *session,
935                             int extra)
936 {
937         struct ceph_msg *msg;
938         struct ceph_mds_cap_release *head;
939         int err = -ENOMEM;
940
941         if (extra < 0)
942                 extra = mdsc->client->mount_args->cap_release_safety;
943
944         spin_lock(&session->s_cap_lock);
945
946         if (!list_empty(&session->s_cap_releases)) {
947                 msg = list_first_entry(&session->s_cap_releases,
948                                        struct ceph_msg,
949                                        list_head);
950                 head = msg->front.iov_base;
951                 extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
952         }
953
954         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
955                 spin_unlock(&session->s_cap_lock);
956                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
957                                    0, 0, NULL);
958                 if (!msg)
959                         goto out_unlocked;
960                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
961                      (int)msg->front.iov_len);
962                 head = msg->front.iov_base;
963                 head->num = cpu_to_le32(0);
964                 msg->front.iov_len = sizeof(*head);
965                 spin_lock(&session->s_cap_lock);
966                 list_add(&msg->list_head, &session->s_cap_releases);
967                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
968         }
969
970         if (!list_empty(&session->s_cap_releases)) {
971                 msg = list_first_entry(&session->s_cap_releases,
972                                        struct ceph_msg,
973                                        list_head);
974                 head = msg->front.iov_base;
975                 if (head->num) {
976                         dout(" queueing non-full %p (%d)\n", msg,
977                              le32_to_cpu(head->num));
978                         list_move_tail(&msg->list_head,
979                                       &session->s_cap_releases_done);
980                         session->s_num_cap_releases -=
981                                 CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
982                 }
983         }
984         err = 0;
985         spin_unlock(&session->s_cap_lock);
986 out_unlocked:
987         return err;
988 }
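/*
 * Example: if a session holds 1000 caps, extra is 0, and (say)
 * CEPH_CAPS_PER_RELEASE is 26 for this page size, the loop above
 * allocates messages until s_num_cap_releases >= 1000, i.e. about
 * ceil(1000 / 26) == 39 page-sized messages on s_cap_releases.
 */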
989
990 /*
991  * flush all dirty inode data to disk.
992  *
993  * returns true if we've flushed through want_flush_seq
994  */
995 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
996 {
997         int mds, ret = 1;
998
999         dout("check_cap_flush want %lld\n", want_flush_seq);
1000         mutex_lock(&mdsc->mutex);
1001         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1002                 struct ceph_mds_session *session = mdsc->sessions[mds];
1003
1004                 if (!session)
1005                         continue;
1006                 get_session(session);
1007                 mutex_unlock(&mdsc->mutex);
1008
1009                 mutex_lock(&session->s_mutex);
1010                 if (!list_empty(&session->s_cap_flushing)) {
1011                         struct ceph_inode_info *ci =
1012                                 list_entry(session->s_cap_flushing.next,
1013                                            struct ceph_inode_info,
1014                                            i_flushing_item);
1015                         struct inode *inode = &ci->vfs_inode;
1016
1017                         spin_lock(&inode->i_lock);
1018                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1019                                 dout("check_cap_flush still flushing %p "
1020                                      "seq %lld <= %lld to mds%d\n", inode,
1021                                      ci->i_cap_flush_seq, want_flush_seq,
1022                                      session->s_mds);
1023                                 ret = 0;
1024                         }
1025                         spin_unlock(&inode->i_lock);
1026                 }
1027                 mutex_unlock(&session->s_mutex);
1028                 ceph_put_mds_session(session);
1029
1030                 if (!ret)
1031                         return ret;
1032                 mutex_lock(&mdsc->mutex);
1033         }
1034
1035         mutex_unlock(&mdsc->mutex);
1036         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1037         return ret;
1038 }
1039
1040 /*
1041  * called under s_mutex
1042  */
1043 static void send_cap_releases(struct ceph_mds_client *mdsc,
1044                        struct ceph_mds_session *session)
1045 {
1046         struct ceph_msg *msg;
1047
1048         dout("send_cap_releases mds%d\n", session->s_mds);
1049         while (1) {
1050                 spin_lock(&session->s_cap_lock);
1051                 if (list_empty(&session->s_cap_releases_done))
1052                         break;
1053                 msg = list_first_entry(&session->s_cap_releases_done,
1054                                  struct ceph_msg, list_head);
1055                 list_del_init(&msg->list_head);
1056                 spin_unlock(&session->s_cap_lock);
1057                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1058                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1059                 ceph_con_send(&session->s_con, msg);
1060         }
1061         spin_unlock(&session->s_cap_lock);
1062 }
1063
1064 /*
1065  * requests
1066  */
1067
1068 /*
1069  * Create an mds request.
1070  */
1071 struct ceph_mds_request *
1072 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1073 {
1074         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1075
1076         if (!req)
1077                 return ERR_PTR(-ENOMEM);
1078
1079         req->r_started = jiffies;
1080         req->r_resend_mds = -1;
1081         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1082         req->r_fmode = -1;
1083         atomic_set(&req->r_ref, 1);  /* one for request_tree, one for caller */
1084         INIT_LIST_HEAD(&req->r_wait);
1085         init_completion(&req->r_completion);
1086         init_completion(&req->r_safe_completion);
1087         INIT_LIST_HEAD(&req->r_unsafe_item);
1088
1089         req->r_op = op;
1090         req->r_direct_mode = mode;
1091         return req;
1092 }
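/*
 * Typical usage (sketch; field values vary by operation):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP,
 *				       USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */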
1093
1094 /*
1095  * return oldest (lowest) tid in request tree, 0 if none.
1096  *
1097  * called under mdsc->mutex.
1098  */
1099 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1100 {
1101         struct ceph_mds_request *first;
1102         if (radix_tree_gang_lookup(&mdsc->request_tree,
1103                                    (void **)&first, 0, 1) <= 0)
1104                 return 0;
1105         return first->r_tid;
1106 }
1107
1108 /*
1109  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1110  * on build_path_from_dentry in fs/cifs/dir.c.
1111  *
1112  * If @stop_on_nosnap, generate path relative to the first non-snapped
1113  * inode.
1114  *
1115  * Encode hidden .snap dirs as a double /, i.e.
1116  *   foo/.snap/bar -> foo//bar
1117  */
1118 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1119                            int stop_on_nosnap)
1120 {
1121         struct dentry *temp;
1122         char *path;
1123         int len, pos;
1124
1125         if (dentry == NULL)
1126                 return ERR_PTR(-EINVAL);
1127
1128 retry:
1129         len = 0;
1130         for (temp = dentry; !IS_ROOT(temp);) {
1131                 struct inode *inode = temp->d_inode;
1132                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1133                         len++;  /* slash only */
1134                 else if (stop_on_nosnap && inode &&
1135                          ceph_snap(inode) == CEPH_NOSNAP)
1136                         break;
1137                 else
1138                         len += 1 + temp->d_name.len;
1139                 temp = temp->d_parent;
1140                 if (temp == NULL) {
1141                         pr_err("build_path_dentry corrupt dentry %p\n", dentry);
1142                         return ERR_PTR(-EINVAL);
1143                 }
1144         }
1145         if (len)
1146                 len--;  /* no leading '/' */
1147
1148         path = kmalloc(len+1, GFP_NOFS);
1149         if (path == NULL)
1150                 return ERR_PTR(-ENOMEM);
1151         pos = len;
1152         path[pos] = 0;  /* trailing null */
1153         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1154                 struct inode *inode = temp->d_inode;
1155
1156                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1157                         dout("build_path_dentry path+%d: %p SNAPDIR\n",
1158                              pos, temp);
1159                 } else if (stop_on_nosnap && inode &&
1160                            ceph_snap(inode) == CEPH_NOSNAP) {
1161                         break;
1162                 } else {
1163                         pos -= temp->d_name.len;
1164                         if (pos < 0)
1165                                 break;
1166                         strncpy(path + pos, temp->d_name.name,
1167                                 temp->d_name.len);
1168                         dout("build_path_dentry path+%d: %p '%.*s'\n",
1169                              pos, temp, temp->d_name.len, path + pos);
1170                 }
1171                 if (pos)
1172                         path[--pos] = '/';
1173                 temp = temp->d_parent;
1174                 if (temp == NULL) {
1175                         pr_err("build_path_dentry corrupt dentry\n");
1176                         kfree(path);
1177                         return ERR_PTR(-EINVAL);
1178                 }
1179         }
1180         if (pos != 0) {
1181                 pr_err("build_path_dentry did not end path lookup where "
1182                        "expected, namelen is %d, pos is %d\n", len, pos);
1183                 /* presumably this is only possible if racing with a
1184                    rename of one of the parent directories (we cannot
1185                    lock the dentries above us to prevent this, but
1186                    retrying should be harmless) */
1187                 kfree(path);
1188                 goto retry;
1189         }
1190
1191         *base = ceph_ino(temp->d_inode);
1192         *plen = len;
1193         dout("build_path_dentry on %p %d built %llx '%.*s'\n",
1194              dentry, atomic_read(&dentry->d_count), *base, len, path);
1195         return path;
1196 }
1197
1198 static int build_dentry_path(struct dentry *dentry,
1199                              const char **ppath, int *ppathlen, u64 *pino,
1200                              int *pfreepath)
1201 {
1202         char *path;
1203
1204         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1205                 *pino = ceph_ino(dentry->d_parent->d_inode);
1206                 *ppath = dentry->d_name.name;
1207                 *ppathlen = dentry->d_name.len;
1208                 return 0;
1209         }
1210         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1211         if (IS_ERR(path))
1212                 return PTR_ERR(path);
1213         *ppath = path;
1214         *pfreepath = 1;
1215         return 0;
1216 }
1217
1218 static int build_inode_path(struct inode *inode,
1219                             const char **ppath, int *ppathlen, u64 *pino,
1220                             int *pfreepath)
1221 {
1222         struct dentry *dentry;
1223         char *path;
1224
1225         if (ceph_snap(inode) == CEPH_NOSNAP) {
1226                 *pino = ceph_ino(inode);
1227                 *ppathlen = 0;
1228                 return 0;
1229         }
1230         dentry = d_find_alias(inode);
1231         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1232         dput(dentry);
1233         if (IS_ERR(path))
1234                 return PTR_ERR(path);
1235         *ppath = path;
1236         *pfreepath = 1;
1237         return 0;
1238 }
1239
1240 /*
1241  * request arguments may be specified via an inode *, a dentry *, or
1242  * an explicit ino+path.
1243  */
1244 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1245                                   const char *rpath, u64 rino,
1246                                   const char **ppath, int *pathlen,
1247                                   u64 *ino, int *freepath)
1248 {
1249         int r = 0;
1250
1251         if (rinode) {
1252                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1253                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1254                      ceph_snap(rinode));
1255         } else if (rdentry) {
1256                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1257                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1258                      *ppath);
1259         } else if (rpath) {
1260                 *ino = rino;
1261                 *ppath = rpath;
1262                 *pathlen = strlen(rpath);
1263                 dout(" path %.*s\n", *pathlen, rpath);
1264         }
1265
1266         return r;
1267 }
1268
1269 /*
1270  * called under mdsc->mutex
1271  */
1272 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1273                                                struct ceph_mds_request *req,
1274                                                int mds)
1275 {
1276         struct ceph_msg *msg;
1277         struct ceph_mds_request_head *head;
1278         const char *path1 = NULL;
1279         const char *path2 = NULL;
1280         u64 ino1 = 0, ino2 = 0;
1281         int pathlen1 = 0, pathlen2 = 0;
1282         int freepath1 = 0, freepath2 = 0;
1283         int len;
1284         u16 releases;
1285         void *p, *end;
1286         int ret;
1287
1288         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1289                               req->r_path1, req->r_ino1.ino,
1290                               &path1, &pathlen1, &ino1, &freepath1);
1291         if (ret < 0) {
1292                 msg = ERR_PTR(ret);
1293                 goto out;
1294         }
1295
1296         ret = set_request_path_attr(NULL, req->r_old_dentry,
1297                               req->r_path2, req->r_ino2.ino,
1298                               &path2, &pathlen2, &ino2, &freepath2);
1299         if (ret < 0) {
1300                 msg = ERR_PTR(ret);
1301                 goto out_free1;
1302         }
1303
1304         len = sizeof(*head) +
1305                 pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
1306
1307         /* calculate (max) length for cap releases */
1308         len += sizeof(struct ceph_mds_request_release) *
1309                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1310                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1311         if (req->r_dentry_drop)
1312                 len += req->r_dentry->d_name.len;
1313         if (req->r_old_dentry_drop)
1314                 len += req->r_old_dentry->d_name.len;
1315
1316         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
1317         if (IS_ERR(msg))
1318                 goto out_free2;
1319
1320         head = msg->front.iov_base;
1321         p = msg->front.iov_base + sizeof(*head);
1322         end = msg->front.iov_base + msg->front.iov_len;
1323
1324         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1325         head->op = cpu_to_le32(req->r_op);
1326         head->caller_uid = cpu_to_le32(current_fsuid());
1327         head->caller_gid = cpu_to_le32(current_fsgid());
1328         head->args = req->r_args;
1329
1330         ceph_encode_filepath(&p, end, ino1, path1);
1331         ceph_encode_filepath(&p, end, ino2, path2);
1332
1333         /* cap releases */
1334         releases = 0;
1335         if (req->r_inode_drop)
1336                 releases += ceph_encode_inode_release(&p,
1337                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1338                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1339         if (req->r_dentry_drop)
1340                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1341                        mds, req->r_dentry_drop, req->r_dentry_unless);
1342         if (req->r_old_dentry_drop)
1343                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1344                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1345         if (req->r_old_inode_drop)
1346                 releases += ceph_encode_inode_release(&p,
1347                       req->r_old_dentry->d_inode,
1348                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1349         head->num_releases = cpu_to_le16(releases);
1350
1351         BUG_ON(p > end);
1352         msg->front.iov_len = p - msg->front.iov_base;
1353         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1354
1355         msg->pages = req->r_pages;
1356         msg->nr_pages = req->r_num_pages;
1357         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1358         msg->hdr.data_off = cpu_to_le16(0);
1359
1360 out_free2:
1361         if (freepath2)
1362                 kfree((char *)path2);
1363 out_free1:
1364         if (freepath1)
1365                 kfree((char *)path1);
1366 out:
1367         return msg;
1368 }
1369
1370 /*
1371  * called under mdsc->mutex if error, under no mutex if
1372  * success.
1373  */
1374 static void complete_request(struct ceph_mds_client *mdsc,
1375                              struct ceph_mds_request *req)
1376 {
1377         if (req->r_callback)
1378                 req->r_callback(mdsc, req);
1379         else
1380                 complete(&req->r_completion);
1381 }
1382
1383 /*
1384  * called under mdsc->mutex
1385  */
1386 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1387                                   struct ceph_mds_request *req,
1388                                   int mds)
1389 {
1390         struct ceph_mds_request_head *rhead;
1391         struct ceph_msg *msg;
1392         int flags = 0;
1393
1394         req->r_mds = mds;
1395         req->r_attempts++;
1396         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1397              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1398
1399         if (req->r_request) {
1400                 ceph_msg_put(req->r_request);
1401                 req->r_request = NULL;
1402         }
1403         msg = create_request_message(mdsc, req, mds);
1404         if (IS_ERR(msg)) {
1405                 req->r_reply = ERR_PTR(PTR_ERR(msg));
1406                 complete_request(mdsc, req);
1407                 return PTR_ERR(msg);
1408         }
1409         req->r_request = msg;
1410
1411         rhead = msg->front.iov_base;
1412         rhead->tid = cpu_to_le64(req->r_tid);
1413         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1414         if (req->r_got_unsafe)
1415                 flags |= CEPH_MDS_FLAG_REPLAY;
1416         if (req->r_locked_dir)
1417                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1418         rhead->flags = cpu_to_le32(flags);
1419         rhead->num_fwd = req->r_num_fwd;
1420         rhead->num_retry = req->r_attempts - 1;
1421
1422         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1423
1424         if (req->r_target_inode && req->r_got_unsafe)
1425                 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1426         else
1427                 rhead->ino = 0;
1428         return 0;
1429 }
1430
1431 /*
1432  * send request, or put it on the appropriate wait list.
1433  */
1434 static int __do_request(struct ceph_mds_client *mdsc,
1435                         struct ceph_mds_request *req)
1436 {
1437         struct ceph_mds_session *session = NULL;
1438         int mds = -1;
1439         int err = -EAGAIN;
1440
1441         if (req->r_reply)
1442                 goto out;
1443
1444         if (req->r_timeout &&
1445             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1446                 dout("do_request timed out\n");
1447                 err = -EIO;
1448                 goto finish;
1449         }
1450
1451         mds = __choose_mds(mdsc, req);
1452         if (mds < 0 ||
1453             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1454                 dout("do_request no mds or not active, waiting for map\n");
1455                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1456                 goto out;
1457         }
1458
1459         /* get, open session */
1460         session = __ceph_lookup_mds_session(mdsc, mds);
1461         if (!session) {
1462                 session = register_session(mdsc, mds);
                     if (IS_ERR(session)) {
                             err = PTR_ERR(session);
                             goto finish;
                     }
             }
1463         dout("do_request mds%d session %p state %s\n", mds, session,
1464              session_state_name(session->s_state));
1465         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1466             session->s_state != CEPH_MDS_SESSION_HUNG) {
1467                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1468                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1469                         __open_session(mdsc, session);
1470                 list_add(&req->r_wait, &session->s_waiting);
1471                 goto out_session;
1472         }
1473
1474         /* send request */
1475         req->r_session = get_session(session);
1476         req->r_resend_mds = -1;   /* forget any previous mds hint */
1477
1478         if (req->r_request_started == 0)   /* note request start time */
1479                 req->r_request_started = jiffies;
1480
1481         err = __prepare_send_request(mdsc, req, mds);
1482         if (!err) {
1483                 ceph_msg_get(req->r_request);
1484                 ceph_con_send(&session->s_con, req->r_request);
1485         }
1486
1487 out_session:
1488         ceph_put_mds_session(session);
1489 out:
1490         return err;
1491
1492 finish:
1493         req->r_reply = ERR_PTR(err);
1494         complete_request(mdsc, req);
1495         goto out;
1496 }
1497
1498 /*
1499  * called under mdsc->mutex
1500  */
1501 static void __wake_requests(struct ceph_mds_client *mdsc,
1502                             struct list_head *head)
1503 {
1504         struct ceph_mds_request *req, *nreq;
1505
1506         list_for_each_entry_safe(req, nreq, head, r_wait) {
1507                 list_del_init(&req->r_wait);
1508                 __do_request(mdsc, req);
1509         }
1510 }
1511
1512 /*
1513  * Wake up threads with requests pending for @mds, so that they can
1514  * resubmit their requests to a possibly different mds.  If @all is set,
1515  * also wake up those whose requests have been forwarded to @mds.
1516  */
1517 static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
1518 {
1519         struct ceph_mds_request *reqs[10];
1520         u64 nexttid = 0;
1521         int i, got;
1522
1523         dout("kick_requests mds%d\n", mds);
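             /* scan the request tree in tid order, up to ten at a time */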
1524         while (nexttid <= mdsc->last_tid) {
1525                 got = radix_tree_gang_lookup(&mdsc->request_tree,
1526                                              (void **)&reqs, nexttid, 10);
1527                 if (got == 0)
1528                         break;
1529                 nexttid = reqs[got-1]->r_tid + 1;
1530                 for (i = 0; i < got; i++) {
1531                         if (reqs[i]->r_got_unsafe)
1532                                 continue;
1533                         if (reqs[i]->r_session &&
1534                             reqs[i]->r_session->s_mds == mds) {
1535                                 dout(" kicking tid %llu\n", reqs[i]->r_tid);
1536                                 put_request_session(reqs[i]);
1537                                 __do_request(mdsc, reqs[i]);
1538                         }
1539                 }
1540         }
1541 }
1542
1543 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1544                               struct ceph_mds_request *req)
1545 {
1546         dout("submit_request on %p\n", req);
1547         mutex_lock(&mdsc->mutex);
1548         __register_request(mdsc, req, NULL);
1549         __do_request(mdsc, req);
1550         mutex_unlock(&mdsc->mutex);
1551 }
1552
1553 /*
1554  * Synchronously perform an mds request.  Take care of all of the
1555  * session setup, forwarding, and retry details.
1556  */
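     /*
      * Illustrative call pattern (a sketch only; cf. the callers in
      * dir.c and file.c):
      *
      *        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP,
      *                                       USE_ANY_MDS);
      *        if (IS_ERR(req))
      *                return PTR_ERR(req);
      *        req->r_dentry = dget(dentry);
      *        req->r_num_caps = 2;
      *        err = ceph_mdsc_do_request(mdsc, NULL, req);
      *        ceph_mdsc_put_request(req);
      */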
1557 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1558                          struct inode *dir,
1559                          struct ceph_mds_request *req)
1560 {
1561         int err;
1562
1563         dout("do_request on %p\n", req);
1564
1565         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1566         if (req->r_inode)
1567                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1568         if (req->r_locked_dir)
1569                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1570         if (req->r_old_dentry)
1571                 ceph_get_cap_refs(
1572                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
1573                         CEPH_CAP_PIN);
1574
1575         /* issue */
1576         mutex_lock(&mdsc->mutex);
1577         __register_request(mdsc, req, dir);
1578         __do_request(mdsc, req);
1579
1580         /* wait */
1581         if (!req->r_reply) {
1582                 mutex_unlock(&mdsc->mutex);
1583                 if (req->r_timeout) {
1584                         err = wait_for_completion_timeout(&req->r_completion,
1585                                                           req->r_timeout);
1586                         if (err > 0)
1587                                 err = 0;
1588                         else if (err == 0)
1589                                 req->r_reply = ERR_PTR(-EIO);
1590                 } else {
1591                         wait_for_completion(&req->r_completion);
1592                 }
1593                 mutex_lock(&mdsc->mutex);
1594         }
1595
1596         if (IS_ERR(req->r_reply)) {
1597                 err = PTR_ERR(req->r_reply);
1598                 req->r_reply = NULL;
1599
1600                 /* clean up */
1601                 __unregister_request(mdsc, req);
1602                 if (!list_empty(&req->r_unsafe_item))
1603                         list_del_init(&req->r_unsafe_item);
1604                 complete(&req->r_safe_completion);
1605         } else if (req->r_err) {
1606                 err = req->r_err;
1607         } else {
1608                 err = le32_to_cpu(req->r_reply_info.head->result);
1609         }
1610         mutex_unlock(&mdsc->mutex);
1611
1612         dout("do_request %p done, result %d\n", req, err);
1613         return err;
1614 }
1615
1616 /*
1617  * Handle mds reply.
1618  *
1619  * We take the session mutex and parse and process the reply immediately.
1620  * This preserves the logical ordering of replies, capabilities, etc., sent
1621  * by the MDS as they are applied to our local cache.
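      *
      * Note the MDS may send two replies to a single request: an
      * initial "unsafe" reply once the operation is applied in memory,
      * and a final "safe" reply once it is committed to the journal.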
1622  */
1623 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1624 {
1625         struct ceph_mds_client *mdsc = session->s_mdsc;
1626         struct ceph_mds_request *req;
1627         struct ceph_mds_reply_head *head = msg->front.iov_base;
1628         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
1629         u64 tid;
1630         int err, result;
1631         int mds;
1632
1633         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1634                 return;
1635         if (msg->front.iov_len < sizeof(*head)) {
1636                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1637                 return;
1638         }
1639
1640         /* get request, session */
1641         tid = le64_to_cpu(head->tid);
1642         mutex_lock(&mdsc->mutex);
1643         req = __lookup_request(mdsc, tid);
1644         if (!req) {
1645                 dout("handle_reply on unknown tid %llu\n", tid);
1646                 mutex_unlock(&mdsc->mutex);
1647                 return;
1648         }
1649         dout("handle_reply %p\n", req);
1650         mds = le64_to_cpu(msg->hdr.src.name.num);
1651
1652         /* correct session? */
1653         if (req->r_session != session) {
1654                 pr_err("mdsc_handle_reply got %llu on session mds%d"
1655                        " not mds%d\n", tid, session->s_mds,
1656                        req->r_session ? req->r_session->s_mds : -1);
1657                 mutex_unlock(&mdsc->mutex);
1658                 goto out;
1659         }
1660
1661         /* dup? */
1662         if ((req->r_got_unsafe && !head->safe) ||
1663             (req->r_got_safe && head->safe)) {
1664                 pr_warning("got a dup %s reply on %llu from mds%d\n",
1665                            head->safe ? "safe" : "unsafe", tid, mds);
1666                 mutex_unlock(&mdsc->mutex);
1667                 goto out;
1668         }
1669
1670         result = le32_to_cpu(head->result);
1671
1672         /*
1673          * Tolerate 2 consecutive ESTALEs from the same mds.
1674          * FIXME: we should be looking at the cap migrate_seq.
1675          */
1676         if (result == -ESTALE) {
1677                 req->r_direct_mode = USE_AUTH_MDS;
1678                 req->r_num_stale++;
1679                 if (req->r_num_stale <= 2) {
1680                         __do_request(mdsc, req);
1681                         mutex_unlock(&mdsc->mutex);
1682                         goto out;
1683                 }
1684         } else {
1685                 req->r_num_stale = 0;
1686         }
1687
1688         if (head->safe) {
1689                 req->r_got_safe = true;
1690                 __unregister_request(mdsc, req);
1691                 complete(&req->r_safe_completion);
1692
1693                 if (req->r_got_unsafe) {
1694                         /*
1695                          * We already handled the unsafe response, now do the
1696                          * cleanup.  No need to examine the response; the MDS
1697                          * doesn't include any result info in the safe
1698                          * response.  And even if it did, there is nothing
1699                          * useful we could do with a revised return value.
1700                          */
1701                         dout("got safe reply %llu, mds%d\n", tid, mds);
1702                         list_del_init(&req->r_unsafe_item);
1703
1704                         /* last unsafe request during umount? */
1705                         if (mdsc->stopping && !__get_oldest_tid(mdsc))
1706                                 complete(&mdsc->safe_umount_waiters);
1707                         mutex_unlock(&mdsc->mutex);
1708                         goto out;
1709                 }
1710         }
1711
1712         BUG_ON(req->r_reply);
1713
1714         if (!head->safe) {
1715                 req->r_got_unsafe = true;
1716                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1717         }
1718
1719         dout("handle_reply tid %lld result %d\n", tid, result);
1720         rinfo = &req->r_reply_info;
1721         err = parse_reply_info(msg, rinfo);
1722         mutex_unlock(&mdsc->mutex);
1723
1724         mutex_lock(&session->s_mutex);
1725         if (err < 0) {
1726                 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
1727                 goto out_err;
1728         }
1729
1730         /* snap trace */
1731         if (rinfo->snapblob_len) {
1732                 down_write(&mdsc->snap_rwsem);
1733                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
1734                                rinfo->snapblob + rinfo->snapblob_len,
1735                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
1736                 downgrade_write(&mdsc->snap_rwsem);
1737         } else {
1738                 down_read(&mdsc->snap_rwsem);
1739         }
1740
1741         /* insert trace into our cache */
1742         err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
1743         if (err == 0) {
1744                 if (result == 0 && rinfo->dir_nr)
1745                         ceph_readdir_prepopulate(req, req->r_session);
1746                 ceph_unreserve_caps(&req->r_caps_reservation);
1747         }
1748
1749         up_read(&mdsc->snap_rwsem);
1750 out_err:
1751         if (err) {
1752                 req->r_err = err;
1753         } else {
1754                 req->r_reply = msg;
1755                 ceph_msg_get(msg);
1756         }
1757
1758         add_cap_releases(mdsc, req->r_session, -1);
1759         mutex_unlock(&session->s_mutex);
1760
1761         /* kick calling process */
1762         complete_request(mdsc, req);
1763 out:
1764         ceph_mdsc_put_request(req);
1765         return;
1766 }
1767
1768
1769
1770 /*
1771  * handle mds notification that our request has been forwarded.
1772  */
1773 static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
1774 {
1775         struct ceph_mds_request *req;
1776         u64 tid;
1777         u32 next_mds;
1778         u32 fwd_seq;
1779         u8 must_resend;
1780         int err = -EINVAL;
1781         void *p = msg->front.iov_base;
1782         void *end = p + msg->front.iov_len;
1783         int from_mds;
1784
1785         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1786                 goto bad;
1787         from_mds = le64_to_cpu(msg->hdr.src.name.num);
1788
1789         ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
1790         tid = ceph_decode_64(&p);
1791         next_mds = ceph_decode_32(&p);
1792         fwd_seq = ceph_decode_32(&p);
1793         must_resend = ceph_decode_8(&p);
1794
1795         WARN_ON(must_resend);  /* shouldn't happen. */
1796
1797         mutex_lock(&mdsc->mutex);
1798         req = __lookup_request(mdsc, tid);
1799         if (!req) {
1800                 dout("forward %llu dne\n", tid);
1801                 goto out;  /* dup reply? */
1802         }
1803
1805         if (fwd_seq <= req->r_num_fwd) {
1806                 dout("forward %llu to mds%d - old seq %d <= %d\n",
1807                      tid, next_mds, req->r_num_fwd, fwd_seq);
1808         } else {
1809                 /* resend. forward race not possible; mds would drop */
1810                 dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
1811                 req->r_num_fwd = fwd_seq;
1812                 req->r_resend_mds = next_mds;
1813                 put_request_session(req);
1814                 __do_request(mdsc, req);
1815         }
1816         ceph_mdsc_put_request(req);
1817 out:
1818         mutex_unlock(&mdsc->mutex);
1819         return;
1820
1821 bad:
1822         pr_err("mdsc_handle_forward decode error err=%d\n", err);
1823 }
1824
1825 /*
1826  * handle a mds session control message
1827  */
1828 static void handle_session(struct ceph_mds_session *session,
1829                            struct ceph_msg *msg)
1830 {
1831         struct ceph_mds_client *mdsc = session->s_mdsc;
1832         u32 op;
1833         u64 seq;
1834         int mds;
1835         struct ceph_mds_session_head *h = msg->front.iov_base;
1836         int wake = 0;
1837
1838         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1839                 return;
1840         mds = le64_to_cpu(msg->hdr.src.name.num);
1841
1842         /* decode */
1843         if (msg->front.iov_len != sizeof(*h))
1844                 goto bad;
1845         op = le32_to_cpu(h->op);
1846         seq = le64_to_cpu(h->seq);
1847
1848         mutex_lock(&mdsc->mutex);
1849         /* FIXME: this ttl calculation is generous */
1850         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
1851         mutex_unlock(&mdsc->mutex);
1852
1853         mutex_lock(&session->s_mutex);
1854
1855         dout("handle_session mds%d %s %p state %s seq %llu\n",
1856              mds, ceph_session_op_name(op), session,
1857              session_state_name(session->s_state), seq);
1858
1859         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
1860                 session->s_state = CEPH_MDS_SESSION_OPEN;
1861                 pr_info("mds%d came back\n", session->s_mds);
1862         }
1863
1864         switch (op) {
1865         case CEPH_SESSION_OPEN:
1866                 session->s_state = CEPH_MDS_SESSION_OPEN;
1867                 renewed_caps(mdsc, session, 0);
1868                 wake = 1;
1869                 if (mdsc->stopping)
1870                         __close_session(mdsc, session);
1871                 break;
1872
1873         case CEPH_SESSION_RENEWCAPS:
1874                 if (session->s_renew_seq == seq)
1875                         renewed_caps(mdsc, session, 1);
1876                 break;
1877
1878         case CEPH_SESSION_CLOSE:
1879                 unregister_session(mdsc, session);
1880                 remove_session_caps(session);
1881                 wake = 1; /* for good measure */
1882                 complete(&mdsc->session_close_waiters);
1883                 kick_requests(mdsc, mds, 0);      /* cur only */
1884                 break;
1885
1886         case CEPH_SESSION_STALE:
1887                 pr_info("mds%d caps went stale, renewing\n",
1888                         session->s_mds);
1889                 spin_lock(&session->s_cap_lock);
1890                 session->s_cap_gen++;
1891                 session->s_cap_ttl = 0;
1892                 spin_unlock(&session->s_cap_lock);
1893                 send_renew_caps(mdsc, session);
1894                 break;
1895
1896         case CEPH_SESSION_RECALL_STATE:
1897                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
1898                 break;
1899
1900         default:
1901                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
1902                 WARN_ON(1);
1903         }
1904
1905         mutex_unlock(&session->s_mutex);
1906         if (wake) {
1907                 mutex_lock(&mdsc->mutex);
1908                 __wake_requests(mdsc, &session->s_waiting);
1909                 mutex_unlock(&mdsc->mutex);
1910         }
1911         return;
1912
1913 bad:
1914         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
1915                (int)msg->front.iov_len);
1916         return;
1917 }
1918
1919
1920 /*
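      * Resend requests that got an unsafe reply but not yet the safe
      * (journaled) one; a recovering mds needs to see them again.
      *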
1921  * called under session->mutex.
1922  */
1923 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
1924                                    struct ceph_mds_session *session)
1925 {
1926         struct ceph_mds_request *req, *nreq;
1927         int err;
1928
1929         dout("replay_unsafe_requests mds%d\n", session->s_mds);
1930
1931         mutex_lock(&mdsc->mutex);
1932         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
1933                 err = __prepare_send_request(mdsc, req, session->s_mds);
1934                 if (!err) {
1935                         ceph_msg_get(req->r_request);
1936                         ceph_con_send(&session->s_con, req->r_request);
1937                 }
1938         }
1939         mutex_unlock(&mdsc->mutex);
1940 }
1941
1942 /*
1943  * Encode information about a cap for a reconnect with the MDS.
1944  */
1945 struct encode_caps_data {
1946         void **pp;
1947         void *end;
1948         int *num_caps;
1949 };
1950
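     /*
      * Encode one cap record into the reconnect message.  Invoked via
      * iterate_session_caps() for each cap on the session; returns
      * -ENOSPC if the preallocated buffer is too small, so the caller
      * can retry with a larger one.
      */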
1951 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
1952                           void *arg)
1953 {
1954         struct ceph_mds_cap_reconnect *rec;
1955         struct ceph_inode_info *ci;
1956         struct encode_caps_data *data = (struct encode_caps_data *)arg;
1957         void *p = *(data->pp);
1958         void *end = data->end;
1959         char *path;
1960         int pathlen, err;
1961         u64 pathbase;
1962         struct dentry *dentry;
1963
1964         ci = cap->ci;
1965
1966         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
1967              inode, ceph_vinop(inode), cap, cap->cap_id,
1968              ceph_cap_string(cap->issued));
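             /* ceph_decode_need() doubles as an encode-space check:
              * bail to needmore if fewer than n bytes remain */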
1969         ceph_decode_need(&p, end, sizeof(u64), needmore);
1970         ceph_encode_64(&p, ceph_ino(inode));
1971
1972         dentry = d_find_alias(inode);
1973         if (dentry) {
1974                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
1975                 if (IS_ERR(path)) {
1976                         err = PTR_ERR(path);
1977                         BUG_ON(err);
1978                 }
1979         } else {
1980                 path = NULL;
1981                 pathlen = 0;
1982         }
1983         ceph_decode_need(&p, end, pathlen+4, needmore);
1984         ceph_encode_string(&p, end, path, pathlen);
1985
1986         ceph_decode_need(&p, end, sizeof(*rec), needmore);
1987         rec = p;
1988         p += sizeof(*rec);
1989         BUG_ON(p > end);
1990         spin_lock(&inode->i_lock);
1991         cap->seq = 0;        /* reset cap seq */
1992         cap->issue_seq = 0;  /* and issue_seq */
1993         rec->cap_id = cpu_to_le64(cap->cap_id);
1994         rec->pathbase = cpu_to_le64(pathbase);
1995         rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
1996         rec->issued = cpu_to_le32(cap->issued);
1997         rec->size = cpu_to_le64(inode->i_size);
1998         ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
1999         ceph_encode_timespec(&rec->atime, &inode->i_atime);
2000         rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2001         spin_unlock(&inode->i_lock);
2002
2003         kfree(path);
2004         dput(dentry);
2005         (*data->num_caps)++;
2006         *(data->pp) = p;
2007         return 0;
2008 needmore:
2009         return -ENOSPC;
2010 }
2011
2012
2013 /*
2014  * If an MDS fails and recovers, clients need to reconnect in order to
2015  * reestablish shared state.  This includes all caps issued through
2016  * this session _and_ the snap_realm hierarchy.  Because it's not
2017  * clear which snap realms the mds cares about, we send everything we
2018  * know about; that ensures we'll then get any new info the
2019  * recovering MDS might have.
2020  *
2021  * This is a relatively heavyweight operation, but it's rare.
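      *
      * The message we build is laid out as follows (summarizing the
      * encoding done below):
      *
      *   u8  closed          1 if we had no session with this mds
      *   u32 num_caps
      *   per cap: u64 ino, string path, struct ceph_mds_cap_reconnect
      *   u32 num_realms
      *   per realm: struct ceph_mds_snaprealm_reconnect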
2022  *
2023  * called with mdsc->mutex held.
2024  */
2025 static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2026 {
2027         struct ceph_mds_session *session;
2028         struct ceph_msg *reply;
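             /* base length: 1-byte "closed" flag + 4-byte cap count */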
2029         int newlen, len = 4 + 1;
2030         void *p, *end;
2031         int err;
2032         int num_caps, num_realms = 0;
2033         int got;
2034         u64 next_snap_ino = 0;
2035         __le32 *pnum_caps, *pnum_realms;
2036         struct encode_caps_data iter_args;
2037
2038         pr_info("reconnect to recovering mds%d\n", mds);
2039
2040         /* find session */
2041         session = __ceph_lookup_mds_session(mdsc, mds);
2042         mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
2043
2044         if (session) {
2045                 mutex_lock(&session->s_mutex);
2046
2047                 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2048                 session->s_seq = 0;
2049
2050                 ceph_con_open(&session->s_con,
2051                               ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2052
2053                 /* replay unsafe requests */
2054                 replay_unsafe_requests(mdsc, session);
2055
2056                 /* estimate needed space */
2057                 len += session->s_nr_caps *
2058                         (100+sizeof(struct ceph_mds_cap_reconnect));
2059                 pr_info("estimating %d bytes needed for %d caps\n",
2060                      len, session->s_nr_caps);
2061         } else {
2062                 dout("no session for mds%d, will send short reconnect\n",
2063                      mds);
2064         }
2065
2066         down_read(&mdsc->snap_rwsem);
2067
2068 retry:
2069         /* build reply */
2070         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
2071         if (IS_ERR(reply)) {
2072                 err = PTR_ERR(reply);
2073                 pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
2074                        len, mds);
2075                 goto out;
2076         }
2077         p = reply->front.iov_base;
2078         end = p + len;
2079
2080         if (!session) {
2081                 ceph_encode_8(&p, 1); /* session was closed */
2082                 ceph_encode_32(&p, 0);
2083                 goto send;
2084         }
2085         dout("session %p state %s\n", session,
2086              session_state_name(session->s_state));
2087
2088         /* traverse this session's caps */
2089         ceph_encode_8(&p, 0);
2090         pnum_caps = p;
2091         ceph_encode_32(&p, session->s_nr_caps);
2092         num_caps = 0;
2093
2094         iter_args.pp = &p;
2095         iter_args.end = end;
2096         iter_args.num_caps = &num_caps;
2097         err = iterate_session_caps(session, encode_caps_cb, &iter_args);
2098         if (err == -ENOSPC)
2099                 goto needmore;
2100         if (err < 0)
2101                 goto out;
2102         *pnum_caps = cpu_to_le32(num_caps);
2103
2104         /*
2105          * snaprealms.  we provide mds with the ino, seq (version), and
2106          * parent for all of our realms.  If the mds has any newer info,
2107          * it will tell us.
2108          */
2109         next_snap_ino = 0;
2110         /* save some space for the snaprealm count */
2111         pnum_realms = p;
2112         ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
2113         p += sizeof(*pnum_realms);
2114         num_realms = 0;
2115         while (1) {
2116                 struct ceph_snap_realm *realm;
2117                 struct ceph_mds_snaprealm_reconnect *sr_rec;
2118                 got = radix_tree_gang_lookup(&mdsc->snap_realms,
2119                                              (void **)&realm, next_snap_ino, 1);
2120                 if (!got)
2121                         break;
2122
2123                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2124                      realm->ino, realm->seq, realm->parent_ino);
2125                 ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
2126                 sr_rec = p;
2127                 sr_rec->ino = cpu_to_le64(realm->ino);
2128                 sr_rec->seq = cpu_to_le64(realm->seq);
2129                 sr_rec->parent = cpu_to_le64(realm->parent_ino);
2130                 p += sizeof(*sr_rec);
2131                 num_realms++;
2132                 next_snap_ino = realm->ino + 1;
2133         }
2134         *pnum_realms = cpu_to_le32(num_realms);
2135
2136 send:
2137         reply->front.iov_len = p - reply->front.iov_base;
2138         reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
2139         dout("final len was %u (guessed %d)\n",
2140              (unsigned)reply->front.iov_len, len);
2141         if (session) {
2142                 ceph_con_send(&session->s_con, reply);
2143                 session->s_state = CEPH_MDS_SESSION_OPEN;
2144                 __wake_requests(mdsc, &session->s_waiting);
2145         } else {
2146                 ceph_msg_put(reply);  /* no session/connection to send on */
             }
2147
2148 out:
2149         up_read(&mdsc->snap_rwsem);
2150         if (session) {
2151                 mutex_unlock(&session->s_mutex);
2152                 ceph_put_mds_session(session);
2153         }
2154         mutex_lock(&mdsc->mutex);
2155         return;
2156
2157 needmore:
2158         /*
2159          * we need a larger buffer.  this estimate doesn't factor in
2160          * snap realms very accurately, but erring large is safe.
2161          */
2162         num_caps += num_realms;
2163         newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
2164         pr_info("guessed %d, did %d of %d caps, retrying with %d\n",
2165              len, num_caps, session->s_nr_caps, newlen);
2166         len = newlen;
2167         ceph_msg_put(reply);
2168         goto retry;
2169 }
2170
2171
2172 /*
2173  * compare old and new mdsmaps, kicking requests
2174  * and closing out old connections as necessary
2175  *
2176  * called under mdsc->mutex.
2177  */
2178 static void check_new_map(struct ceph_mds_client *mdsc,
2179                           struct ceph_mdsmap *newmap,
2180                           struct ceph_mdsmap *oldmap)
2181 {
2182         int i;
2183         int oldstate, newstate;
2184         struct ceph_mds_session *s;
2185
2186         dout("check_new_map new %u old %u\n",
2187              newmap->m_epoch, oldmap->m_epoch);
2188
2189         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2190                 if (mdsc->sessions[i] == NULL)
2191                         continue;
2192                 s = mdsc->sessions[i];
2193                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2194                 newstate = ceph_mdsmap_get_state(newmap, i);
2195
2196                 dout("check_new_map mds%d state %s -> %s (session %s)\n",
2197                      i, ceph_mds_state_name(oldstate),
2198                      ceph_mds_state_name(newstate),
2199                      session_state_name(s->s_state));
2200
2201                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2202                            ceph_mdsmap_get_addr(newmap, i),
2203                            sizeof(struct ceph_entity_addr))) {
2204                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2205                                 /* the session never opened, just close it
2206                                  * out now */
2207                                 __wake_requests(mdsc, &s->s_waiting);
2208                                 unregister_session(mdsc, s);
2209                         } else {
2210                                 /* just close it */
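                                 /* lock order: s_mutex, then mdsc->mutex */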
2211                                 mutex_unlock(&mdsc->mutex);
2212                                 mutex_lock(&s->s_mutex);
2213                                 mutex_lock(&mdsc->mutex);
2214                                 ceph_con_close(&s->s_con);
2215                                 mutex_unlock(&s->s_mutex);
2216                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2217                         }
2218
2219                         /* kick any requests waiting on the recovering mds */
2220                         kick_requests(mdsc, i, 1);
2221                 } else if (oldstate == newstate) {
2222                         continue;  /* nothing new with this mds */
2223                 }
2224
2225                 /*
2226                  * send reconnect?
2227                  */
2228                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2229                     newstate >= CEPH_MDS_STATE_RECONNECT)
2230                         send_mds_reconnect(mdsc, i);
2231
2232                 /*
2233                  * kick requests on any mds that has gone active.
2234                  *
2235                  * kick requests on cur or forwarder: we may have sent
2236                  * the request to mds1, mds1 told us it forwarded it
2237                  * to mds2, but then we learn mds1 failed and can't be
2238                  * sure it successfully forwarded our request before
2239                  * it died.
2240                  */
2241                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2242                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2243                         pr_info("mds%d reconnect completed\n", s->s_mds);
2244                         kick_requests(mdsc, i, 1);
2245                         ceph_kick_flushing_caps(mdsc, s);
2246                 }
2247         }
2248 }
2249
2250
2251
2252 /*
2253  * leases
2254  */
2255
2256 /*
2257  * caller must hold session s_mutex, dentry->d_lock
2258  */
2259 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2260 {
2261         struct ceph_dentry_info *di = ceph_dentry(dentry);
2262
2263         ceph_put_mds_session(di->lease_session);
2264         di->lease_session = NULL;
2265 }
2266
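     /*
      * Handle a lease message from an mds.  The payload is a struct
      * ceph_mds_lease followed by a 32-bit dentry name length and the
      * name itself.
      */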
2267 static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2268 {
2269         struct super_block *sb = mdsc->client->sb;
2270         struct inode *inode;
2271         struct ceph_mds_session *session;
2272         struct ceph_inode_info *ci;
2273         struct dentry *parent, *dentry;
2274         struct ceph_dentry_info *di;
2275         int mds;
2276         struct ceph_mds_lease *h = msg->front.iov_base;
2277         struct ceph_vino vino;
2278         int mask;
2279         struct qstr dname;
2280         int release = 0;
2281
2282         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
2283                 return;
2284         mds = le64_to_cpu(msg->hdr.src.name.num);
2285         dout("handle_lease from mds%d\n", mds);
2286
2287         /* decode */
2288         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2289                 goto bad;
2290         vino.ino = le64_to_cpu(h->ino);
2291         vino.snap = CEPH_NOSNAP;
2292         mask = le16_to_cpu(h->mask);
2293         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2294         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2295         if (dname.len != get_unaligned_le32(h+1))
2296                 goto bad;
2297
2298         /* find session */
2299         mutex_lock(&mdsc->mutex);
2300         session = __ceph_lookup_mds_session(mdsc, mds);
2301         mutex_unlock(&mdsc->mutex);
2302         if (!session) {
2303                 pr_err("handle_lease got lease but no session mds%d\n", mds);
2304                 return;
2305         }
2306
2307         mutex_lock(&session->s_mutex);
2308         session->s_seq++;
2309
2310         /* lookup inode */
2311         inode = ceph_find_inode(sb, vino);
2312         dout("handle_lease '%s', mask %d, ino %llx %p\n",
2313              ceph_lease_op_name(h->action), mask, vino.ino, inode);
2314         if (inode == NULL) {
2315                 dout("handle_lease no inode %llx\n", vino.ino);
2316                 goto release;
2317         }
2318         ci = ceph_inode(inode);
2319
2320         /* dentry */
2321         parent = d_find_alias(inode);
2322         if (!parent) {
2323                 dout("no parent dentry on inode %p\n", inode);
2324                 WARN_ON(1);
2325                 goto release;  /* hrm... */
2326         }
2327         dname.hash = full_name_hash(dname.name, dname.len);
2328         dentry = d_lookup(parent, &dname);
2329         dput(parent);
2330         if (!dentry)
2331                 goto release;
2332
2333         spin_lock(&dentry->d_lock);
2334         di = ceph_dentry(dentry);
2335         switch (h->action) {
2336         case CEPH_MDS_LEASE_REVOKE:
2337                 if (di && di->lease_session == session) {
2338                         h->seq = cpu_to_le32(di->lease_seq);
2339                         __ceph_mdsc_drop_dentry_lease(dentry);
2340                 }
2341                 release = 1;
2342                 break;
2343
2344         case CEPH_MDS_LEASE_RENEW:
2345                 if (di && di->lease_session == session &&
2346                     di->lease_gen == session->s_cap_gen &&
2347                     di->lease_renew_from &&
2348                     di->lease_renew_after == 0) {
2349                         unsigned long duration =
2350                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2351
2352                         di->lease_seq = le32_to_cpu(h->seq);
2353                         dentry->d_time = di->lease_renew_from + duration;
2354                         di->lease_renew_after = di->lease_renew_from +
2355                                 (duration >> 1);
2356                         di->lease_renew_from = 0;
2357                 }
2358                 break;
2359         }
2360         spin_unlock(&dentry->d_lock);
2361         dput(dentry);
2362
2363         if (!release)
2364                 goto out;
2365
2366 release:
2367         /* let's just reuse the same message */
2368         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2369         ceph_msg_get(msg);
2370         ceph_con_send(&session->s_con, msg);
2371
2372 out:
2373         iput(inode);
2374         mutex_unlock(&session->s_mutex);
2375         ceph_put_mds_session(session);
2376         return;
2377
2378 bad:
2379         pr_err("corrupt lease message\n");
2380 }
2381
2382 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2383                               struct inode *inode,
2384                               struct dentry *dentry, char action,
2385                               u32 seq)
2386 {
2387         struct ceph_msg *msg;
2388         struct ceph_mds_lease *lease;
2389         int len = sizeof(*lease) + sizeof(u32);
2390         int dnamelen = 0;
2391
2392         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2393              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2394         dnamelen = dentry->d_name.len;
2395         len += dnamelen;
2396
2397         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
2398         if (IS_ERR(msg))
2399                 return;
2400         lease = msg->front.iov_base;
2401         lease->action = action;
2402         lease->mask = cpu_to_le16(CEPH_LOCK_DN);
2403         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2404         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2405         lease->seq = cpu_to_le32(seq);
2406         put_unaligned_le32(dnamelen, lease + 1);
2407         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2408
2409         /*
2410          * if this is a preemptive lease RELEASE, no need to
2411          * flush request stream, since the actual request will
2412          * soon follow.
2413          */
2414         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2415
2416         ceph_con_send(&session->s_con, msg);
2417 }
2418
2419 /*
2420  * Preemptively release a lease we expect to invalidate anyway.
2421  * Both @inode and @dentry are required (see the BUG_ONs below).
2422  */
2423 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2424                              struct dentry *dentry, int mask)
2425 {
2426         struct ceph_dentry_info *di;
2427         struct ceph_mds_session *session;
2428         u32 seq;
2429
2430         BUG_ON(inode == NULL);
2431         BUG_ON(dentry == NULL);
2432         BUG_ON(mask != CEPH_LOCK_DN);
2433
2434         /* is dentry lease valid? */
2435         spin_lock(&dentry->d_lock);
2436         di = ceph_dentry(dentry);
2437         if (!di || !di->lease_session ||
2438             di->lease_session->s_mds < 0 ||
2439             di->lease_gen != di->lease_session->s_cap_gen ||
2440             !time_before(jiffies, dentry->d_time)) {
2441                 dout("lease_release inode %p dentry %p -- "
2442                      "no lease on %d\n",
2443                      inode, dentry, mask);
2444                 spin_unlock(&dentry->d_lock);
2445                 return;
2446         }
2447
2448         /* we do have a lease on this dentry; note mds and seq */
2449         session = ceph_get_mds_session(di->lease_session);
2450         seq = di->lease_seq;
2451         __ceph_mdsc_drop_dentry_lease(dentry);
2452         spin_unlock(&dentry->d_lock);
2453
2454         dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2455              inode, dentry, mask, session->s_mds);
2456         ceph_mdsc_lease_send_msg(session, inode, dentry,
2457                                  CEPH_MDS_LEASE_RELEASE, seq);
2458         ceph_put_mds_session(session);
2459 }
2460
2461 /*
2462  * drop all leases (and dentry refs) in preparation for umount
2463  */
2464 static void drop_leases(struct ceph_mds_client *mdsc)
2465 {
2466         int i;
2467
2468         dout("drop_leases\n");
2469         mutex_lock(&mdsc->mutex);
2470         for (i = 0; i < mdsc->max_sessions; i++) {
2471                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2472                 if (!s)
2473                         continue;
2474                 mutex_unlock(&mdsc->mutex);
2475                 mutex_lock(&s->s_mutex);
2476                 mutex_unlock(&s->s_mutex);
2477                 ceph_put_mds_session(s);
2478                 mutex_lock(&mdsc->mutex);
2479         }
2480         mutex_unlock(&mdsc->mutex);
2481 }
2482
2483
2484
2485 /*
2486  * delayed work -- periodically trim expired leases, renew caps with mds
2487  */
2488 static void schedule_delayed(struct ceph_mds_client *mdsc)
2489 {
2490         int delay = 5;
2491         unsigned hz = round_jiffies_relative(HZ * delay);
2492         schedule_delayed_work(&mdsc->delayed_work, hz);
2493 }
2494
2495 static void delayed_work(struct work_struct *work)
2496 {
2497         int i;
2498         struct ceph_mds_client *mdsc =
2499                 container_of(work, struct ceph_mds_client, delayed_work.work);
2500         int renew_interval;
2501         int renew_caps;
2502
2503         dout("mdsc delayed_work\n");
2504         ceph_check_delayed_caps(mdsc);
2505
2506         mutex_lock(&mdsc->mutex);
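             /* renew caps at one quarter of the session timeout */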
2507         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2508         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2509                                    mdsc->last_renew_caps);
2510         if (renew_caps)
2511                 mdsc->last_renew_caps = jiffies;
2512
2513         for (i = 0; i < mdsc->max_sessions; i++) {
2514                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2515                 if (s == NULL)
2516                         continue;
2517                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2518                         dout("resending session close request for mds%d\n",
2519                              s->s_mds);
2520                         request_close_session(mdsc, s);
2521                         ceph_put_mds_session(s);
2522                         continue;
2523                 }
2524                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2525                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2526                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2527                                 pr_info("mds%d hung\n", s->s_mds);
2528                         }
2529                 }
2530                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2531                         /* this mds is failed or recovering, just wait */
2532                         ceph_put_mds_session(s);
2533                         continue;
2534                 }
2535                 mutex_unlock(&mdsc->mutex);
2536
2537                 mutex_lock(&s->s_mutex);
2538                 if (renew_caps)
2539                         send_renew_caps(mdsc, s);
2540                 else
2541                         ceph_con_keepalive(&s->s_con);
2542                 add_cap_releases(mdsc, s, -1);
2543                 send_cap_releases(mdsc, s);
2544                 mutex_unlock(&s->s_mutex);
2545                 ceph_put_mds_session(s);
2546
2547                 mutex_lock(&mdsc->mutex);
2548         }
2549         mutex_unlock(&mdsc->mutex);
2550
2551         schedule_delayed(mdsc);
2552 }
2553
2554
2555 int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2556 {
2557         mdsc->client = client;
2558         mutex_init(&mdsc->mutex);
2559         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
             if (mdsc->mdsmap == NULL)
                     return -ENOMEM;
2560         init_completion(&mdsc->safe_umount_waiters);
2561         init_completion(&mdsc->session_close_waiters);
2562         INIT_LIST_HEAD(&mdsc->waiting_for_map);
2563         mdsc->sessions = NULL;
2564         mdsc->max_sessions = 0;
2565         mdsc->stopping = 0;
2566         init_rwsem(&mdsc->snap_rwsem);
2567         INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
2568         INIT_LIST_HEAD(&mdsc->snap_empty);
2569         spin_lock_init(&mdsc->snap_empty_lock);
2570         mdsc->last_tid = 0;
2571         INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
2572         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2573         mdsc->last_renew_caps = jiffies;
2574         INIT_LIST_HEAD(&mdsc->cap_delay_list);
2575         spin_lock_init(&mdsc->cap_delay_lock);
2576         INIT_LIST_HEAD(&mdsc->snap_flush_list);
2577         spin_lock_init(&mdsc->snap_flush_lock);
2578         mdsc->cap_flush_seq = 0;
2579         INIT_LIST_HEAD(&mdsc->cap_dirty);
2580         mdsc->num_cap_flushing = 0;
2581         spin_lock_init(&mdsc->cap_dirty_lock);
2582         init_waitqueue_head(&mdsc->cap_flushing_wq);
2583         spin_lock_init(&mdsc->dentry_lru_lock);
2584         INIT_LIST_HEAD(&mdsc->dentry_lru);
2585         return 0;
2586 }
2587
2588 /*
2589  * Wait for safe replies on open mds requests.  If we time out, drop
2590  * all requests from the tree to avoid dangling dentry refs.
2591  */
2592 static void wait_requests(struct ceph_mds_client *mdsc)
2593 {
2594         struct ceph_mds_request *req;
2595         struct ceph_client *client = mdsc->client;
2596
2597         mutex_lock(&mdsc->mutex);
2598         if (__get_oldest_tid(mdsc)) {
2599                 mutex_unlock(&mdsc->mutex);
2600                 dout("wait_requests waiting for requests\n");
2601                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2602                                     client->mount_args->mount_timeout * HZ);
2603                 mutex_lock(&mdsc->mutex);
2604
2605                 /* tear down remaining requests */
2606                 while (radix_tree_gang_lookup(&mdsc->request_tree,
2607                                               (void **)&req, 0, 1)) {
2608                         dout("wait_requests timed out on tid %llu\n",
2609                              req->r_tid);
2610                         radix_tree_delete(&mdsc->request_tree, req->r_tid);
2611                         ceph_mdsc_put_request(req);
2612                 }
2613         }
2614         mutex_unlock(&mdsc->mutex);
2615         dout("wait_requests done\n");
2616 }
2617
2618 /*
2619  * called before mount is ro, and before dentries are torn down.
2620  * (hmm, does this still race with new lookups?)
2621  */
2622 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2623 {
2624         dout("pre_umount\n");
2625         mdsc->stopping = 1;
2626
2627         drop_leases(mdsc);
2628         ceph_flush_dirty_caps(mdsc);
2629         wait_requests(mdsc);
2630 }
2631
2632 /*
2633  * wait for all write mds requests to flush.
2634  */
2635 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2636 {
2637         struct ceph_mds_request *req;
2638         u64 next_tid = 0;
2639         int got;
2640
2641         mutex_lock(&mdsc->mutex);
2642         dout("wait_unsafe_requests want %lld\n", want_tid);
2643         while (1) {
2644                 got = radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
2645                                              next_tid, 1);
2646                 if (!got)
2647                         break;
2648                 if (req->r_tid > want_tid)
2649                         break;
2650
2651                 next_tid = req->r_tid + 1;
2652                 if ((req->r_op & CEPH_MDS_OP_WRITE) == 0)
2653                         continue;  /* not a write op */
2654
2655                 ceph_mdsc_get_request(req);
2656                 mutex_unlock(&mdsc->mutex);
2657                 dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
2658                      req->r_tid, want_tid);
2659                 wait_for_completion(&req->r_safe_completion);
2660                 mutex_lock(&mdsc->mutex);
2661                 ceph_mdsc_put_request(req);
2662         }
2663         mutex_unlock(&mdsc->mutex);
2664         dout("wait_unsafe_requests done\n");
2665 }
2666
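     /*
      * Flush metadata to the mds: wait for a safe (journaled) reply to
      * every write request up to the current tid, and for the current
      * batch of dirty caps to be flushed.
      */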
2667 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2668 {
2669         u64 want_tid, want_flush;
2670
2671         dout("sync\n");
2672         mutex_lock(&mdsc->mutex);
2673         want_tid = mdsc->last_tid;
2674         want_flush = mdsc->cap_flush_seq;
2675         mutex_unlock(&mdsc->mutex);
2676         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
2677
2678         ceph_flush_dirty_caps(mdsc);
2679
2680         wait_unsafe_requests(mdsc, want_tid);
2681         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
2682 }
2683
2684
2685 /*
2686  * called after sb is ro.
2687  */
2688 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2689 {
2690         struct ceph_mds_session *session;
2691         int i;
2692         int n;
2693         struct ceph_client *client = mdsc->client;
2694         unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
2695
2696         dout("close_sessions\n");
2697
2698         mutex_lock(&mdsc->mutex);
2699
2700         /* close sessions */
2701         started = jiffies;
2702         while (time_before(jiffies, started + timeout)) {
2703                 dout("closing sessions\n");
2704                 n = 0;
2705                 for (i = 0; i < mdsc->max_sessions; i++) {
2706                         session = __ceph_lookup_mds_session(mdsc, i);
2707                         if (!session)
2708                                 continue;
2709                         mutex_unlock(&mdsc->mutex);
2710                         mutex_lock(&session->s_mutex);
2711                         __close_session(mdsc, session);
2712                         mutex_unlock(&session->s_mutex);
2713                         ceph_put_mds_session(session);
2714                         mutex_lock(&mdsc->mutex);
2715                         n++;
2716                 }
2717                 if (n == 0)
2718                         break;
2719
2720                 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
2721                         break;
2722
2723                 dout("waiting for sessions to close\n");
2724                 mutex_unlock(&mdsc->mutex);
2725                 wait_for_completion_timeout(&mdsc->session_close_waiters,
2726                                             timeout);
2727                 mutex_lock(&mdsc->mutex);
2728         }
2729
2730         /* tear down remaining sessions */
2731         for (i = 0; i < mdsc->max_sessions; i++) {
2732                 if (mdsc->sessions[i]) {
2733                         session = get_session(mdsc->sessions[i]);
2734                         unregister_session(mdsc, session);
2735                         mutex_unlock(&mdsc->mutex);
2736                         mutex_lock(&session->s_mutex);
2737                         remove_session_caps(session);
2738                         mutex_unlock(&session->s_mutex);
2739                         ceph_put_mds_session(session);
2740                         mutex_lock(&mdsc->mutex);
2741                 }
2742         }
2743
2744         WARN_ON(!list_empty(&mdsc->cap_delay_list));
2745
2746         mutex_unlock(&mdsc->mutex);
2747
2748         ceph_cleanup_empty_realms(mdsc);
2749
2750         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2751
2752         dout("stopped\n");
2753 }
2754
2755 void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2756 {
2757         dout("stop\n");
2758         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2759         if (mdsc->mdsmap)
2760                 ceph_mdsmap_destroy(mdsc->mdsmap);
2761         kfree(mdsc->sessions);
2762 }
2763
2764
2765 /*
2766  * handle mds map update.
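      * message format: ceph_fsid, u32 epoch, u32 maplen, then the
      * encoded mdsmap itself.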
2767  */
2768 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2769 {
2770         u32 epoch;
2771         u32 maplen;
2772         void *p = msg->front.iov_base;
2773         void *end = p + msg->front.iov_len;
2774         struct ceph_mdsmap *newmap, *oldmap;
2775         struct ceph_fsid fsid;
2776         int err = -EINVAL;
2777
2778         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
2779         ceph_decode_copy(&p, &fsid, sizeof(fsid));
2780         if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
2781                 pr_err("got mdsmap with wrong fsid\n");
2782                 return;
2783         }
2784         epoch = ceph_decode_32(&p);
2785         maplen = ceph_decode_32(&p);
2786         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
2787
2788         /* do we need it? */
2789         ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
2790         mutex_lock(&mdsc->mutex);
2791         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
2792                 dout("handle_map epoch %u <= our %u\n",
2793                      epoch, mdsc->mdsmap->m_epoch);
2794                 mutex_unlock(&mdsc->mutex);
2795                 return;
2796         }
2797
2798         newmap = ceph_mdsmap_decode(&p, end);
2799         if (IS_ERR(newmap)) {
2800                 err = PTR_ERR(newmap);
2801                 goto bad_unlock;
2802         }
2803
2804         /* swap into place */
2805         if (mdsc->mdsmap) {
2806                 oldmap = mdsc->mdsmap;
2807                 mdsc->mdsmap = newmap;
2808                 check_new_map(mdsc, newmap, oldmap);
2809                 ceph_mdsmap_destroy(oldmap);
2810         } else {
2811                 mdsc->mdsmap = newmap;  /* first mds map */
2812         }
2813         mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
2814
2815         __wake_requests(mdsc, &mdsc->waiting_for_map);
2816
2817         mutex_unlock(&mdsc->mutex);
2818         schedule_delayed(mdsc);
2819         return;
2820
2821 bad_unlock:
2822         mutex_unlock(&mdsc->mutex);
2823 bad:
2824         pr_err("error decoding mdsmap %d\n", err);
2825         return;
2826 }
2827
2828 static struct ceph_connection *con_get(struct ceph_connection *con)
2829 {
2830         struct ceph_mds_session *s = con->private;
2831
2832         if (get_session(s)) {
2833                 dout("mdsc con_get %p %d -> %d\n", s,
2834                      atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
2835                 return con;
2836         }
2837         dout("mdsc con_get %p FAIL\n", s);
2838         return NULL;
2839 }
2840
2841 static void con_put(struct ceph_connection *con)
2842 {
2843         struct ceph_mds_session *s = con->private;
2844
2845         dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
2846              atomic_read(&s->s_ref) - 1);
2847         ceph_put_mds_session(s);
2848 }
2849
2850 /*
2851  * if the client is unresponsive for long enough, the mds will kill
2852  * the session entirely.
2853  */
2854 static void peer_reset(struct ceph_connection *con)
2855 {
2856         struct ceph_mds_session *s = con->private;
2857
2858         pr_err("mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
2859                s->s_mds);
2860 }
2861
2862 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2863 {
2864         struct ceph_mds_session *s = con->private;
2865         struct ceph_mds_client *mdsc = s->s_mdsc;
2866         int type = le16_to_cpu(msg->hdr.type);
2867
2868         switch (type) {
2869         case CEPH_MSG_MDS_MAP:
2870                 ceph_mdsc_handle_map(mdsc, msg);
2871                 break;
2872         case CEPH_MSG_CLIENT_SESSION:
2873                 handle_session(s, msg);
2874                 break;
2875         case CEPH_MSG_CLIENT_REPLY:
2876                 handle_reply(s, msg);
2877                 break;
2878         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2879                 handle_forward(mdsc, msg);
2880                 break;
2881         case CEPH_MSG_CLIENT_CAPS:
2882                 ceph_handle_caps(s, msg);
2883                 break;
2884         case CEPH_MSG_CLIENT_SNAP:
2885                 ceph_handle_snap(mdsc, msg);
2886                 break;
2887         case CEPH_MSG_CLIENT_LEASE:
2888                 handle_lease(mdsc, msg);
2889                 break;
2890
2891         default:
2892                 pr_err("received unknown message type %d %s\n", type,
2893                        ceph_msg_type_name(type));
2894         }
2895         ceph_msg_put(msg);
2896 }
2897
2898 static const struct ceph_connection_operations mds_con_ops = {
2899         .get = con_get,
2900         .put = con_put,
2901         .dispatch = dispatch,
2902         .peer_reset = peer_reset,
2903         .alloc_msg = ceph_alloc_msg,
2904         .alloc_middle = ceph_alloc_middle,
2905 };
2906
2907
2908
2909
2910 /* eof */