ceph: fix debugfs entry, simplify fsid checks
[linux-3.10.git] / fs / ceph / osd_client.c
index 0a25405..d63f192 100644 (file)
@@ -11,6 +11,7 @@
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
+#include "auth.h"
 
 const static struct ceph_connection_operations osd_con_ops;
 
@@ -331,6 +332,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        osd->o_con.private = osd;
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+
        return osd;
 }
 
@@ -350,10 +352,8 @@ static void put_osd(struct ceph_osd *osd)
 {
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
-               ceph_con_shutdown(&osd->o_con);
+       if (atomic_dec_and_test(&osd->o_ref))
                kfree(osd);
-       }
 }
 
 /*
@@ -444,7 +444,7 @@ static void register_request(struct ceph_osd_client *osdc,
        osdc->num_requests++;
 
        req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args.osd_timeout*HZ;
+               jiffies + osdc->client->mount_args->osd_timeout*HZ;
 
        if (osdc->num_requests == 1) {
                osdc->timeout_tid = req->r_tid;
@@ -520,7 +520,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
                      struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-       union ceph_pg pgid;
+       struct ceph_pg pgid;
        int o = -1;
        int err;
        struct ceph_osd *newosd = NULL;
@@ -530,7 +530,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
                                      &req->r_file_layout, osdc->osdmap);
        if (err)
                return err;
-       pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
+       pgid = reqhead->layout.ol_pgid;
        o = ceph_calc_pg_primary(osdc->osdmap, pgid);
 
        if ((req->r_osd && req->r_osd->o_osd == o &&
@@ -538,8 +538,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
-       dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
-            req->r_tid, pgid.pg64, pgid.pg.pool, o,
+       dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+            req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
        if (req->r_osd) {
@@ -609,7 +609,7 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
+       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -632,7 +632,7 @@ static void handle_timeout(struct work_struct *work)
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
-       unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+       unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
        unsigned long next_timeout = timeout + jiffies;
        struct rb_node *p;
 
@@ -882,10 +882,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        /* verify fsid */
        ceph_decode_need(&p, end, sizeof(fsid), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
-       if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
-               pr_err("got osdmap with wrong fsid, ignoring\n");
+       if (ceph_check_fsid(osdc->client, &fsid) < 0)
                return;
-       }
 
        down_write(&osdc->map_sem);
 
@@ -1129,19 +1127,26 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 
+       err = -ENOMEM;
        osdc->req_mempool = mempool_create_kmalloc_pool(10,
                                        sizeof(struct ceph_osd_request));
        if (!osdc->req_mempool)
-               return -ENOMEM;
+               goto out;
 
        err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
        if (err < 0)
-               return -ENOMEM;
+               goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
        if (err < 0)
-               return -ENOMEM;
-
+               goto out_msgpool;
        return 0;
+
+out_msgpool:
+       ceph_msgpool_destroy(&osdc->msgpool_op);
+out_mempool:
+       mempool_destroy(osdc->req_mempool);
+out:
+       return err;
 }
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
@@ -1297,10 +1302,59 @@ static void put_osd_con(struct ceph_connection *con)
        put_osd(osd);
 }
 
+/*
+ * authentication
+ */
+static int get_authorizer(struct ceph_connection *con,
+                          void **buf, int *len, int *proto,
+                          void **reply_buf, int *reply_len, int force_new)
+{
+       struct ceph_osd *o = con->private;
+       struct ceph_osd_client *osdc = o->o_osdc;
+       struct ceph_auth_client *ac = osdc->client->monc.auth;
+       int ret = 0;
+
+       if (force_new && o->o_authorizer) {
+               ac->ops->destroy_authorizer(ac, o->o_authorizer);
+               o->o_authorizer = NULL;
+       }
+       if (o->o_authorizer == NULL) {
+               ret = ac->ops->create_authorizer(
+                       ac, CEPH_ENTITY_TYPE_OSD,
+                       &o->o_authorizer,
+                       &o->o_authorizer_buf,
+                       &o->o_authorizer_buf_len,
+                       &o->o_authorizer_reply_buf,
+                       &o->o_authorizer_reply_buf_len);
+               if (ret)
+               return ret;
+       }
+
+       *proto = ac->protocol;
+       *buf = o->o_authorizer_buf;
+       *len = o->o_authorizer_buf_len;
+       *reply_buf = o->o_authorizer_reply_buf;
+       *reply_len = o->o_authorizer_reply_buf_len;
+       return 0;
+}
+
+
+static int verify_authorizer_reply(struct ceph_connection *con, int len)
+{
+       struct ceph_osd *o = con->private;
+       struct ceph_osd_client *osdc = o->o_osdc;
+       struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+       return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+}
+
+
 const static struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
+       .get_authorizer = get_authorizer,
+       .verify_authorizer_reply = verify_authorizer_reply,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
        .alloc_middle = ceph_alloc_middle,