ceph: send cap releases more aggressively

When the pending cap releases fill up one message, queue a work item to
send the cap release message immediately. Previously, cap releases were
only sent every 5 seconds by the delayed work.
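
The batching idea is roughly the pattern sketched below; this is only an
illustrative user-space sketch with hypothetical names (release_batch,
ITEMS_PER_MESSAGE), not the kernel code in this patch:

	#include <stddef.h>

	#define ITEMS_PER_MESSAGE 128   /* assumed per-message capacity */

	struct release_batch {
		size_t pending;                         /* queued releases    */
		void (*flush)(struct release_batch *b); /* sends one message  */
	};

	static void queue_release(struct release_batch *b)
	{
		b->pending++;
		/* Kick a flush as soon as one message is full, instead of
		 * waiting for a periodic timer to fire. */
		if (b->pending % ITEMS_PER_MESSAGE == 0)
			b->flush(b);
	}

In the patch itself the same check is done in __ceph_queue_cap_release()
against CEPH_CAPS_PER_RELEASE, and the flush is deferred to a workqueue so
it can take session->s_mutex.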

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index ddfb6a4..c9d45613 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -57,6 +57,7 @@
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -636,6 +637,8 @@
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
+	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -661,6 +664,7 @@
 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 	BUG_ON(mdsc->sessions[s->s_mds] != s);
 	mdsc->sessions[s->s_mds] = NULL;
+	s->s_state = 0;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
 	atomic_dec(&mdsc->num_sessions);
@@ -1323,13 +1327,10 @@
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			if (cap->queue_release) {
-				list_add_tail(&cap->session_caps,
-					      &session->s_cap_releases);
-				session->s_num_cap_releases++;
-			} else {
+			if (cap->queue_release)
+				__ceph_queue_cap_release(session, cap);
+			else
 				old_cap = cap;  /* put_cap it w/o locks held */
-			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1764,7 +1765,7 @@
 		session->s_trim_caps = 0;
 	}
 
-	ceph_send_cap_releases(mdsc, session);
+	ceph_flush_cap_releases(mdsc, session);
 	return 0;
 }
 
@@ -1807,8 +1808,8 @@
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-			    struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
@@ -1900,6 +1901,48 @@
 	spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *session =
+		container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&session->s_mutex);
+	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+	    session->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(session->s_mdsc, session);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+		             struct ceph_mds_session *session)
+{
+	if (mdsc->stopping)
+		return;
+
+	get_session(session);
+	if (queue_work(mdsc->fsc->cap_wq,
+		       &session->s_cap_release_work)) {
+		dout("cap release work queued\n");
+	} else {
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+	}
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
 /*
  * requests
  */