828417ae16f95130a193be94753c5fde7da6e254
[linux-3.10.git] / fs / ceph / mds_client.c
1 #include "ceph_debug.h"
2
3 #include <linux/wait.h>
4 #include <linux/sched.h>
5
6 #include "mds_client.h"
7 #include "mon_client.h"
8 #include "super.h"
9 #include "messenger.h"
10 #include "decode.h"
11
12 /*
13  * A cluster of MDS (metadata server) daemons is responsible for
14  * managing the file system namespace (the directory hierarchy and
15  * inodes) and for coordinating shared access to storage.  Metadata is
16  * partitioning hierarchically across a number of servers, and that
17  * partition varies over time as the cluster adjusts the distribution
18  * in order to balance load.
19  *
20  * The MDS client is primarily responsible to managing synchronous
21  * metadata requests for operations like open, unlink, and so forth.
22  * If there is a MDS failure, we find out about it when we (possibly
23  * request and) receive a new MDS map, and can resubmit affected
24  * requests.
25  *
26  * For the most part, though, we take advantage of a lossless
27  * communications channel to the MDS, and do not need to worry about
28  * timing out or resubmitting requests.
29  *
30  * We maintain a stateful "session" with each MDS we interact with.
31  * Within each session, we sent periodic heartbeat messages to ensure
32  * any capabilities or leases we have been issues remain valid.  If
33  * the session times out and goes stale, our leases and capabilities
34  * are no longer valid.
35  */
36
37 static void __wake_requests(struct ceph_mds_client *mdsc,
38                             struct list_head *head);
39
40 const static struct ceph_connection_operations mds_con_ops;
41
42
43 /*
44  * mds reply parsing
45  */
46
47 /*
48  * parse individual inode info
49  */
50 static int parse_reply_info_in(void **p, void *end,
51                                struct ceph_mds_reply_info_in *info)
52 {
53         int err = -EIO;
54
55         info->in = *p;
56         *p += sizeof(struct ceph_mds_reply_inode) +
57                 sizeof(*info->in->fragtree.splits) *
58                 le32_to_cpu(info->in->fragtree.nsplits);
59
60         ceph_decode_32_safe(p, end, info->symlink_len, bad);
61         ceph_decode_need(p, end, info->symlink_len, bad);
62         info->symlink = *p;
63         *p += info->symlink_len;
64
65         ceph_decode_32_safe(p, end, info->xattr_len, bad);
66         ceph_decode_need(p, end, info->xattr_len, bad);
67         info->xattr_data = *p;
68         *p += info->xattr_len;
69         return 0;
70 bad:
71         return err;
72 }
73
74 /*
75  * parse a normal reply, which may contain a (dir+)dentry and/or a
76  * target inode.
77  */
78 static int parse_reply_info_trace(void **p, void *end,
79                                   struct ceph_mds_reply_info_parsed *info)
80 {
81         int err;
82
83         if (info->head->is_dentry) {
84                 err = parse_reply_info_in(p, end, &info->diri);
85                 if (err < 0)
86                         goto out_bad;
87
88                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
89                         goto bad;
90                 info->dirfrag = *p;
91                 *p += sizeof(*info->dirfrag) +
92                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
93                 if (unlikely(*p > end))
94                         goto bad;
95
96                 ceph_decode_32_safe(p, end, info->dname_len, bad);
97                 ceph_decode_need(p, end, info->dname_len, bad);
98                 info->dname = *p;
99                 *p += info->dname_len;
100                 info->dlease = *p;
101                 *p += sizeof(*info->dlease);
102         }
103
104         if (info->head->is_target) {
105                 err = parse_reply_info_in(p, end, &info->targeti);
106                 if (err < 0)
107                         goto out_bad;
108         }
109
110         if (unlikely(*p != end))
111                 goto bad;
112         return 0;
113
114 bad:
115         err = -EIO;
116 out_bad:
117         pr_err("problem parsing mds trace %d\n", err);
118         return err;
119 }
120
121 /*
122  * parse readdir results
123  */
124 static int parse_reply_info_dir(void **p, void *end,
125                                 struct ceph_mds_reply_info_parsed *info)
126 {
127         u32 num, i = 0;
128         int err;
129
130         info->dir_dir = *p;
131         if (*p + sizeof(*info->dir_dir) > end)
132                 goto bad;
133         *p += sizeof(*info->dir_dir) +
134                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
135         if (*p > end)
136                 goto bad;
137
138         ceph_decode_need(p, end, sizeof(num) + 2, bad);
139         num = ceph_decode_32(p);
140         info->dir_end = ceph_decode_8(p);
141         info->dir_complete = ceph_decode_8(p);
142         if (num == 0)
143                 goto done;
144
145         /* alloc large array */
146         info->dir_nr = num;
147         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
148                                sizeof(*info->dir_dname) +
149                                sizeof(*info->dir_dname_len) +
150                                sizeof(*info->dir_dlease),
151                                GFP_NOFS);
152         if (info->dir_in == NULL) {
153                 err = -ENOMEM;
154                 goto out_bad;
155         }
156         info->dir_dname = (void *)(info->dir_in + num);
157         info->dir_dname_len = (void *)(info->dir_dname + num);
158         info->dir_dlease = (void *)(info->dir_dname_len + num);
159
160         while (num) {
161                 /* dentry */
162                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
163                 info->dir_dname_len[i] = ceph_decode_32(p);
164                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
165                 info->dir_dname[i] = *p;
166                 *p += info->dir_dname_len[i];
167                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
168                      info->dir_dname[i]);
169                 info->dir_dlease[i] = *p;
170                 *p += sizeof(struct ceph_mds_reply_lease);
171
172                 /* inode */
173                 err = parse_reply_info_in(p, end, &info->dir_in[i]);
174                 if (err < 0)
175                         goto out_bad;
176                 i++;
177                 num--;
178         }
179
180 done:
181         if (*p != end)
182                 goto bad;
183         return 0;
184
185 bad:
186         err = -EIO;
187 out_bad:
188         pr_err("problem parsing dir contents %d\n", err);
189         return err;
190 }
191
192 /*
193  * parse entire mds reply
194  */
195 static int parse_reply_info(struct ceph_msg *msg,
196                             struct ceph_mds_reply_info_parsed *info)
197 {
198         void *p, *end;
199         u32 len;
200         int err;
201
202         info->head = msg->front.iov_base;
203         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
204         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
205
206         /* trace */
207         ceph_decode_32_safe(&p, end, len, bad);
208         if (len > 0) {
209                 err = parse_reply_info_trace(&p, p+len, info);
210                 if (err < 0)
211                         goto out_bad;
212         }
213
214         /* dir content */
215         ceph_decode_32_safe(&p, end, len, bad);
216         if (len > 0) {
217                 err = parse_reply_info_dir(&p, p+len, info);
218                 if (err < 0)
219                         goto out_bad;
220         }
221
222         /* snap blob */
223         ceph_decode_32_safe(&p, end, len, bad);
224         info->snapblob_len = len;
225         info->snapblob = p;
226         p += len;
227
228         if (p != end)
229                 goto bad;
230         return 0;
231
232 bad:
233         err = -EIO;
234 out_bad:
235         pr_err("mds parse_reply err %d\n", err);
236         return err;
237 }
238
239 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
240 {
241         kfree(info->dir_in);
242 }
243
244
245 /*
246  * sessions
247  */
248 static const char *session_state_name(int s)
249 {
250         switch (s) {
251         case CEPH_MDS_SESSION_NEW: return "new";
252         case CEPH_MDS_SESSION_OPENING: return "opening";
253         case CEPH_MDS_SESSION_OPEN: return "open";
254         case CEPH_MDS_SESSION_HUNG: return "hung";
255         case CEPH_MDS_SESSION_CLOSING: return "closing";
256         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
257         default: return "???";
258         }
259 }
260
261 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
262 {
263         if (atomic_inc_not_zero(&s->s_ref)) {
264                 dout("mdsc get_session %p %d -> %d\n", s,
265                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
266                 return s;
267         } else {
268                 dout("mdsc get_session %p 0 -- FAIL", s);
269                 return NULL;
270         }
271 }
272
273 void ceph_put_mds_session(struct ceph_mds_session *s)
274 {
275         dout("mdsc put_session %p %d -> %d\n", s,
276              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
277         if (atomic_dec_and_test(&s->s_ref)) {
278                 ceph_con_shutdown(&s->s_con);
279                 kfree(s);
280         }
281 }
282
283 /*
284  * called under mdsc->mutex
285  */
286 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
287                                                    int mds)
288 {
289         struct ceph_mds_session *session;
290
291         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
292                 return NULL;
293         session = mdsc->sessions[mds];
294         dout("lookup_mds_session %p %d\n", session,
295              atomic_read(&session->s_ref));
296         get_session(session);
297         return session;
298 }
299
300 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
301 {
302         if (mds >= mdsc->max_sessions)
303                 return false;
304         return mdsc->sessions[mds];
305 }
306
307 /*
308  * create+register a new session for given mds.
309  * called under mdsc->mutex.
310  */
311 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
312                                                  int mds)
313 {
314         struct ceph_mds_session *s;
315
316         s = kzalloc(sizeof(*s), GFP_NOFS);
317         s->s_mdsc = mdsc;
318         s->s_mds = mds;
319         s->s_state = CEPH_MDS_SESSION_NEW;
320         s->s_ttl = 0;
321         s->s_seq = 0;
322         mutex_init(&s->s_mutex);
323
324         ceph_con_init(mdsc->client->msgr, &s->s_con);
325         s->s_con.private = s;
326         s->s_con.ops = &mds_con_ops;
327         s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
328         s->s_con.peer_name.num = cpu_to_le64(mds);
329         ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
330
331         spin_lock_init(&s->s_cap_lock);
332         s->s_recon_gen = 0;
333         s->s_cap_gen = 0;
334         s->s_cap_ttl = 0;
335         s->s_renew_requested = 0;
336         s->s_renew_seq = 0;
337         INIT_LIST_HEAD(&s->s_caps);
338         s->s_nr_caps = 0;
339         atomic_set(&s->s_ref, 1);
340         INIT_LIST_HEAD(&s->s_waiting);
341         INIT_LIST_HEAD(&s->s_unsafe);
342         s->s_num_cap_releases = 0;
343         INIT_LIST_HEAD(&s->s_cap_releases);
344         INIT_LIST_HEAD(&s->s_cap_releases_done);
345         INIT_LIST_HEAD(&s->s_cap_flushing);
346         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
347
348         dout("register_session mds%d\n", mds);
349         if (mds >= mdsc->max_sessions) {
350                 int newmax = 1 << get_count_order(mds+1);
351                 struct ceph_mds_session **sa;
352
353                 dout("register_session realloc to %d\n", newmax);
354                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
355                 if (sa == NULL)
356                         return ERR_PTR(-ENOMEM);
357                 if (mdsc->sessions) {
358                         memcpy(sa, mdsc->sessions,
359                                mdsc->max_sessions * sizeof(void *));
360                         kfree(mdsc->sessions);
361                 }
362                 mdsc->sessions = sa;
363                 mdsc->max_sessions = newmax;
364         }
365         mdsc->sessions[mds] = s;
366         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
367         return s;
368 }
369
370 /*
371  * called under mdsc->mutex
372  */
373 static void unregister_session(struct ceph_mds_client *mdsc, int mds)
374 {
375         dout("unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
376         ceph_put_mds_session(mdsc->sessions[mds]);
377         mdsc->sessions[mds] = NULL;
378 }
379
380 /*
381  * drop session refs in request.
382  *
383  * should be last request ref, or hold mdsc->mutex
384  */
385 static void put_request_session(struct ceph_mds_request *req)
386 {
387         if (req->r_session) {
388                 ceph_put_mds_session(req->r_session);
389                 req->r_session = NULL;
390         }
391 }
392
393 void ceph_mdsc_put_request(struct ceph_mds_request *req)
394 {
395         dout("mdsc put_request %p %d -> %d\n", req,
396              atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
397         if (atomic_dec_and_test(&req->r_ref)) {
398                 if (req->r_request)
399                         ceph_msg_put(req->r_request);
400                 if (req->r_reply) {
401                         ceph_msg_put(req->r_reply);
402                         destroy_reply_info(&req->r_reply_info);
403                 }
404                 if (req->r_inode) {
405                         ceph_put_cap_refs(ceph_inode(req->r_inode),
406                                           CEPH_CAP_PIN);
407                         iput(req->r_inode);
408                 }
409                 if (req->r_locked_dir)
410                         ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
411                                           CEPH_CAP_PIN);
412                 if (req->r_target_inode)
413                         iput(req->r_target_inode);
414                 if (req->r_dentry)
415                         dput(req->r_dentry);
416                 if (req->r_old_dentry) {
417                         ceph_put_cap_refs(
418                              ceph_inode(req->r_old_dentry->d_parent->d_inode),
419                              CEPH_CAP_PIN);
420                         dput(req->r_old_dentry);
421                 }
422                 kfree(req->r_path1);
423                 kfree(req->r_path2);
424                 put_request_session(req);
425                 ceph_unreserve_caps(&req->r_caps_reservation);
426                 kfree(req);
427         }
428 }
429
430 /*
431  * lookup session, bump ref if found.
432  *
433  * called under mdsc->mutex.
434  */
435 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
436                                              u64 tid)
437 {
438         struct ceph_mds_request *req;
439         req = radix_tree_lookup(&mdsc->request_tree, tid);
440         if (req)
441                 ceph_mdsc_get_request(req);
442         return req;
443 }
444
445 /*
446  * Register an in-flight request, and assign a tid.  Link to directory
447  * are modifying (if any).
448  *
449  * Called under mdsc->mutex.
450  */
451 static void __register_request(struct ceph_mds_client *mdsc,
452                                struct ceph_mds_request *req,
453                                struct inode *dir)
454 {
455         req->r_tid = ++mdsc->last_tid;
456         if (req->r_num_caps)
457                 ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
458         dout("__register_request %p tid %lld\n", req, req->r_tid);
459         ceph_mdsc_get_request(req);
460         radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
461
462         if (dir) {
463                 struct ceph_inode_info *ci = ceph_inode(dir);
464
465                 spin_lock(&ci->i_unsafe_lock);
466                 req->r_unsafe_dir = dir;
467                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
468                 spin_unlock(&ci->i_unsafe_lock);
469         }
470 }
471
472 static void __unregister_request(struct ceph_mds_client *mdsc,
473                                  struct ceph_mds_request *req)
474 {
475         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
476         radix_tree_delete(&mdsc->request_tree, req->r_tid);
477         ceph_mdsc_put_request(req);
478
479         if (req->r_unsafe_dir) {
480                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
481
482                 spin_lock(&ci->i_unsafe_lock);
483                 list_del_init(&req->r_unsafe_dir_item);
484                 spin_unlock(&ci->i_unsafe_lock);
485         }
486 }
487
488 /*
489  * Choose mds to send request to next.  If there is a hint set in the
490  * request (e.g., due to a prior forward hint from the mds), use that.
491  * Otherwise, consult frag tree and/or caps to identify the
492  * appropriate mds.  If all else fails, choose randomly.
493  *
494  * Called under mdsc->mutex.
495  */
496 static int __choose_mds(struct ceph_mds_client *mdsc,
497                         struct ceph_mds_request *req)
498 {
499         struct inode *inode;
500         struct ceph_inode_info *ci;
501         struct ceph_cap *cap;
502         int mode = req->r_direct_mode;
503         int mds = -1;
504         u32 hash = req->r_direct_hash;
505         bool is_hash = req->r_direct_is_hash;
506
507         /*
508          * is there a specific mds we should try?  ignore hint if we have
509          * no session and the mds is not up (active or recovering).
510          */
511         if (req->r_resend_mds >= 0 &&
512             (__have_session(mdsc, req->r_resend_mds) ||
513              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
514                 dout("choose_mds using resend_mds mds%d\n",
515                      req->r_resend_mds);
516                 return req->r_resend_mds;
517         }
518
519         if (mode == USE_RANDOM_MDS)
520                 goto random;
521
522         inode = NULL;
523         if (req->r_inode) {
524                 inode = req->r_inode;
525         } else if (req->r_dentry) {
526                 if (req->r_dentry->d_inode) {
527                         inode = req->r_dentry->d_inode;
528                 } else {
529                         inode = req->r_dentry->d_parent->d_inode;
530                         hash = req->r_dentry->d_name.hash;
531                         is_hash = true;
532                 }
533         }
534         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
535              (int)hash, mode);
536         if (!inode)
537                 goto random;
538         ci = ceph_inode(inode);
539
540         if (is_hash && S_ISDIR(inode->i_mode)) {
541                 struct ceph_inode_frag frag;
542                 int found;
543
544                 ceph_choose_frag(ci, hash, &frag, &found);
545                 if (found) {
546                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
547                                 u8 r;
548
549                                 /* choose a random replica */
550                                 get_random_bytes(&r, 1);
551                                 r %= frag.ndist;
552                                 mds = frag.dist[r];
553                                 dout("choose_mds %p %llx.%llx "
554                                      "frag %u mds%d (%d/%d)\n",
555                                      inode, ceph_vinop(inode),
556                                      frag.frag, frag.mds,
557                                      (int)r, frag.ndist);
558                                 return mds;
559                         }
560
561                         /* since this file/dir wasn't known to be
562                          * replicated, then we want to look for the
563                          * authoritative mds. */
564                         mode = USE_AUTH_MDS;
565                         if (frag.mds >= 0) {
566                                 /* choose auth mds */
567                                 mds = frag.mds;
568                                 dout("choose_mds %p %llx.%llx "
569                                      "frag %u mds%d (auth)\n",
570                                      inode, ceph_vinop(inode), frag.frag, mds);
571                                 return mds;
572                         }
573                 }
574         }
575
576         spin_lock(&inode->i_lock);
577         cap = NULL;
578         if (mode == USE_AUTH_MDS)
579                 cap = ci->i_auth_cap;
580         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
581                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
582         if (!cap) {
583                 spin_unlock(&inode->i_lock);
584                 goto random;
585         }
586         mds = cap->session->s_mds;
587         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
588              inode, ceph_vinop(inode), mds,
589              cap == ci->i_auth_cap ? "auth " : "", cap);
590         spin_unlock(&inode->i_lock);
591         return mds;
592
593 random:
594         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
595         dout("choose_mds chose random mds%d\n", mds);
596         return mds;
597 }
598
599
600 /*
601  * session messages
602  */
603 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
604 {
605         struct ceph_msg *msg;
606         struct ceph_mds_session_head *h;
607
608         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
609         if (IS_ERR(msg)) {
610                 pr_err("create_session_msg ENOMEM creating msg\n");
611                 return ERR_PTR(PTR_ERR(msg));
612         }
613         h = msg->front.iov_base;
614         h->op = cpu_to_le32(op);
615         h->seq = cpu_to_le64(seq);
616         return msg;
617 }
618
619 /*
620  * send session open request.
621  *
622  * called under mdsc->mutex
623  */
624 static int __open_session(struct ceph_mds_client *mdsc,
625                           struct ceph_mds_session *session)
626 {
627         struct ceph_msg *msg;
628         int mstate;
629         int mds = session->s_mds;
630         int err = 0;
631
632         /* wait for mds to go active? */
633         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
634         dout("open_session to mds%d (%s)\n", mds,
635              ceph_mds_state_name(mstate));
636         session->s_state = CEPH_MDS_SESSION_OPENING;
637         session->s_renew_requested = jiffies;
638
639         /* send connect message */
640         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
641         if (IS_ERR(msg)) {
642                 err = PTR_ERR(msg);
643                 goto out;
644         }
645         ceph_con_send(&session->s_con, msg);
646
647 out:
648         return 0;
649 }
650
651 /*
652  * session caps
653  */
654
655 /*
656  * Free preallocated cap messages assigned to this session
657  */
658 static void cleanup_cap_releases(struct ceph_mds_session *session)
659 {
660         struct ceph_msg *msg;
661
662         spin_lock(&session->s_cap_lock);
663         while (!list_empty(&session->s_cap_releases)) {
664                 msg = list_first_entry(&session->s_cap_releases,
665                                        struct ceph_msg, list_head);
666                 list_del_init(&msg->list_head);
667                 ceph_msg_put(msg);
668         }
669         while (!list_empty(&session->s_cap_releases_done)) {
670                 msg = list_first_entry(&session->s_cap_releases_done,
671                                        struct ceph_msg, list_head);
672                 list_del_init(&msg->list_head);
673                 ceph_msg_put(msg);
674         }
675         spin_unlock(&session->s_cap_lock);
676 }
677
678 /*
679  * Helper to safely iterate over all caps associated with a session.
680  *
681  * caller must hold session s_mutex
682  */
683 static int iterate_session_caps(struct ceph_mds_session *session,
684                                  int (*cb)(struct inode *, struct ceph_cap *,
685                                             void *), void *arg)
686 {
687         struct ceph_cap *cap, *ncap;
688         struct inode *inode;
689         int ret;
690
691         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
692         spin_lock(&session->s_cap_lock);
693         list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
694                 inode = igrab(&cap->ci->vfs_inode);
695                 if (!inode)
696                         continue;
697                 spin_unlock(&session->s_cap_lock);
698                 ret = cb(inode, cap, arg);
699                 iput(inode);
700                 if (ret < 0)
701                         return ret;
702                 spin_lock(&session->s_cap_lock);
703         }
704         spin_unlock(&session->s_cap_lock);
705
706         return 0;
707 }
708
709 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
710                                    void *arg)
711 {
712         struct ceph_inode_info *ci = ceph_inode(inode);
713         dout("removing cap %p, ci is %p, inode is %p\n",
714              cap, ci, &ci->vfs_inode);
715         ceph_remove_cap(cap);
716         return 0;
717 }
718
719 /*
720  * caller must hold session s_mutex
721  */
722 static void remove_session_caps(struct ceph_mds_session *session)
723 {
724         dout("remove_session_caps on %p\n", session);
725         iterate_session_caps(session, remove_session_caps_cb, NULL);
726         BUG_ON(session->s_nr_caps > 0);
727         cleanup_cap_releases(session);
728 }
729
730 /*
731  * wake up any threads waiting on this session's caps.  if the cap is
732  * old (didn't get renewed on the client reconnect), remove it now.
733  *
734  * caller must hold s_mutex.
735  */
736 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
737                               void *arg)
738 {
739         struct ceph_mds_session *session = arg;
740
741         spin_lock(&inode->i_lock);
742         if (cap->recon_gen != session->s_recon_gen) {
743                 pr_err("failed reconnect %p %llx.%llx cap %p "
744                        "(recon_gen %d < session %d)\n", inode,
745                        ceph_vinop(inode), cap,
746                        cap->recon_gen, session->s_recon_gen);
747                 __ceph_remove_cap(cap, NULL);
748         }
749         wake_up(&ceph_inode(inode)->i_cap_wq);
750         spin_unlock(&inode->i_lock);
751         return 0;
752 }
753
754 static void wake_up_session_caps(struct ceph_mds_session *session)
755 {
756         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
757         iterate_session_caps(session, wake_up_session_cb, session);
758 }
759
760 /*
761  * Send periodic message to MDS renewing all currently held caps.  The
762  * ack will reset the expiration for all caps from this session.
763  *
764  * caller holds s_mutex
765  */
766 static int send_renew_caps(struct ceph_mds_client *mdsc,
767                            struct ceph_mds_session *session)
768 {
769         struct ceph_msg *msg;
770         int state;
771
772         if (time_after_eq(jiffies, session->s_cap_ttl) &&
773             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
774                 pr_info("mds%d caps stale\n", session->s_mds);
775
776         /* do not try to renew caps until a recovering mds has reconnected
777          * with its clients. */
778         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
779         if (state < CEPH_MDS_STATE_RECONNECT) {
780                 dout("send_renew_caps ignoring mds%d (%s)\n",
781                      session->s_mds, ceph_mds_state_name(state));
782                 return 0;
783         }
784
785         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
786                 ceph_mds_state_name(state));
787         session->s_renew_requested = jiffies;
788         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
789                                  ++session->s_renew_seq);
790         if (IS_ERR(msg))
791                 return PTR_ERR(msg);
792         ceph_con_send(&session->s_con, msg);
793         return 0;
794 }
795
796 /*
797  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
798  */
799 static void renewed_caps(struct ceph_mds_client *mdsc,
800                          struct ceph_mds_session *session, int is_renew)
801 {
802         int was_stale;
803         int wake = 0;
804
805         spin_lock(&session->s_cap_lock);
806         was_stale = is_renew && (session->s_cap_ttl == 0 ||
807                                  time_after_eq(jiffies, session->s_cap_ttl));
808
809         session->s_cap_ttl = session->s_renew_requested +
810                 mdsc->mdsmap->m_session_timeout*HZ;
811
812         if (was_stale) {
813                 if (time_before(jiffies, session->s_cap_ttl)) {
814                         pr_info("mds%d caps renewed\n", session->s_mds);
815                         wake = 1;
816                 } else {
817                         pr_info("mds%d caps still stale\n", session->s_mds);
818                 }
819         }
820         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
821              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
822              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
823         spin_unlock(&session->s_cap_lock);
824
825         if (wake)
826                 wake_up_session_caps(session);
827 }
828
829 /*
830  * send a session close request
831  */
832 static int request_close_session(struct ceph_mds_client *mdsc,
833                                  struct ceph_mds_session *session)
834 {
835         struct ceph_msg *msg;
836         int err = 0;
837
838         dout("request_close_session mds%d state %s seq %lld\n",
839              session->s_mds, session_state_name(session->s_state),
840              session->s_seq);
841         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
842         if (IS_ERR(msg))
843                 err = PTR_ERR(msg);
844         else
845                 ceph_con_send(&session->s_con, msg);
846         return err;
847 }
848
849 /*
850  * Called with s_mutex held.
851  */
852 static int __close_session(struct ceph_mds_client *mdsc,
853                          struct ceph_mds_session *session)
854 {
855         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
856                 return 0;
857         session->s_state = CEPH_MDS_SESSION_CLOSING;
858         return request_close_session(mdsc, session);
859 }
860
861 /*
862  * Trim old(er) caps.
863  *
864  * Because we can't cache an inode without one or more caps, we do
865  * this indirectly: if a cap is unused, we prune its aliases, at which
866  * point the inode will hopefully get dropped to.
867  *
868  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
869  * memory pressure from the MDS, though, so it needn't be perfect.
870  */
871 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
872 {
873         struct ceph_mds_session *session = arg;
874         struct ceph_inode_info *ci = ceph_inode(inode);
875         int used, oissued, mine;
876
877         if (session->s_trim_caps <= 0)
878                 return -1;
879
880         spin_lock(&inode->i_lock);
881         mine = cap->issued | cap->implemented;
882         used = __ceph_caps_used(ci);
883         oissued = __ceph_caps_issued_other(ci, cap);
884
885         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
886              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
887              ceph_cap_string(used));
888         if (ci->i_dirty_caps)
889                 goto out;   /* dirty caps */
890         if ((used & ~oissued) & mine)
891                 goto out;   /* we need these caps */
892
893         session->s_trim_caps--;
894         if (oissued) {
895                 /* we aren't the only cap.. just remove us */
896                 __ceph_remove_cap(cap, NULL);
897         } else {
898                 /* try to drop referring dentries */
899                 spin_unlock(&inode->i_lock);
900                 d_prune_aliases(inode);
901                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
902                      inode, cap, atomic_read(&inode->i_count));
903                 return 0;
904         }
905
906 out:
907         spin_unlock(&inode->i_lock);
908         return 0;
909 }
910
911 /*
912  * Trim session cap count down to some max number.
913  */
914 static int trim_caps(struct ceph_mds_client *mdsc,
915                      struct ceph_mds_session *session,
916                      int max_caps)
917 {
918         int trim_caps = session->s_nr_caps - max_caps;
919
920         dout("trim_caps mds%d start: %d / %d, trim %d\n",
921              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
922         if (trim_caps > 0) {
923                 session->s_trim_caps = trim_caps;
924                 iterate_session_caps(session, trim_caps_cb, session);
925                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
926                      session->s_mds, session->s_nr_caps, max_caps,
927                         trim_caps - session->s_trim_caps);
928         }
929         return 0;
930 }
931
932 /*
933  * Allocate cap_release messages.  If there is a partially full message
934  * in the queue, try to allocate enough to cover it's remainder, so that
935  * we can send it immediately.
936  *
937  * Called under s_mutex.
938  */
939 static int add_cap_releases(struct ceph_mds_client *mdsc,
940                             struct ceph_mds_session *session,
941                             int extra)
942 {
943         struct ceph_msg *msg;
944         struct ceph_mds_cap_release *head;
945         int err = -ENOMEM;
946
947         if (extra < 0)
948                 extra = mdsc->client->mount_args->cap_release_safety;
949
950         spin_lock(&session->s_cap_lock);
951
952         if (!list_empty(&session->s_cap_releases)) {
953                 msg = list_first_entry(&session->s_cap_releases,
954                                        struct ceph_msg,
955                                  list_head);
956                 head = msg->front.iov_base;
957                 extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
958         }
959
960         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
961                 spin_unlock(&session->s_cap_lock);
962                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
963                                    0, 0, NULL);
964                 if (!msg)
965                         goto out_unlocked;
966                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
967                      (int)msg->front.iov_len);
968                 head = msg->front.iov_base;
969                 head->num = cpu_to_le32(0);
970                 msg->front.iov_len = sizeof(*head);
971                 spin_lock(&session->s_cap_lock);
972                 list_add(&msg->list_head, &session->s_cap_releases);
973                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
974         }
975
976         if (!list_empty(&session->s_cap_releases)) {
977                 msg = list_first_entry(&session->s_cap_releases,
978                                        struct ceph_msg,
979                                        list_head);
980                 head = msg->front.iov_base;
981                 if (head->num) {
982                         dout(" queueing non-full %p (%d)\n", msg,
983                              le32_to_cpu(head->num));
984                         list_move_tail(&msg->list_head,
985                                       &session->s_cap_releases_done);
986                         session->s_num_cap_releases -=
987                                 CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
988                 }
989         }
990         err = 0;
991         spin_unlock(&session->s_cap_lock);
992 out_unlocked:
993         return err;
994 }
995
996 /*
997  * flush all dirty inode data to disk.
998  *
999  * returns true if we've flushed through want_flush_seq
1000  */
1001 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1002 {
1003         int mds, ret = 1;
1004
1005         dout("check_cap_flush want %lld\n", want_flush_seq);
1006         mutex_lock(&mdsc->mutex);
1007         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1008                 struct ceph_mds_session *session = mdsc->sessions[mds];
1009
1010                 if (!session)
1011                         continue;
1012                 get_session(session);
1013                 mutex_unlock(&mdsc->mutex);
1014
1015                 mutex_lock(&session->s_mutex);
1016                 if (!list_empty(&session->s_cap_flushing)) {
1017                         struct ceph_inode_info *ci =
1018                                 list_entry(session->s_cap_flushing.next,
1019                                            struct ceph_inode_info,
1020                                            i_flushing_item);
1021                         struct inode *inode = &ci->vfs_inode;
1022
1023                         spin_lock(&inode->i_lock);
1024                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1025                                 dout("check_cap_flush still flushing %p "
1026                                      "seq %lld <= %lld to mds%d\n", inode,
1027                                      ci->i_cap_flush_seq, want_flush_seq,
1028                                      session->s_mds);
1029                                 ret = 0;
1030                         }
1031                         spin_unlock(&inode->i_lock);
1032                 }
1033                 mutex_unlock(&session->s_mutex);
1034                 ceph_put_mds_session(session);
1035
1036                 if (!ret)
1037                         return ret;
1038                 mutex_lock(&mdsc->mutex);
1039         }
1040
1041         mutex_unlock(&mdsc->mutex);
1042         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1043         return ret;
1044 }
1045
1046 /*
1047  * called under s_mutex
1048  */
1049 static void send_cap_releases(struct ceph_mds_client *mdsc,
1050                        struct ceph_mds_session *session)
1051 {
1052         struct ceph_msg *msg;
1053
1054         dout("send_cap_releases mds%d\n", session->s_mds);
1055         while (1) {
1056                 spin_lock(&session->s_cap_lock);
1057                 if (list_empty(&session->s_cap_releases_done))
1058                         break;
1059                 msg = list_first_entry(&session->s_cap_releases_done,
1060                                  struct ceph_msg, list_head);
1061                 list_del_init(&msg->list_head);
1062                 spin_unlock(&session->s_cap_lock);
1063                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1064                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1065                 ceph_con_send(&session->s_con, msg);
1066         }
1067         spin_unlock(&session->s_cap_lock);
1068 }
1069
1070 /*
1071  * requests
1072  */
1073
1074 /*
1075  * Create an mds request.
1076  */
1077 struct ceph_mds_request *
1078 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1079 {
1080         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1081
1082         if (!req)
1083                 return ERR_PTR(-ENOMEM);
1084
1085         req->r_started = jiffies;
1086         req->r_resend_mds = -1;
1087         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1088         req->r_fmode = -1;
1089         atomic_set(&req->r_ref, 1);  /* one for request_tree, one for caller */
1090         INIT_LIST_HEAD(&req->r_wait);
1091         init_completion(&req->r_completion);
1092         init_completion(&req->r_safe_completion);
1093         INIT_LIST_HEAD(&req->r_unsafe_item);
1094
1095         req->r_op = op;
1096         req->r_direct_mode = mode;
1097         return req;
1098 }
1099
1100 /*
1101  * return oldest (lowest) tid in request tree, 0 if none.
1102  *
1103  * called under mdsc->mutex.
1104  */
1105 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1106 {
1107         struct ceph_mds_request *first;
1108         if (radix_tree_gang_lookup(&mdsc->request_tree,
1109                                    (void **)&first, 0, 1) <= 0)
1110                 return 0;
1111         return first->r_tid;
1112 }
1113
1114 /*
1115  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1116  * on build_path_from_dentry in fs/cifs/dir.c.
1117  *
1118  * If @stop_on_nosnap, generate path relative to the first non-snapped
1119  * inode.
1120  *
1121  * Encode hidden .snap dirs as a double /, i.e.
1122  *   foo/.snap/bar -> foo//bar
1123  */
1124 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1125                            int stop_on_nosnap)
1126 {
1127         struct dentry *temp;
1128         char *path;
1129         int len, pos;
1130
1131         if (dentry == NULL)
1132                 return ERR_PTR(-EINVAL);
1133
1134 retry:
1135         len = 0;
1136         for (temp = dentry; !IS_ROOT(temp);) {
1137                 struct inode *inode = temp->d_inode;
1138                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1139                         len++;  /* slash only */
1140                 else if (stop_on_nosnap && inode &&
1141                          ceph_snap(inode) == CEPH_NOSNAP)
1142                         break;
1143                 else
1144                         len += 1 + temp->d_name.len;
1145                 temp = temp->d_parent;
1146                 if (temp == NULL) {
1147                         pr_err("build_path_dentry corrupt dentry %p\n", dentry);
1148                         return ERR_PTR(-EINVAL);
1149                 }
1150         }
1151         if (len)
1152                 len--;  /* no leading '/' */
1153
1154         path = kmalloc(len+1, GFP_NOFS);
1155         if (path == NULL)
1156                 return ERR_PTR(-ENOMEM);
1157         pos = len;
1158         path[pos] = 0;  /* trailing null */
1159         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1160                 struct inode *inode = temp->d_inode;
1161
1162                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1163                         dout("build_path_dentry path+%d: %p SNAPDIR\n",
1164                              pos, temp);
1165                 } else if (stop_on_nosnap && inode &&
1166                            ceph_snap(inode) == CEPH_NOSNAP) {
1167                         break;
1168                 } else {
1169                         pos -= temp->d_name.len;
1170                         if (pos < 0)
1171                                 break;
1172                         strncpy(path + pos, temp->d_name.name,
1173                                 temp->d_name.len);
1174                         dout("build_path_dentry path+%d: %p '%.*s'\n",
1175                              pos, temp, temp->d_name.len, path + pos);
1176                 }
1177                 if (pos)
1178                         path[--pos] = '/';
1179                 temp = temp->d_parent;
1180                 if (temp == NULL) {
1181                         pr_err("build_path_dentry corrupt dentry\n");
1182                         kfree(path);
1183                         return ERR_PTR(-EINVAL);
1184                 }
1185         }
1186         if (pos != 0) {
1187                 pr_err("build_path_dentry did not end path lookup where "
1188                        "expected, namelen is %d, pos is %d\n", len, pos);
1189                 /* presumably this is only possible if racing with a
1190                    rename of one of the parent directories (we can not
1191                    lock the dentries above us to prevent this, but
1192                    retrying should be harmless) */
1193                 kfree(path);
1194                 goto retry;
1195         }
1196
1197         *base = ceph_ino(temp->d_inode);
1198         *plen = len;
1199         dout("build_path_dentry on %p %d built %llx '%.*s'\n",
1200              dentry, atomic_read(&dentry->d_count), *base, len, path);
1201         return path;
1202 }
1203
1204 static int build_dentry_path(struct dentry *dentry,
1205                              const char **ppath, int *ppathlen, u64 *pino,
1206                              int *pfreepath)
1207 {
1208         char *path;
1209
1210         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1211                 *pino = ceph_ino(dentry->d_parent->d_inode);
1212                 *ppath = dentry->d_name.name;
1213                 *ppathlen = dentry->d_name.len;
1214                 return 0;
1215         }
1216         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1217         if (IS_ERR(path))
1218                 return PTR_ERR(path);
1219         *ppath = path;
1220         *pfreepath = 1;
1221         return 0;
1222 }
1223
1224 static int build_inode_path(struct inode *inode,
1225                             const char **ppath, int *ppathlen, u64 *pino,
1226                             int *pfreepath)
1227 {
1228         struct dentry *dentry;
1229         char *path;
1230
1231         if (ceph_snap(inode) == CEPH_NOSNAP) {
1232                 *pino = ceph_ino(inode);
1233                 *ppathlen = 0;
1234                 return 0;
1235         }
1236         dentry = d_find_alias(inode);
1237         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1238         dput(dentry);
1239         if (IS_ERR(path))
1240                 return PTR_ERR(path);
1241         *ppath = path;
1242         *pfreepath = 1;
1243         return 0;
1244 }
1245
1246 /*
1247  * request arguments may be specified via an inode *, a dentry *, or
1248  * an explicit ino+path.
1249  */
1250 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1251                                   const char *rpath, u64 rino,
1252                                   const char **ppath, int *pathlen,
1253                                   u64 *ino, int *freepath)
1254 {
1255         int r = 0;
1256
1257         if (rinode) {
1258                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1259                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1260                      ceph_snap(rinode));
1261         } else if (rdentry) {
1262                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1263                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1264                      *ppath);
1265         } else if (rpath) {
1266                 *ino = rino;
1267                 *ppath = rpath;
1268                 *pathlen = strlen(rpath);
1269                 dout(" path %.*s\n", *pathlen, rpath);
1270         }
1271
1272         return r;
1273 }
1274
1275 /*
1276  * called under mdsc->mutex
1277  */
1278 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1279                                                struct ceph_mds_request *req,
1280                                                int mds)
1281 {
1282         struct ceph_msg *msg;
1283         struct ceph_mds_request_head *head;
1284         const char *path1 = NULL;
1285         const char *path2 = NULL;
1286         u64 ino1 = 0, ino2 = 0;
1287         int pathlen1 = 0, pathlen2 = 0;
1288         int freepath1 = 0, freepath2 = 0;
1289         int len;
1290         u16 releases;
1291         void *p, *end;
1292         int ret;
1293
1294         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1295                               req->r_path1, req->r_ino1.ino,
1296                               &path1, &pathlen1, &ino1, &freepath1);
1297         if (ret < 0) {
1298                 msg = ERR_PTR(ret);
1299                 goto out;
1300         }
1301
1302         ret = set_request_path_attr(NULL, req->r_old_dentry,
1303                               req->r_path2, req->r_ino2.ino,
1304                               &path2, &pathlen2, &ino2, &freepath2);
1305         if (ret < 0) {
1306                 msg = ERR_PTR(ret);
1307                 goto out_free1;
1308         }
1309
1310         len = sizeof(*head) +
1311                 pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
1312
1313         /* calculate (max) length for cap releases */
1314         len += sizeof(struct ceph_mds_request_release) *
1315                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1316                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1317         if (req->r_dentry_drop)
1318                 len += req->r_dentry->d_name.len;
1319         if (req->r_old_dentry_drop)
1320                 len += req->r_old_dentry->d_name.len;
1321
1322         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
1323         if (IS_ERR(msg))
1324                 goto out_free2;
1325
1326         head = msg->front.iov_base;
1327         p = msg->front.iov_base + sizeof(*head);
1328         end = msg->front.iov_base + msg->front.iov_len;
1329
1330         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1331         head->op = cpu_to_le32(req->r_op);
1332         head->caller_uid = cpu_to_le32(current_fsuid());
1333         head->caller_gid = cpu_to_le32(current_fsgid());
1334         head->args = req->r_args;
1335
1336         ceph_encode_filepath(&p, end, ino1, path1);
1337         ceph_encode_filepath(&p, end, ino2, path2);
1338
1339         /* cap releases */
1340         releases = 0;
1341         if (req->r_inode_drop)
1342                 releases += ceph_encode_inode_release(&p,
1343                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1344                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1345         if (req->r_dentry_drop)
1346                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1347                        mds, req->r_dentry_drop, req->r_dentry_unless);
1348         if (req->r_old_dentry_drop)
1349                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1350                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1351         if (req->r_old_inode_drop)
1352                 releases += ceph_encode_inode_release(&p,
1353                       req->r_old_dentry->d_inode,
1354                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1355         head->num_releases = cpu_to_le16(releases);
1356
1357         BUG_ON(p > end);
1358         msg->front.iov_len = p - msg->front.iov_base;
1359         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1360
1361         msg->pages = req->r_pages;
1362         msg->nr_pages = req->r_num_pages;
1363         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1364         msg->hdr.data_off = cpu_to_le16(0);
1365
1366 out_free2:
1367         if (freepath2)
1368                 kfree((char *)path2);
1369 out_free1:
1370         if (freepath1)
1371                 kfree((char *)path1);
1372 out:
1373         return msg;
1374 }
1375
1376 /*
1377  * called under mdsc->mutex if error, under no mutex if
1378  * success.
1379  */
1380 static void complete_request(struct ceph_mds_client *mdsc,
1381                              struct ceph_mds_request *req)
1382 {
1383         if (req->r_callback)
1384                 req->r_callback(mdsc, req);
1385         else
1386                 complete(&req->r_completion);
1387 }
1388
1389 /*
1390  * called under mdsc->mutex
1391  */
1392 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1393                                   struct ceph_mds_request *req,
1394                                   int mds)
1395 {
1396         struct ceph_mds_request_head *rhead;
1397         struct ceph_msg *msg;
1398         int flags = 0;
1399
1400         req->r_mds = mds;
1401         req->r_attempts++;
1402         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1403              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1404
1405         if (req->r_request) {
1406                 ceph_msg_put(req->r_request);
1407                 req->r_request = NULL;
1408         }
1409         msg = create_request_message(mdsc, req, mds);
1410         if (IS_ERR(msg)) {
1411                 req->r_reply = ERR_PTR(PTR_ERR(msg));
1412                 complete_request(mdsc, req);
1413                 return -PTR_ERR(msg);
1414         }
1415         req->r_request = msg;
1416
1417         rhead = msg->front.iov_base;
1418         rhead->tid = cpu_to_le64(req->r_tid);
1419         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1420         if (req->r_got_unsafe)
1421                 flags |= CEPH_MDS_FLAG_REPLAY;
1422         if (req->r_locked_dir)
1423                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1424         rhead->flags = cpu_to_le32(flags);
1425         rhead->num_fwd = req->r_num_fwd;
1426         rhead->num_retry = req->r_attempts - 1;
1427
1428         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1429
1430         if (req->r_target_inode && req->r_got_unsafe)
1431                 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1432         else
1433                 rhead->ino = 0;
1434         return 0;
1435 }
1436
1437 /*
1438  * send request, or put it on the appropriate wait list.
1439  */
1440 static int __do_request(struct ceph_mds_client *mdsc,
1441                         struct ceph_mds_request *req)
1442 {
1443         struct ceph_mds_session *session = NULL;
1444         int mds = -1;
1445         int err = -EAGAIN;
1446
1447         if (req->r_reply)
1448                 goto out;
1449
1450         if (req->r_timeout &&
1451             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1452                 dout("do_request timed out\n");
1453                 err = -EIO;
1454                 goto finish;
1455         }
1456
1457         mds = __choose_mds(mdsc, req);
1458         if (mds < 0 ||
1459             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1460                 dout("do_request no mds or not active, waiting for map\n");
1461                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1462                 goto out;
1463         }
1464
1465         /* get, open session */
1466         session = __ceph_lookup_mds_session(mdsc, mds);
1467         if (!session)
1468                 session = register_session(mdsc, mds);
1469         dout("do_request mds%d session %p state %s\n", mds, session,
1470              session_state_name(session->s_state));
1471         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1472             session->s_state != CEPH_MDS_SESSION_HUNG) {
1473                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1474                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1475                         __open_session(mdsc, session);
1476                 list_add(&req->r_wait, &session->s_waiting);
1477                 goto out_session;
1478         }
1479
1480         /* send request */
1481         req->r_session = get_session(session);
1482         req->r_resend_mds = -1;   /* forget any previous mds hint */
1483
1484         if (req->r_request_started == 0)   /* note request start time */
1485                 req->r_request_started = jiffies;
1486
1487         err = __prepare_send_request(mdsc, req, mds);
1488         if (!err) {
1489                 ceph_msg_get(req->r_request);
1490                 ceph_con_send(&session->s_con, req->r_request);
1491         }
1492
1493 out_session:
1494         ceph_put_mds_session(session);
1495 out:
1496         return err;
1497
1498 finish:
1499         req->r_reply = ERR_PTR(err);
1500         complete_request(mdsc, req);
1501         goto out;
1502 }
1503
1504 /*
1505  * called under mdsc->mutex
1506  */
1507 static void __wake_requests(struct ceph_mds_client *mdsc,
1508                             struct list_head *head)
1509 {
1510         struct ceph_mds_request *req, *nreq;
1511
1512         list_for_each_entry_safe(req, nreq, head, r_wait) {
1513                 list_del_init(&req->r_wait);
1514                 __do_request(mdsc, req);
1515         }
1516 }
1517
1518 /*
1519  * Wake up threads with requests pending for @mds, so that they can
1520  * resubmit their requests to a possibly different mds.  If @all is set,
1521  * wake up if their requests has been forwarded to @mds, too.
1522  */
1523 static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
1524 {
1525         struct ceph_mds_request *reqs[10];
1526         u64 nexttid = 0;
1527         int i, got;
1528
1529         dout("kick_requests mds%d\n", mds);
1530         while (nexttid <= mdsc->last_tid) {
1531                 got = radix_tree_gang_lookup(&mdsc->request_tree,
1532                                              (void **)&reqs, nexttid, 10);
1533                 if (got == 0)
1534                         break;
1535                 nexttid = reqs[got-1]->r_tid + 1;
1536                 for (i = 0; i < got; i++) {
1537                         if (reqs[i]->r_got_unsafe)
1538                                 continue;
1539                         if (reqs[i]->r_session &&
1540                             reqs[i]->r_session->s_mds == mds) {
1541                                 dout(" kicking tid %llu\n", reqs[i]->r_tid);
1542                                 put_request_session(reqs[i]);
1543                                 __do_request(mdsc, reqs[i]);
1544                         }
1545                 }
1546         }
1547 }
1548
1549 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1550                               struct ceph_mds_request *req)
1551 {
1552         dout("submit_request on %p\n", req);
1553         mutex_lock(&mdsc->mutex);
1554         __register_request(mdsc, req, NULL);
1555         __do_request(mdsc, req);
1556         mutex_unlock(&mdsc->mutex);
1557 }
1558
1559 /*
1560  * Synchrously perform an mds request.  Take care of all of the
1561  * session setup, forwarding, retry details.
1562  */
1563 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1564                          struct inode *dir,
1565                          struct ceph_mds_request *req)
1566 {
1567         int err;
1568
1569         dout("do_request on %p\n", req);
1570
1571         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1572         if (req->r_inode)
1573                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1574         if (req->r_locked_dir)
1575                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1576         if (req->r_old_dentry)
1577                 ceph_get_cap_refs(
1578                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
1579                         CEPH_CAP_PIN);
1580
1581         /* issue */
1582         mutex_lock(&mdsc->mutex);
1583         __register_request(mdsc, req, dir);
1584         __do_request(mdsc, req);
1585
1586         /* wait */
1587         if (!req->r_reply) {
1588                 mutex_unlock(&mdsc->mutex);
1589                 if (req->r_timeout) {
1590                         err = wait_for_completion_timeout(&req->r_completion,
1591                                                           req->r_timeout);
1592                         if (err > 0)
1593                                 err = 0;
1594                         else if (err == 0)
1595                                 req->r_reply = ERR_PTR(-EIO);
1596                 } else {
1597                         wait_for_completion(&req->r_completion);
1598                 }
1599                 mutex_lock(&mdsc->mutex);
1600         }
1601
1602         if (IS_ERR(req->r_reply)) {
1603                 err = PTR_ERR(req->r_reply);
1604                 req->r_reply = NULL;
1605
1606                 /* clean up */
1607                 __unregister_request(mdsc, req);
1608                 if (!list_empty(&req->r_unsafe_item))
1609                         list_del_init(&req->r_unsafe_item);
1610                 complete(&req->r_safe_completion);
1611         } else if (req->r_err) {
1612                 err = req->r_err;
1613         } else {
1614                 err = le32_to_cpu(req->r_reply_info.head->result);
1615         }
1616         mutex_unlock(&mdsc->mutex);
1617
1618         dout("do_request %p done, result %d\n", req, err);
1619         return err;
1620 }
1621
1622 /*
1623  * Handle mds reply.
1624  *
1625  * We take the session mutex and parse and process the reply immediately.
1626  * This preserves the logical ordering of replies, capabilities, etc., sent
1627  * by the MDS as they are applied to our local cache.
1628  */
1629 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1630 {
1631         struct ceph_mds_client *mdsc = session->s_mdsc;
1632         struct ceph_mds_request *req;
1633         struct ceph_mds_reply_head *head = msg->front.iov_base;
1634         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
1635         u64 tid;
1636         int err, result;
1637         int mds;
1638
1639         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1640                 return;
1641         if (msg->front.iov_len < sizeof(*head)) {
1642                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1643                 return;
1644         }
1645
1646         /* get request, session */
1647         tid = le64_to_cpu(head->tid);
1648         mutex_lock(&mdsc->mutex);
1649         req = __lookup_request(mdsc, tid);
1650         if (!req) {
1651                 dout("handle_reply on unknown tid %llu\n", tid);
1652                 mutex_unlock(&mdsc->mutex);
1653                 return;
1654         }
1655         dout("handle_reply %p\n", req);
1656         mds = le64_to_cpu(msg->hdr.src.name.num);
1657
1658         /* correct session? */
1659         if (!req->r_session && req->r_session != session) {
1660                 pr_err("mdsc_handle_reply got %llu on session mds%d"
1661                        " not mds%d\n", tid, session->s_mds,
1662                        req->r_session ? req->r_session->s_mds : -1);
1663                 mutex_unlock(&mdsc->mutex);
1664                 goto out;
1665         }
1666
1667         /* dup? */
1668         if ((req->r_got_unsafe && !head->safe) ||
1669             (req->r_got_safe && head->safe)) {
1670                 pr_warning("got a dup %s reply on %llu from mds%d\n",
1671                            head->safe ? "safe" : "unsafe", tid, mds);
1672                 mutex_unlock(&mdsc->mutex);
1673                 goto out;
1674         }
1675
1676         result = le32_to_cpu(head->result);
1677
1678         /*
1679          * Tolerate 2 consecutive ESTALEs from the same mds.
1680          * FIXME: we should be looking at the cap migrate_seq.
1681          */
1682         if (result == -ESTALE) {
1683                 req->r_direct_mode = USE_AUTH_MDS;
1684                 req->r_num_stale++;
1685                 if (req->r_num_stale <= 2) {
1686                         __do_request(mdsc, req);
1687                         mutex_unlock(&mdsc->mutex);
1688                         goto out;
1689                 }
1690         } else {
1691                 req->r_num_stale = 0;
1692         }
1693
1694         if (head->safe) {
1695                 req->r_got_safe = true;
1696                 __unregister_request(mdsc, req);
1697                 complete(&req->r_safe_completion);
1698
1699                 if (req->r_got_unsafe) {
1700                         /*
1701                          * We already handled the unsafe response, now do the
1702                          * cleanup.  No need to examine the response; the MDS
1703                          * doesn't include any result info in the safe
1704                          * response.  And even if it did, there is nothing
1705                          * useful we could do with a revised return value.
1706                          */
1707                         dout("got safe reply %llu, mds%d\n", tid, mds);
1708                         list_del_init(&req->r_unsafe_item);
1709
1710                         /* last unsafe request during umount? */
1711                         if (mdsc->stopping && !__get_oldest_tid(mdsc))
1712                                 complete(&mdsc->safe_umount_waiters);
1713                         mutex_unlock(&mdsc->mutex);
1714                         goto out;
1715                 }
1716         }
1717
1718         BUG_ON(req->r_reply);
1719
1720         if (!head->safe) {
1721                 req->r_got_unsafe = true;
1722                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1723         }
1724
1725         dout("handle_reply tid %lld result %d\n", tid, result);
1726         rinfo = &req->r_reply_info;
1727         err = parse_reply_info(msg, rinfo);
1728         mutex_unlock(&mdsc->mutex);
1729
1730         mutex_lock(&session->s_mutex);
1731         if (err < 0) {
1732                 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
1733                 goto out_err;
1734         }
1735
1736         /* snap trace */
1737         if (rinfo->snapblob_len) {
1738                 down_write(&mdsc->snap_rwsem);
1739                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
1740                                rinfo->snapblob + rinfo->snapblob_len,
1741                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
1742                 downgrade_write(&mdsc->snap_rwsem);
1743         } else {
1744                 down_read(&mdsc->snap_rwsem);
1745         }
1746
1747         /* insert trace into our cache */
1748         err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
1749         if (err == 0) {
1750                 if (result == 0 && rinfo->dir_nr)
1751                         ceph_readdir_prepopulate(req, req->r_session);
1752                 ceph_unreserve_caps(&req->r_caps_reservation);
1753         }
1754
1755         up_read(&mdsc->snap_rwsem);
1756 out_err:
1757         if (err) {
1758                 req->r_err = err;
1759         } else {
1760                 req->r_reply = msg;
1761                 ceph_msg_get(msg);
1762         }
1763
1764         add_cap_releases(mdsc, req->r_session, -1);
1765         mutex_unlock(&session->s_mutex);
1766
1767         /* kick calling process */
1768         complete_request(mdsc, req);
1769 out:
1770         ceph_mdsc_put_request(req);
1771         return;
1772 }
1773
1774
1775
1776 /*
1777  * handle mds notification that our request has been forwarded.
1778  */
1779 static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
1780 {
1781         struct ceph_mds_request *req;
1782         u64 tid;
1783         u32 next_mds;
1784         u32 fwd_seq;
1785         u8 must_resend;
1786         int err = -EINVAL;
1787         void *p = msg->front.iov_base;
1788         void *end = p + msg->front.iov_len;
1789         int from_mds, state;
1790
1791         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1792                 goto bad;
1793         from_mds = le64_to_cpu(msg->hdr.src.name.num);
1794
1795         ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
1796         tid = ceph_decode_64(&p);
1797         next_mds = ceph_decode_32(&p);
1798         fwd_seq = ceph_decode_32(&p);
1799         must_resend = ceph_decode_8(&p);
1800
1801         WARN_ON(must_resend);  /* shouldn't happen. */
1802
1803         mutex_lock(&mdsc->mutex);
1804         req = __lookup_request(mdsc, tid);
1805         if (!req) {
1806                 dout("forward %llu dne\n", tid);
1807                 goto out;  /* dup reply? */
1808         }
1809
1810         state = mdsc->sessions[next_mds]->s_state;
1811         if (fwd_seq <= req->r_num_fwd) {
1812                 dout("forward %llu to mds%d - old seq %d <= %d\n",
1813                      tid, next_mds, req->r_num_fwd, fwd_seq);
1814         } else {
1815                 /* resend. forward race not possible; mds would drop */
1816                 dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
1817                 req->r_num_fwd = fwd_seq;
1818                 req->r_resend_mds = next_mds;
1819                 put_request_session(req);
1820                 __do_request(mdsc, req);
1821         }
1822         ceph_mdsc_put_request(req);
1823 out:
1824         mutex_unlock(&mdsc->mutex);
1825         return;
1826
1827 bad:
1828         pr_err("mdsc_handle_forward decode error err=%d\n", err);
1829 }
1830
1831 /*
1832  * handle a mds session control message
1833  */
1834 static void handle_session(struct ceph_mds_session *session,
1835                            struct ceph_msg *msg)
1836 {
1837         struct ceph_mds_client *mdsc = session->s_mdsc;
1838         u32 op;
1839         u64 seq;
1840         int mds;
1841         struct ceph_mds_session_head *h = msg->front.iov_base;
1842         int wake = 0;
1843
1844         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1845                 return;
1846         mds = le64_to_cpu(msg->hdr.src.name.num);
1847
1848         /* decode */
1849         if (msg->front.iov_len != sizeof(*h))
1850                 goto bad;
1851         op = le32_to_cpu(h->op);
1852         seq = le64_to_cpu(h->seq);
1853
1854         mutex_lock(&mdsc->mutex);
1855         /* FIXME: this ttl calculation is generous */
1856         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
1857         mutex_unlock(&mdsc->mutex);
1858
1859         mutex_lock(&session->s_mutex);
1860
1861         dout("handle_session mds%d %s %p state %s seq %llu\n",
1862              mds, ceph_session_op_name(op), session,
1863              session_state_name(session->s_state), seq);
1864
1865         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
1866                 session->s_state = CEPH_MDS_SESSION_OPEN;
1867                 pr_info("mds%d came back\n", session->s_mds);
1868         }
1869
1870         switch (op) {
1871         case CEPH_SESSION_OPEN:
1872                 session->s_state = CEPH_MDS_SESSION_OPEN;
1873                 renewed_caps(mdsc, session, 0);
1874                 wake = 1;
1875                 if (mdsc->stopping)
1876                         __close_session(mdsc, session);
1877                 break;
1878
1879         case CEPH_SESSION_RENEWCAPS:
1880                 if (session->s_renew_seq == seq)
1881                         renewed_caps(mdsc, session, 1);
1882                 break;
1883
1884         case CEPH_SESSION_CLOSE:
1885                 unregister_session(mdsc, mds);
1886                 remove_session_caps(session);
1887                 wake = 1; /* for good measure */
1888                 complete(&mdsc->session_close_waiters);
1889                 kick_requests(mdsc, mds, 0);      /* cur only */
1890                 break;
1891
1892         case CEPH_SESSION_STALE:
1893                 pr_info("mds%d caps went stale, renewing\n",
1894                         session->s_mds);
1895                 spin_lock(&session->s_cap_lock);
1896                 session->s_cap_gen++;
1897                 session->s_cap_ttl = 0;
1898                 spin_unlock(&session->s_cap_lock);
1899                 send_renew_caps(mdsc, session);
1900                 break;
1901
1902         case CEPH_SESSION_RECALL_STATE:
1903                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
1904                 break;
1905
1906         default:
1907                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
1908                 WARN_ON(1);
1909         }
1910
1911         mutex_unlock(&session->s_mutex);
1912         if (wake) {
1913                 mutex_lock(&mdsc->mutex);
1914                 __wake_requests(mdsc, &session->s_waiting);
1915                 mutex_unlock(&mdsc->mutex);
1916         }
1917         return;
1918
1919 bad:
1920         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
1921                (int)msg->front.iov_len);
1922         return;
1923 }
1924
1925
1926 /*
1927  * called under session->mutex.
1928  */
1929 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
1930                                    struct ceph_mds_session *session)
1931 {
1932         struct ceph_mds_request *req, *nreq;
1933         int err;
1934
1935         dout("replay_unsafe_requests mds%d\n", session->s_mds);
1936
1937         mutex_lock(&mdsc->mutex);
1938         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
1939                 err = __prepare_send_request(mdsc, req, session->s_mds);
1940                 if (!err) {
1941                         ceph_msg_get(req->r_request);
1942                         ceph_con_send(&session->s_con, req->r_request);
1943                 }
1944         }
1945         mutex_unlock(&mdsc->mutex);
1946 }
1947
1948 /*
1949  * Encode information about a cap for a reconnect with the MDS.
1950  */
1951 struct encode_caps_data {
1952         void **pp;
1953         void *end;
1954         int *num_caps;
1955 };
1956
1957 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
1958                           void *arg)
1959 {
1960         struct ceph_mds_cap_reconnect *rec;
1961         struct ceph_inode_info *ci;
1962         struct encode_caps_data *data = (struct encode_caps_data *)arg;
1963         void *p = *(data->pp);
1964         void *end = data->end;
1965         char *path;
1966         int pathlen, err;
1967         u64 pathbase;
1968         struct dentry *dentry;
1969
1970         ci = cap->ci;
1971
1972         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
1973              inode, ceph_vinop(inode), cap, cap->cap_id,
1974              ceph_cap_string(cap->issued));
1975         ceph_decode_need(&p, end, sizeof(u64), needmore);
1976         ceph_encode_64(&p, ceph_ino(inode));
1977
1978         dentry = d_find_alias(inode);
1979         if (dentry) {
1980                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
1981                 if (IS_ERR(path)) {
1982                         err = PTR_ERR(path);
1983                         BUG_ON(err);
1984                 }
1985         } else {
1986                 path = NULL;
1987                 pathlen = 0;
1988         }
1989         ceph_decode_need(&p, end, pathlen+4, needmore);
1990         ceph_encode_string(&p, end, path, pathlen);
1991
1992         ceph_decode_need(&p, end, sizeof(*rec), needmore);
1993         rec = p;
1994         p += sizeof(*rec);
1995         BUG_ON(p > end);
1996         spin_lock(&inode->i_lock);
1997         cap->seq = 0;        /* reset cap seq */
1998         cap->issue_seq = 0;  /* and issue_seq */
1999         rec->cap_id = cpu_to_le64(cap->cap_id);
2000         rec->pathbase = cpu_to_le64(pathbase);
2001         rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2002         rec->issued = cpu_to_le32(cap->issued);
2003         rec->size = cpu_to_le64(inode->i_size);
2004         ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
2005         ceph_encode_timespec(&rec->atime, &inode->i_atime);
2006         rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2007         spin_unlock(&inode->i_lock);
2008
2009         kfree(path);
2010         dput(dentry);
2011         (*data->num_caps)++;
2012         *(data->pp) = p;
2013         return 0;
2014 needmore:
2015         return -ENOSPC;
2016 }
2017
2018
2019 /*
2020  * If an MDS fails and recovers, clients need to reconnect in order to
2021  * reestablish shared state.  This includes all caps issued through
2022  * this session _and_ the snap_realm hierarchy.  Because it's not
2023  * clear which snap realms the mds cares about, we send everything we
2024  * know about.. that ensures we'll then get any new info the
2025  * recovering MDS might have.
2026  *
2027  * This is a relatively heavyweight operation, but it's rare.
2028  *
2029  * called with mdsc->mutex held.
2030  */
2031 static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2032 {
2033         struct ceph_mds_session *session;
2034         struct ceph_msg *reply;
2035         int newlen, len = 4 + 1;
2036         void *p, *end;
2037         int err;
2038         int num_caps, num_realms = 0;
2039         int got;
2040         u64 next_snap_ino = 0;
2041         __le32 *pnum_caps, *pnum_realms;
2042         struct encode_caps_data iter_args;
2043
2044         pr_info("reconnect to recovering mds%d\n", mds);
2045
2046         /* find session */
2047         session = __ceph_lookup_mds_session(mdsc, mds);
2048         mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
2049
2050         if (session) {
2051                 mutex_lock(&session->s_mutex);
2052
2053                 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2054                 session->s_seq = 0;
2055                 session->s_recon_gen++;
2056
2057                 ceph_con_open(&session->s_con,
2058                               ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2059
2060                 /* replay unsafe requests */
2061                 replay_unsafe_requests(mdsc, session);
2062
2063                 /* estimate needed space */
2064                 len += session->s_nr_caps *
2065                         (100+sizeof(struct ceph_mds_cap_reconnect));
2066                 pr_info("estimating i need %d bytes for %d caps\n",
2067                      len, session->s_nr_caps);
2068         } else {
2069                 dout("no session for mds%d, will send short reconnect\n",
2070                      mds);
2071         }
2072
2073         down_read(&mdsc->snap_rwsem);
2074
2075 retry:
2076         /* build reply */
2077         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
2078         if (IS_ERR(reply)) {
2079                 err = PTR_ERR(reply);
2080                 pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
2081                        len, mds);
2082                 goto out;
2083         }
2084         p = reply->front.iov_base;
2085         end = p + len;
2086
2087         if (!session) {
2088                 ceph_encode_8(&p, 1); /* session was closed */
2089                 ceph_encode_32(&p, 0);
2090                 goto send;
2091         }
2092         dout("session %p state %s\n", session,
2093              session_state_name(session->s_state));
2094
2095         /* traverse this session's caps */
2096         ceph_encode_8(&p, 0);
2097         pnum_caps = p;
2098         ceph_encode_32(&p, session->s_nr_caps);
2099         num_caps = 0;
2100
2101         iter_args.pp = &p;
2102         iter_args.end = end;
2103         iter_args.num_caps = &num_caps;
2104         err = iterate_session_caps(session, encode_caps_cb, &iter_args);
2105         if (err == -ENOSPC)
2106                 goto needmore;
2107         if (err < 0)
2108                 goto out;
2109         *pnum_caps = cpu_to_le32(num_caps);
2110
2111         /*
2112          * snaprealms.  we provide mds with the ino, seq (version), and
2113          * parent for all of our realms.  If the mds has any newer info,
2114          * it will tell us.
2115          */
2116         next_snap_ino = 0;
2117         /* save some space for the snaprealm count */
2118         pnum_realms = p;
2119         ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
2120         p += sizeof(*pnum_realms);
2121         num_realms = 0;
2122         while (1) {
2123                 struct ceph_snap_realm *realm;
2124                 struct ceph_mds_snaprealm_reconnect *sr_rec;
2125                 got = radix_tree_gang_lookup(&mdsc->snap_realms,
2126                                              (void **)&realm, next_snap_ino, 1);
2127                 if (!got)
2128                         break;
2129
2130                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2131                      realm->ino, realm->seq, realm->parent_ino);
2132                 ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
2133                 sr_rec = p;
2134                 sr_rec->ino = cpu_to_le64(realm->ino);
2135                 sr_rec->seq = cpu_to_le64(realm->seq);
2136                 sr_rec->parent = cpu_to_le64(realm->parent_ino);
2137                 p += sizeof(*sr_rec);
2138                 num_realms++;
2139                 next_snap_ino = realm->ino + 1;
2140         }
2141         *pnum_realms = cpu_to_le32(num_realms);
2142
2143 send:
2144         reply->front.iov_len = p - reply->front.iov_base;
2145         reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
2146         dout("final len was %u (guessed %d)\n",
2147              (unsigned)reply->front.iov_len, len);
2148         ceph_con_send(&session->s_con, reply);
2149
2150         if (session) {
2151                 session->s_state = CEPH_MDS_SESSION_OPEN;
2152                 __wake_requests(mdsc, &session->s_waiting);
2153         }
2154
2155 out:
2156         up_read(&mdsc->snap_rwsem);
2157         if (session) {
2158                 mutex_unlock(&session->s_mutex);
2159                 ceph_put_mds_session(session);
2160         }
2161         mutex_lock(&mdsc->mutex);
2162         return;
2163
2164 needmore:
2165         /*
2166          * we need a larger buffer.  this doesn't very accurately
2167          * factor in snap realms, but it's safe.
2168          */
2169         num_caps += num_realms;
2170         newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
2171         pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
2172              len, num_caps, session->s_nr_caps, newlen);
2173         len = newlen;
2174         ceph_msg_put(reply);
2175         goto retry;
2176 }
2177
2178
2179 /*
2180  * compare old and new mdsmaps, kicking requests
2181  * and closing out old connections as necessary
2182  *
2183  * called under mdsc->mutex.
2184  */
2185 static void check_new_map(struct ceph_mds_client *mdsc,
2186                           struct ceph_mdsmap *newmap,
2187                           struct ceph_mdsmap *oldmap)
2188 {
2189         int i;
2190         int oldstate, newstate;
2191         struct ceph_mds_session *s;
2192
2193         dout("check_new_map new %u old %u\n",
2194              newmap->m_epoch, oldmap->m_epoch);
2195
2196         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2197                 if (mdsc->sessions[i] == NULL)
2198                         continue;
2199                 s = mdsc->sessions[i];
2200                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2201                 newstate = ceph_mdsmap_get_state(newmap, i);
2202
2203                 dout("check_new_map mds%d state %s -> %s (session %s)\n",
2204                      i, ceph_mds_state_name(oldstate),
2205                      ceph_mds_state_name(newstate),
2206                      session_state_name(s->s_state));
2207
2208                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2209                            ceph_mdsmap_get_addr(newmap, i),
2210                            sizeof(struct ceph_entity_addr))) {
2211                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2212                                 /* the session never opened, just close it
2213                                  * out now */
2214                                 __wake_requests(mdsc, &s->s_waiting);
2215                                 unregister_session(mdsc, i);
2216                         } else {
2217                                 /* just close it */
2218                                 mutex_unlock(&mdsc->mutex);
2219                                 mutex_lock(&s->s_mutex);
2220                                 mutex_lock(&mdsc->mutex);
2221                                 ceph_con_close(&s->s_con);
2222                                 mutex_unlock(&s->s_mutex);
2223                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2224                         }
2225
2226                         /* kick any requests waiting on the recovering mds */
2227                         kick_requests(mdsc, i, 1);
2228                 } else if (oldstate == newstate) {
2229                         continue;  /* nothing new with this mds */
2230                 }
2231
2232                 /*
2233                  * send reconnect?
2234                  */
2235                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2236                     newstate >= CEPH_MDS_STATE_RECONNECT)
2237                         send_mds_reconnect(mdsc, i);
2238
2239                 /*
2240                  * kick requests on any mds that has gone active.
2241                  *
2242                  * kick requests on cur or forwarder: we may have sent
2243                  * the request to mds1, mds1 told us it forwarded it
2244                  * to mds2, but then we learn mds1 failed and can't be
2245                  * sure it successfully forwarded our request before
2246                  * it died.
2247                  */
2248                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2249                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2250                         kick_requests(mdsc, i, 1);
2251                         ceph_kick_flushing_caps(mdsc, s);
2252                 }
2253         }
2254 }
2255
2256
2257
2258 /*
2259  * leases
2260  */
2261
2262 /*
2263  * caller must hold session s_mutex, dentry->d_lock
2264  */
2265 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2266 {
2267         struct ceph_dentry_info *di = ceph_dentry(dentry);
2268
2269         ceph_put_mds_session(di->lease_session);
2270         di->lease_session = NULL;
2271 }
2272
2273 static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2274 {
2275         struct super_block *sb = mdsc->client->sb;
2276         struct inode *inode;
2277         struct ceph_mds_session *session;
2278         struct ceph_inode_info *ci;
2279         struct dentry *parent, *dentry;
2280         struct ceph_dentry_info *di;
2281         int mds;
2282         struct ceph_mds_lease *h = msg->front.iov_base;
2283         struct ceph_vino vino;
2284         int mask;
2285         struct qstr dname;
2286         int release = 0;
2287
2288         if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
2289                 return;
2290         mds = le64_to_cpu(msg->hdr.src.name.num);
2291         dout("handle_lease from mds%d\n", mds);
2292
2293         /* decode */
2294         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2295                 goto bad;
2296         vino.ino = le64_to_cpu(h->ino);
2297         vino.snap = CEPH_NOSNAP;
2298         mask = le16_to_cpu(h->mask);
2299         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2300         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2301         if (dname.len != get_unaligned_le32(h+1))
2302                 goto bad;
2303
2304         /* find session */
2305         mutex_lock(&mdsc->mutex);
2306         session = __ceph_lookup_mds_session(mdsc, mds);
2307         mutex_unlock(&mdsc->mutex);
2308         if (!session) {
2309                 pr_err("handle_lease got lease but no session mds%d\n", mds);
2310                 return;
2311         }
2312
2313         mutex_lock(&session->s_mutex);
2314         session->s_seq++;
2315
2316         /* lookup inode */
2317         inode = ceph_find_inode(sb, vino);
2318         dout("handle_lease '%s', mask %d, ino %llx %p\n",
2319              ceph_lease_op_name(h->action), mask, vino.ino, inode);
2320         if (inode == NULL) {
2321                 dout("handle_lease no inode %llx\n", vino.ino);
2322                 goto release;
2323         }
2324         ci = ceph_inode(inode);
2325
2326         /* dentry */
2327         parent = d_find_alias(inode);
2328         if (!parent) {
2329                 dout("no parent dentry on inode %p\n", inode);
2330                 WARN_ON(1);
2331                 goto release;  /* hrm... */
2332         }
2333         dname.hash = full_name_hash(dname.name, dname.len);
2334         dentry = d_lookup(parent, &dname);
2335         dput(parent);
2336         if (!dentry)
2337                 goto release;
2338
2339         spin_lock(&dentry->d_lock);
2340         di = ceph_dentry(dentry);
2341         switch (h->action) {
2342         case CEPH_MDS_LEASE_REVOKE:
2343                 if (di && di->lease_session == session) {
2344                         h->seq = cpu_to_le32(di->lease_seq);
2345                         __ceph_mdsc_drop_dentry_lease(dentry);
2346                 }
2347                 release = 1;
2348                 break;
2349
2350         case CEPH_MDS_LEASE_RENEW:
2351                 if (di && di->lease_session == session &&
2352                     di->lease_gen == session->s_cap_gen &&
2353                     di->lease_renew_from &&
2354                     di->lease_renew_after == 0) {
2355                         unsigned long duration =
2356                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2357
2358                         di->lease_seq = le32_to_cpu(h->seq);
2359                         dentry->d_time = di->lease_renew_from + duration;
2360                         di->lease_renew_after = di->lease_renew_from +
2361                                 (duration >> 1);
2362                         di->lease_renew_from = 0;
2363                 }
2364                 break;
2365         }
2366         spin_unlock(&dentry->d_lock);
2367         dput(dentry);
2368
2369         if (!release)
2370                 goto out;
2371
2372 release:
2373         /* let's just reuse the same message */
2374         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2375         ceph_msg_get(msg);
2376         ceph_con_send(&session->s_con, msg);
2377
2378 out:
2379         iput(inode);
2380         mutex_unlock(&session->s_mutex);
2381         ceph_put_mds_session(session);
2382         return;
2383
2384 bad:
2385         pr_err("corrupt lease message\n");
2386 }
2387
2388 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2389                               struct inode *inode,
2390                               struct dentry *dentry, char action,
2391                               u32 seq)
2392 {
2393         struct ceph_msg *msg;
2394         struct ceph_mds_lease *lease;
2395         int len = sizeof(*lease) + sizeof(u32);
2396         int dnamelen = 0;
2397
2398         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2399              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2400         dnamelen = dentry->d_name.len;
2401         len += dnamelen;
2402
2403         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
2404         if (IS_ERR(msg))
2405                 return;
2406         lease = msg->front.iov_base;
2407         lease->action = action;
2408         lease->mask = cpu_to_le16(CEPH_LOCK_DN);
2409         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2410         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2411         lease->seq = cpu_to_le32(seq);
2412         put_unaligned_le32(dnamelen, lease + 1);
2413         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2414
2415         /*
2416          * if this is a preemptive lease RELEASE, no need to
2417          * flush request stream, since the actual request will
2418          * soon follow.
2419          */
2420         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2421
2422         ceph_con_send(&session->s_con, msg);
2423 }
2424
2425 /*
2426  * Preemptively release a lease we expect to invalidate anyway.
2427  * Pass @inode always, @dentry is optional.
2428  */
2429 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2430                              struct dentry *dentry, int mask)
2431 {
2432         struct ceph_dentry_info *di;
2433         struct ceph_mds_session *session;
2434         u32 seq;
2435
2436         BUG_ON(inode == NULL);
2437         BUG_ON(dentry == NULL);
2438         BUG_ON(mask != CEPH_LOCK_DN);
2439
2440         /* is dentry lease valid? */
2441         spin_lock(&dentry->d_lock);
2442         di = ceph_dentry(dentry);
2443         if (!di || !di->lease_session ||
2444             di->lease_session->s_mds < 0 ||
2445             di->lease_gen != di->lease_session->s_cap_gen ||
2446             !time_before(jiffies, dentry->d_time)) {
2447                 dout("lease_release inode %p dentry %p -- "
2448                      "no lease on %d\n",
2449                      inode, dentry, mask);
2450                 spin_unlock(&dentry->d_lock);
2451                 return;
2452         }
2453
2454         /* we do have a lease on this dentry; note mds and seq */
2455         session = ceph_get_mds_session(di->lease_session);
2456         seq = di->lease_seq;
2457         __ceph_mdsc_drop_dentry_lease(dentry);
2458         spin_unlock(&dentry->d_lock);
2459
2460         dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2461              inode, dentry, mask, session->s_mds);
2462         ceph_mdsc_lease_send_msg(session, inode, dentry,
2463                                  CEPH_MDS_LEASE_RELEASE, seq);
2464         ceph_put_mds_session(session);
2465 }
2466
2467 /*
2468  * drop all leases (and dentry refs) in preparation for umount
2469  */
2470 static void drop_leases(struct ceph_mds_client *mdsc)
2471 {
2472         int i;
2473
2474         dout("drop_leases\n");
2475         mutex_lock(&mdsc->mutex);
2476         for (i = 0; i < mdsc->max_sessions; i++) {
2477                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2478                 if (!s)
2479                         continue;
2480                 mutex_unlock(&mdsc->mutex);
2481                 mutex_lock(&s->s_mutex);
2482                 mutex_unlock(&s->s_mutex);
2483                 ceph_put_mds_session(s);
2484                 mutex_lock(&mdsc->mutex);
2485         }
2486         mutex_unlock(&mdsc->mutex);
2487 }
2488
2489
2490
2491 /*
2492  * delayed work -- periodically trim expired leases, renew caps with mds
2493  */
2494 static void schedule_delayed(struct ceph_mds_client *mdsc)
2495 {
2496         int delay = 5;
2497         unsigned hz = round_jiffies_relative(HZ * delay);
2498         schedule_delayed_work(&mdsc->delayed_work, hz);
2499 }
2500
2501 static void delayed_work(struct work_struct *work)
2502 {
2503         int i;
2504         struct ceph_mds_client *mdsc =
2505                 container_of(work, struct ceph_mds_client, delayed_work.work);
2506         int renew_interval;
2507         int renew_caps;
2508
2509         dout("mdsc delayed_work\n");
2510         ceph_check_delayed_caps(mdsc);
2511
2512         mutex_lock(&mdsc->mutex);
2513         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2514         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2515                                    mdsc->last_renew_caps);
2516         if (renew_caps)
2517                 mdsc->last_renew_caps = jiffies;
2518
2519         for (i = 0; i < mdsc->max_sessions; i++) {
2520                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2521                 if (s == NULL)
2522                         continue;
2523                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2524                         dout("resending session close request for mds%d\n",
2525                              s->s_mds);
2526                         request_close_session(mdsc, s);
2527                         ceph_put_mds_session(s);
2528                         continue;
2529                 }
2530                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2531                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2532                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2533                                 pr_info("mds%d hung\n", s->s_mds);
2534                         }
2535                 }
2536                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2537                         /* this mds is failed or recovering, just wait */
2538                         ceph_put_mds_session(s);
2539                         continue;
2540                 }
2541                 mutex_unlock(&mdsc->mutex);
2542
2543                 mutex_lock(&s->s_mutex);
2544                 if (renew_caps)
2545                         send_renew_caps(mdsc, s);
2546                 else
2547                         ceph_con_keepalive(&s->s_con);
2548                 add_cap_releases(mdsc, s, -1);
2549                 send_cap_releases(mdsc, s);
2550                 mutex_unlock(&s->s_mutex);
2551                 ceph_put_mds_session(s);
2552
2553                 mutex_lock(&mdsc->mutex);
2554         }
2555         mutex_unlock(&mdsc->mutex);
2556
2557         schedule_delayed(mdsc);
2558 }
2559
2560
2561 void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2562 {
2563         mdsc->client = client;
2564         mutex_init(&mdsc->mutex);
2565         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2566         init_completion(&mdsc->safe_umount_waiters);
2567         init_completion(&mdsc->session_close_waiters);
2568         INIT_LIST_HEAD(&mdsc->waiting_for_map);
2569         mdsc->sessions = NULL;
2570         mdsc->max_sessions = 0;
2571         mdsc->stopping = 0;
2572         init_rwsem(&mdsc->snap_rwsem);
2573         INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
2574         INIT_LIST_HEAD(&mdsc->snap_empty);
2575         spin_lock_init(&mdsc->snap_empty_lock);
2576         mdsc->last_tid = 0;
2577         INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
2578         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2579         mdsc->last_renew_caps = jiffies;
2580         INIT_LIST_HEAD(&mdsc->cap_delay_list);
2581         spin_lock_init(&mdsc->cap_delay_lock);
2582         INIT_LIST_HEAD(&mdsc->snap_flush_list);
2583         spin_lock_init(&mdsc->snap_flush_lock);
2584         mdsc->cap_flush_seq = 0;
2585         INIT_LIST_HEAD(&mdsc->cap_dirty);
2586         mdsc->num_cap_flushing = 0;
2587         spin_lock_init(&mdsc->cap_dirty_lock);
2588         init_waitqueue_head(&mdsc->cap_flushing_wq);
2589         spin_lock_init(&mdsc->dentry_lru_lock);
2590         INIT_LIST_HEAD(&mdsc->dentry_lru);
2591 }
2592
2593 /*
2594  * Wait for safe replies on open mds requests.  If we time out, drop
2595  * all requests from the tree to avoid dangling dentry refs.
2596  */
2597 static void wait_requests(struct ceph_mds_client *mdsc)
2598 {
2599         struct ceph_mds_request *req;
2600         struct ceph_client *client = mdsc->client;
2601
2602         mutex_lock(&mdsc->mutex);
2603         if (__get_oldest_tid(mdsc)) {
2604                 mutex_unlock(&mdsc->mutex);
2605                 dout("wait_requests waiting for requests\n");
2606                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2607                                     client->mount_args->mount_timeout * HZ);
2608                 mutex_lock(&mdsc->mutex);
2609
2610                 /* tear down remaining requests */
2611                 while (radix_tree_gang_lookup(&mdsc->request_tree,
2612                                               (void **)&req, 0, 1)) {
2613                         dout("wait_requests timed out on tid %llu\n",
2614                              req->r_tid);
2615                         radix_tree_delete(&mdsc->request_tree, req->r_tid);
2616                         ceph_mdsc_put_request(req);
2617                 }
2618         }
2619         mutex_unlock(&mdsc->mutex);
2620         dout("wait_requests done\n");
2621 }
2622
2623 /*
2624  * called before mount is ro, and before dentries are torn down.
2625  * (hmm, does this still race with new lookups?)
2626  */
2627 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2628 {
2629         dout("pre_umount\n");
2630         mdsc->stopping = 1;
2631
2632         drop_leases(mdsc);
2633         ceph_flush_dirty_caps(mdsc);
2634         wait_requests(mdsc);
2635 }
2636
2637 /*
2638  * wait for all write mds requests to flush.
2639  */
2640 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2641 {
2642         struct ceph_mds_request *req;
2643         u64 next_tid = 0;
2644         int got;
2645
2646         mutex_lock(&mdsc->mutex);
2647         dout("wait_unsafe_requests want %lld\n", want_tid);
2648         while (1) {
2649                 got = radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
2650                                              next_tid, 1);
2651                 if (!got)
2652                         break;
2653                 if (req->r_tid > want_tid)
2654                         break;
2655
2656                 next_tid = req->r_tid + 1;
2657                 if ((req->r_op & CEPH_MDS_OP_WRITE) == 0)
2658                         continue;  /* not a write op */
2659
2660                 ceph_mdsc_get_request(req);
2661                 mutex_unlock(&mdsc->mutex);
2662                 dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
2663                      req->r_tid, want_tid);
2664                 wait_for_completion(&req->r_safe_completion);
2665                 mutex_lock(&mdsc->mutex);
2666                 ceph_mdsc_put_request(req);
2667         }
2668         mutex_unlock(&mdsc->mutex);
2669         dout("wait_unsafe_requests done\n");
2670 }
2671
2672 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2673 {
2674         u64 want_tid, want_flush;
2675
2676         dout("sync\n");
2677         mutex_lock(&mdsc->mutex);
2678         want_tid = mdsc->last_tid;
2679         want_flush = mdsc->cap_flush_seq;
2680         mutex_unlock(&mdsc->mutex);
2681         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
2682
2683         ceph_flush_dirty_caps(mdsc);
2684
2685         wait_unsafe_requests(mdsc, want_tid);
2686         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
2687 }
2688
2689
2690 /*
2691  * called after sb is ro.
2692  */
2693 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2694 {
2695         struct ceph_mds_session *session;
2696         int i;
2697         int n;
2698         struct ceph_client *client = mdsc->client;
2699         unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
2700
2701         dout("close_sessions\n");
2702
2703         mutex_lock(&mdsc->mutex);
2704
2705         /* close sessions */
2706         started = jiffies;
2707         while (time_before(jiffies, started + timeout)) {
2708                 dout("closing sessions\n");
2709                 n = 0;
2710                 for (i = 0; i < mdsc->max_sessions; i++) {
2711                         session = __ceph_lookup_mds_session(mdsc, i);
2712                         if (!session)
2713                                 continue;
2714                         mutex_unlock(&mdsc->mutex);
2715                         mutex_lock(&session->s_mutex);
2716                         __close_session(mdsc, session);
2717                         mutex_unlock(&session->s_mutex);
2718                         ceph_put_mds_session(session);
2719                         mutex_lock(&mdsc->mutex);
2720                         n++;
2721                 }
2722                 if (n == 0)
2723                         break;
2724
2725                 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
2726                         break;
2727
2728                 dout("waiting for sessions to close\n");
2729                 mutex_unlock(&mdsc->mutex);
2730                 wait_for_completion_timeout(&mdsc->session_close_waiters,
2731                                             timeout);
2732                 mutex_lock(&mdsc->mutex);
2733         }
2734
2735         /* tear down remaining sessions */
2736         for (i = 0; i < mdsc->max_sessions; i++) {
2737                 if (mdsc->sessions[i]) {
2738                         session = get_session(mdsc->sessions[i]);
2739                         unregister_session(mdsc, i);
2740                         mutex_unlock(&mdsc->mutex);
2741                         mutex_lock(&session->s_mutex);
2742                         remove_session_caps(session);
2743                         mutex_unlock(&session->s_mutex);
2744                         ceph_put_mds_session(session);
2745                         mutex_lock(&mdsc->mutex);
2746                 }
2747         }
2748
2749         WARN_ON(!list_empty(&mdsc->cap_delay_list));
2750
2751         mutex_unlock(&mdsc->mutex);
2752
2753         ceph_cleanup_empty_realms(mdsc);
2754
2755         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2756
2757         dout("stopped\n");
2758 }
2759
2760 void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2761 {
2762         dout("stop\n");
2763         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2764         if (mdsc->mdsmap)
2765                 ceph_mdsmap_destroy(mdsc->mdsmap);
2766         kfree(mdsc->sessions);
2767 }
2768
2769
2770 /*
2771  * handle mds map update.
2772  */
2773 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2774 {
2775         u32 epoch;
2776         u32 maplen;
2777         void *p = msg->front.iov_base;
2778         void *end = p + msg->front.iov_len;
2779         struct ceph_mdsmap *newmap, *oldmap;
2780         struct ceph_fsid fsid;
2781         int err = -EINVAL;
2782
2783         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
2784         ceph_decode_copy(&p, &fsid, sizeof(fsid));
2785         if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
2786                 pr_err("got mdsmap with wrong fsid\n");
2787                 return;
2788         }
2789         epoch = ceph_decode_32(&p);
2790         maplen = ceph_decode_32(&p);
2791         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
2792
2793         /* do we need it? */
2794         ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
2795         mutex_lock(&mdsc->mutex);
2796         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
2797                 dout("handle_map epoch %u <= our %u\n",
2798                      epoch, mdsc->mdsmap->m_epoch);
2799                 mutex_unlock(&mdsc->mutex);
2800                 return;
2801         }
2802
2803         newmap = ceph_mdsmap_decode(&p, end);
2804         if (IS_ERR(newmap)) {
2805                 err = PTR_ERR(newmap);
2806                 goto bad_unlock;
2807         }
2808
2809         /* swap into place */
2810         if (mdsc->mdsmap) {
2811                 oldmap = mdsc->mdsmap;
2812                 mdsc->mdsmap = newmap;
2813                 check_new_map(mdsc, newmap, oldmap);
2814                 ceph_mdsmap_destroy(oldmap);
2815         } else {
2816                 mdsc->mdsmap = newmap;  /* first mds map */
2817         }
2818         mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
2819
2820         __wake_requests(mdsc, &mdsc->waiting_for_map);
2821
2822         mutex_unlock(&mdsc->mutex);
2823         schedule_delayed(mdsc);
2824         return;
2825
2826 bad_unlock:
2827         mutex_unlock(&mdsc->mutex);
2828 bad:
2829         pr_err("error decoding mdsmap %d\n", err);
2830         return;
2831 }
2832
2833 static struct ceph_connection *con_get(struct ceph_connection *con)
2834 {
2835         struct ceph_mds_session *s = con->private;
2836
2837         if (get_session(s)) {
2838                 dout("mdsc con_get %p %d -> %d\n", s,
2839                      atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
2840                 return con;
2841         }
2842         dout("mdsc con_get %p FAIL\n", s);
2843         return NULL;
2844 }
2845
2846 static void con_put(struct ceph_connection *con)
2847 {
2848         struct ceph_mds_session *s = con->private;
2849
2850         dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
2851              atomic_read(&s->s_ref) - 1);
2852         ceph_put_mds_session(s);
2853 }
2854
2855 /*
2856  * if the client is unresponsive for long enough, the mds will kill
2857  * the session entirely.
2858  */
2859 static void peer_reset(struct ceph_connection *con)
2860 {
2861         struct ceph_mds_session *s = con->private;
2862
2863         pr_err("mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
2864                s->s_mds);
2865 }
2866
2867 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2868 {
2869         struct ceph_mds_session *s = con->private;
2870         struct ceph_mds_client *mdsc = s->s_mdsc;
2871         int type = le16_to_cpu(msg->hdr.type);
2872
2873         switch (type) {
2874         case CEPH_MSG_MDS_MAP:
2875                 ceph_mdsc_handle_map(mdsc, msg);
2876                 break;
2877         case CEPH_MSG_CLIENT_SESSION:
2878                 handle_session(s, msg);
2879                 break;
2880         case CEPH_MSG_CLIENT_REPLY:
2881                 handle_reply(s, msg);
2882                 break;
2883         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2884                 handle_forward(mdsc, msg);
2885                 break;
2886         case CEPH_MSG_CLIENT_CAPS:
2887                 ceph_handle_caps(s, msg);
2888                 break;
2889         case CEPH_MSG_CLIENT_SNAP:
2890                 ceph_handle_snap(mdsc, msg);
2891                 break;
2892         case CEPH_MSG_CLIENT_LEASE:
2893                 handle_lease(mdsc, msg);
2894                 break;
2895
2896         default:
2897                 pr_err("received unknown message type %d %s\n", type,
2898                        ceph_msg_type_name(type));
2899         }
2900         ceph_msg_put(msg);
2901 }
2902
2903 const static struct ceph_connection_operations mds_con_ops = {
2904         .get = con_get,
2905         .put = con_put,
2906         .dispatch = dispatch,
2907         .peer_reset = peer_reset,
2908         .alloc_msg = ceph_alloc_msg,
2909         .alloc_middle = ceph_alloc_middle,
2910 };
2911
2912
2913
2914
2915 /* eof */