Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2
author Linus Torvalds <torvalds@woody.osdl.org>
Tue, 12 Dec 2006 18:21:01 +0000 (10:21 -0800)
committer Linus Torvalds <torvalds@woody.osdl.org>
Tue, 12 Dec 2006 18:21:01 +0000 (10:21 -0800)
* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2:
  [patch 3/3] OCFS2 Configurable timeouts - Protocol changes
  [patch 2/3] OCFS2 Configurable timeouts
  [patch 1/3] OCFS2 - Expose struct o2nm_cluster
  ocfs2: Synchronize feature incompat flags in ocfs2_fs.h
  ocfs2: update mount option documentation
  ocfs2: local mounts

17 files changed:
Documentation/filesystems/ocfs2.txt
fs/ocfs2/cluster/nodemanager.c
fs/ocfs2/cluster/nodemanager.h
fs/ocfs2/cluster/tcp.c
fs/ocfs2/cluster/tcp.h
fs/ocfs2/cluster/tcp_internal.h
fs/ocfs2/dlmglue.c
fs/ocfs2/heartbeat.c
fs/ocfs2/inode.c
fs/ocfs2/journal.c
fs/ocfs2/journal.h
fs/ocfs2/mmap.c
fs/ocfs2/namei.c
fs/ocfs2/ocfs2.h
fs/ocfs2/ocfs2_fs.h
fs/ocfs2/super.c
fs/ocfs2/vote.c

index af6defd10cb604fa4f0468ba6522ca30e57adda5..8ccf0c1b58ed0df9430d1d67b6f62f4c64564218 100644 (file)
@@ -54,3 +54,6 @@ errors=panic          Panic and halt the machine if an error occurs.
 intr           (*)     Allow signals to interrupt cluster operations.
 nointr                 Do not allow signals to interrupt cluster
                        operations.
+atime_quantum=60(*)    OCFS2 will not update atime unless this number
+                       of seconds has passed since the last update.
+                       Set to zero to always update atime.
index d11753c50bc1452822befe923c149f8e79202c0c..357f1d551771201f4b9587186fc01a97809ad06b 100644 (file)
@@ -35,7 +35,7 @@
 /* for now we operate under the assertion that there can be only one
  * cluster active at a time.  Changing this will require trickling
  * cluster references throughout where nodes are looked up */
-static struct o2nm_cluster *o2nm_single_cluster = NULL;
+struct o2nm_cluster *o2nm_single_cluster = NULL;
 
 #define OCFS2_MAX_HB_CTL_PATH 256
 static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl";
@@ -97,17 +97,6 @@ const char *o2nm_get_hb_ctl_path(void)
 }
 EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path);
 
-struct o2nm_cluster {
-       struct config_group     cl_group;
-       unsigned                cl_has_local:1;
-       u8                      cl_local_node;
-       rwlock_t                cl_nodes_lock;
-       struct o2nm_node        *cl_nodes[O2NM_MAX_NODES];
-       struct rb_root          cl_node_ip_tree;
-       /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
-       unsigned long   cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
-};
-
 struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
 {
        struct o2nm_node *node = NULL;
@@ -543,6 +532,179 @@ static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
 }
 #endif
 
+struct o2nm_cluster_attribute {
+       struct configfs_attribute attr;
+       ssize_t (*show)(struct o2nm_cluster *, char *);
+       ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
+};
+
+static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
+                                       unsigned int *val)
+{
+       unsigned long tmp;
+       char *p = (char *)page;
+
+       tmp = simple_strtoul(p, &p, 0);
+       if (!p || (*p && (*p != '\n')))
+               return -EINVAL;
+
+       if (tmp == 0)
+               return -EINVAL;
+       if (tmp >= (u32)-1)
+               return -ERANGE;
+
+       *val = tmp;
+
+       return count;
+}
+
+static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
+       struct o2nm_cluster *cluster, char *page)
+{
+       return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
+}
+
+static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
+       struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+       ssize_t ret;
+       unsigned int val;
+
+       ret =  o2nm_cluster_attr_write(page, count, &val);
+
+       if (ret > 0) {
+               if (cluster->cl_idle_timeout_ms != val
+                       && o2net_num_connected_peers()) {
+                       mlog(ML_NOTICE,
+                            "o2net: cannot change idle timeout after "
+                            "the first peer has agreed to it."
+                            "  %d connected peers\n",
+                            o2net_num_connected_peers());
+                       ret = -EINVAL;
+               } else if (val <= cluster->cl_keepalive_delay_ms) {
+                       mlog(ML_NOTICE, "o2net: idle timeout must be larger "
+                            "than keepalive delay\n");
+                       ret = -EINVAL;
+               } else {
+                       cluster->cl_idle_timeout_ms = val;
+               }
+       }
+
+       return ret;
+}
+
+static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
+       struct o2nm_cluster *cluster, char *page)
+{
+       return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
+}
+
+static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
+       struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+       ssize_t ret;
+       unsigned int val;
+
+       ret =  o2nm_cluster_attr_write(page, count, &val);
+
+       if (ret > 0) {
+               if (cluster->cl_keepalive_delay_ms != val
+                   && o2net_num_connected_peers()) {
+                       mlog(ML_NOTICE,
+                            "o2net: cannot change keepalive delay after"
+                            " the first peer has agreed to it."
+                            "  %d connected peers\n",
+                            o2net_num_connected_peers());
+                       ret = -EINVAL;
+               } else if (val >= cluster->cl_idle_timeout_ms) {
+                       mlog(ML_NOTICE, "o2net: keepalive delay must be "
+                            "smaller than idle timeout\n");
+                       ret = -EINVAL;
+               } else {
+                       cluster->cl_keepalive_delay_ms = val;
+               }
+       }
+
+       return ret;
+}
+
+static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
+       struct o2nm_cluster *cluster, char *page)
+{
+       return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
+}
+
+static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
+       struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+       return o2nm_cluster_attr_write(page, count,
+                                      &cluster->cl_reconnect_delay_ms);
+}
+static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                   .ca_name = "idle_timeout_ms",
+                   .ca_mode = S_IRUGO | S_IWUSR },
+       .show   = o2nm_cluster_attr_idle_timeout_ms_read,
+       .store  = o2nm_cluster_attr_idle_timeout_ms_write,
+};
+
+static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                   .ca_name = "keepalive_delay_ms",
+                   .ca_mode = S_IRUGO | S_IWUSR },
+       .show   = o2nm_cluster_attr_keepalive_delay_ms_read,
+       .store  = o2nm_cluster_attr_keepalive_delay_ms_write,
+};
+
+static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                   .ca_name = "reconnect_delay_ms",
+                   .ca_mode = S_IRUGO | S_IWUSR },
+       .show   = o2nm_cluster_attr_reconnect_delay_ms_read,
+       .store  = o2nm_cluster_attr_reconnect_delay_ms_write,
+};
+
+static struct configfs_attribute *o2nm_cluster_attrs[] = {
+       &o2nm_cluster_attr_idle_timeout_ms.attr,
+       &o2nm_cluster_attr_keepalive_delay_ms.attr,
+       &o2nm_cluster_attr_reconnect_delay_ms.attr,
+       NULL,
+};
+static ssize_t o2nm_cluster_show(struct config_item *item,
+                                 struct configfs_attribute *attr,
+                                 char *page)
+{
+       struct o2nm_cluster *cluster = to_o2nm_cluster(item);
+       struct o2nm_cluster_attribute *o2nm_cluster_attr =
+               container_of(attr, struct o2nm_cluster_attribute, attr);
+       ssize_t ret = 0;
+
+       if (o2nm_cluster_attr->show)
+               ret = o2nm_cluster_attr->show(cluster, page);
+       return ret;
+}
+
+static ssize_t o2nm_cluster_store(struct config_item *item,
+                                  struct configfs_attribute *attr,
+                                  const char *page, size_t count)
+{
+       struct o2nm_cluster *cluster = to_o2nm_cluster(item);
+       struct o2nm_cluster_attribute *o2nm_cluster_attr =
+               container_of(attr, struct o2nm_cluster_attribute, attr);
+       ssize_t ret;
+
+       if (o2nm_cluster_attr->store == NULL) {
+               ret = -EINVAL;
+               goto out;
+       }
+
+       ret = o2nm_cluster_attr->store(cluster, page, count);
+       if (ret < count)
+               goto out;
+out:
+       return ret;
+}
+
 static struct config_item *o2nm_node_group_make_item(struct config_group *group,
                                                     const char *name)
 {
@@ -624,10 +786,13 @@ static void o2nm_cluster_release(struct config_item *item)
 
 static struct configfs_item_operations o2nm_cluster_item_ops = {
        .release        = o2nm_cluster_release,
+       .show_attribute         = o2nm_cluster_show,
+       .store_attribute        = o2nm_cluster_store,
 };
 
 static struct config_item_type o2nm_cluster_type = {
        .ct_item_ops    = &o2nm_cluster_item_ops,
+       .ct_attrs       = o2nm_cluster_attrs,
        .ct_owner       = THIS_MODULE,
 };
 
@@ -678,6 +843,9 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g
        cluster->cl_group.default_groups[2] = NULL;
        rwlock_init(&cluster->cl_nodes_lock);
        cluster->cl_node_ip_tree = RB_ROOT;
+       cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
+       cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
+       cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
 
        ret = &cluster->cl_group;
        o2nm_single_cluster = cluster;
index fce8033c310fb00acf55178c530692ee49de5527..8fb23cacc2f5f15308b7288321a503eb5322fa0d 100644 (file)
@@ -53,6 +53,23 @@ struct o2nm_node {
        unsigned long           nd_set_attributes;
 };
 
+struct o2nm_cluster {
+       struct config_group     cl_group;
+       unsigned                cl_has_local:1;
+       u8                      cl_local_node;
+       rwlock_t                cl_nodes_lock;
+       struct o2nm_node        *cl_nodes[O2NM_MAX_NODES];
+       struct rb_root          cl_node_ip_tree;
+       unsigned int            cl_idle_timeout_ms;
+       unsigned int            cl_keepalive_delay_ms;
+       unsigned int            cl_reconnect_delay_ms;
+
+       /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
+       unsigned long   cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+};
+
+extern struct o2nm_cluster *o2nm_single_cluster;
+
 u8 o2nm_this_node(void);
 
 int o2nm_configured_node_map(unsigned long *map, unsigned bytes);
index 9b3209dc0b16a147e8b81f0e1eddbfcb2714e27e..457753df1ae76719933463a268e7fd44a7dd9fc1 100644 (file)
@@ -147,6 +147,28 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes);
 static void o2net_sc_send_keep_req(struct work_struct *work);
 static void o2net_idle_timer(unsigned long data);
 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
+
+/*
+ * FIXME: These should use to_o2nm_cluster_from_node(), but we end up
+ * losing our parent link to the cluster during shutdown. This can be
+ * solved by adding a pre-removal callback to configfs, or passing
+ * around the cluster with the node. -jeffm
+ */
+static inline int o2net_reconnect_delay(struct o2nm_node *node)
+{
+       return o2nm_single_cluster->cl_reconnect_delay_ms;
+}
+
+static inline int o2net_keepalive_delay(struct o2nm_node *node)
+{
+       return o2nm_single_cluster->cl_keepalive_delay_ms;
+}
+
+static inline int o2net_idle_timeout(struct o2nm_node *node)
+{
+       return o2nm_single_cluster->cl_idle_timeout_ms;
+}
 
 static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
 {
@@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref)
 {
        struct o2net_sock_container *sc = container_of(kref,
                                        struct o2net_sock_container, sc_kref);
+       BUG_ON(timer_pending(&sc->sc_idle_timeout));
+
        sclog(sc, "releasing\n");
 
        if (sc->sc_sock) {
@@ -356,6 +380,13 @@ static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
                sc_put(sc);
 }
 
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
+
+int o2net_num_connected_peers(void)
+{
+       return atomic_read(&o2net_connected_peers);
+}
+
 static void o2net_set_nn_state(struct o2net_node *nn,
                               struct o2net_sock_container *sc,
                               unsigned valid, int err)
@@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
 
        assert_spin_locked(&nn->nn_lock);
 
+       if (old_sc && !sc)
+               atomic_dec(&o2net_connected_peers);
+       else if (!old_sc && sc)
+               atomic_inc(&o2net_connected_peers);
+
        /* the node num comparison and single connect/accept path should stop
         * an non-null sc from being overwritten with another */
        BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
                /* delay if we're withing a RECONNECT_DELAY of the
                 * last attempt */
                delay = (nn->nn_last_connect_attempt +
-                        msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+                        msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
                        - jiffies;
-               if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+               if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
                        delay = 0;
                mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
                queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
@@ -1099,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
                return -1;
        }
 
+       /*
+        * Ensure timeouts are consistent with other nodes, otherwise
+        * we can end up with one node thinking that the other must be down,
+        * but isn't. This can ultimately cause corruption.
+        */
+       if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
+                               o2net_idle_timeout(sc->sc_node)) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2net_idle_timeout_ms),
+                    o2net_idle_timeout(sc->sc_node));
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
+                       o2net_keepalive_delay(sc->sc_node)) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2net_keepalive_delay_ms),
+                    o2net_keepalive_delay(sc->sc_node));
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
+                       O2HB_MAX_WRITE_TIMEOUT_MS) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
+                    O2HB_MAX_WRITE_TIMEOUT_MS);
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
        sc->sc_handshake_ok = 1;
 
        spin_lock(&nn->nn_lock);
        /* set valid and queue the idle timers only if it hasn't been
         * shut down already */
        if (nn->nn_sc == sc) {
-               o2net_sc_postpone_idle(sc);
+               o2net_sc_reset_idle_timer(sc);
                o2net_set_nn_state(nn, sc, 1, 0);
        }
        spin_unlock(&nn->nn_lock);
@@ -1131,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
        sclog(sc, "receiving\n");
        do_gettimeofday(&sc->sc_tv_advance_start);
 
+       if (unlikely(sc->sc_handshake_ok == 0)) {
+               if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
+                       data = page_address(sc->sc_page) + sc->sc_page_off;
+                       datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
+                       ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+                       if (ret > 0)
+                               sc->sc_page_off += ret;
+               }
+
+               if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
+                       o2net_check_handshake(sc);
+                       if (unlikely(sc->sc_handshake_ok == 0))
+                               ret = -EPROTO;
+               }
+               goto out;
+       }
+
        /* do we need more header? */
        if (sc->sc_page_off < sizeof(struct o2net_msg)) {
                data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1138,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
                ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
                if (ret > 0) {
                        sc->sc_page_off += ret;
-
-                       /* this working relies on the handshake being
-                        * smaller than the normal message header */
-                       if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
-                           !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
-                               ret = -EPROTO;
-                               goto out;
-                       }
-
                        /* only swab incoming here.. we can
                         * only get here once as we cross from
                         * being under to over */
@@ -1248,6 +1330,18 @@ static int o2net_set_nodelay(struct socket *sock)
        return ret;
 }
 
+static void o2net_initialize_handshake(void)
+{
+       o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
+               O2HB_MAX_WRITE_TIMEOUT_MS);
+       o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
+               o2net_idle_timeout(NULL));
+       o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
+               o2net_keepalive_delay(NULL));
+       o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
+               o2net_reconnect_delay(NULL));
+}
+
 /* ------------------------------------------------------------ */
 
 /* called when a connect completes and after a sock is accepted.  the
@@ -1262,6 +1356,7 @@ static void o2net_sc_connect_completed(struct work_struct *work)
               (unsigned long long)O2NET_PROTOCOL_VERSION,
              (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
 
+       o2net_initialize_handshake();
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
        sc_put(sc);
 }
@@ -1287,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data)
 
        do_gettimeofday(&now);
 
-       printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 "
-            "seconds, shutting it down.\n", SC_NODEF_ARGS(sc));
+       printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u "
+            "seconds, shutting it down.\n", SC_NODEF_ARGS(sc),
+                    o2net_idle_timeout(sc->sc_node) / 1000,
+                    o2net_idle_timeout(sc->sc_node) % 1000);
        mlog(ML_NOTICE, "here are some times that might help debug the "
             "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
             "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
@@ -1306,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data)
        o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 }
 
-static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
 {
        o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
        o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
-                                   O2NET_KEEPALIVE_DELAY_SECS * HZ);
+                     msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node)));
        do_gettimeofday(&sc->sc_tv_timer);
        mod_timer(&sc->sc_idle_timeout,
-                 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));
+              jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));
+}
+
+static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+{
+       /* Only push out an existing timer */
+       if (timer_pending(&sc->sc_idle_timeout))
+               o2net_sc_reset_idle_timer(sc);
 }
 
 /* this work func is kicked whenever a path sets the nn state which doesn't
@@ -1435,9 +1539,12 @@ static void o2net_connect_expired(struct work_struct *work)
 
        spin_lock(&nn->nn_lock);
        if (!nn->nn_sc_valid) {
+               struct o2nm_node *node = nn->nn_sc->sc_node;
                mlog(ML_ERROR, "no connection established with node %u after "
-                    "%u seconds, giving up and returning errors.\n",
-                    o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);
+                    "%u.%u seconds, giving up and returning errors.\n",
+                    o2net_num_from_nn(nn),
+                    o2net_idle_timeout(node) / 1000,
+                    o2net_idle_timeout(node) % 1000);
 
                o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
        }
@@ -1478,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 
        if (node_num != o2nm_this_node())
                o2net_disconnect_node(node);
+
+       BUG_ON(atomic_read(&o2net_connected_peers) < 0);
 }
 
 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1489,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
        /* ensure an immediate connect attempt */
        nn->nn_last_connect_attempt = jiffies -
-               (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
+               (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1);
 
        if (node_num != o2nm_this_node()) {
                /* heartbeat doesn't work unless a local node number is
                 * configured and doing so brings up the o2net_wq, so we can
                 * use it.. */
                queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
-                                  O2NET_IDLE_TIMEOUT_SECS * HZ);
+                                  msecs_to_jiffies(o2net_idle_timeout(node)));
 
                /* believe it or not, accept and node hearbeating testing
                 * can succeed for this node before we got here.. so
@@ -1641,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
        o2net_register_callbacks(sc->sc_sock->sk, sc);
        o2net_sc_queue_work(sc, &sc->sc_rx_work);
 
+       o2net_initialize_handshake();
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
 
 out:
index 616ff2b8434ad2fb33c77f702c08789ef031ac01..21a4e43df836159c177771974ffc215957d26bf3 100644 (file)
@@ -54,6 +54,13 @@ typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data)
 
 #define O2NET_MAX_PAYLOAD_BYTES  (4096 - sizeof(struct o2net_msg))
 
+/* same as hb delay, we're waiting for another node to recognize our hb */
+#define O2NET_RECONNECT_DELAY_MS_DEFAULT       2000
+
+#define O2NET_KEEPALIVE_DELAY_MS_DEFAULT       5000
+#define O2NET_IDLE_TIMEOUT_MS_DEFAULT          10000
+
+
 /* TODO: figure this out.... */
 static inline int o2net_link_down(int err, struct socket *sock)
 {
@@ -101,6 +108,7 @@ void o2net_unregister_hb_callbacks(void);
 int o2net_start_listening(struct o2nm_node *node);
 void o2net_stop_listening(struct o2nm_node *node);
 void o2net_disconnect_node(struct o2nm_node *node);
+int o2net_num_connected_peers(void);
 
 int o2net_init(void);
 void o2net_exit(void);
index daebbd3a2c8ceb630404566223e2189d96bb0ed6..b700dc9624d13774cd693ab3bde426829fc9e79a 100644 (file)
 #define O2NET_MSG_KEEP_REQ_MAGIC  ((u16)0xfa57)
 #define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58)
 
-/* same as hb delay, we're waiting for another node to recognize our hb */
-#define O2NET_RECONNECT_DELAY_MS       O2HB_REGION_TIMEOUT_MS
-
 /* we're delaying our quorum decision so that heartbeat will have timed
  * out truly dead nodes by the time we come around to making decisions
  * on their number */
 #define O2NET_QUORUM_DELAY_MS  ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS)
 
-#define O2NET_KEEPALIVE_DELAY_SECS     5
-#define O2NET_IDLE_TIMEOUT_SECS                10
-
 /* 
  * This version number represents quite a lot, unfortunately.  It not
  * only represents the raw network message protocol on the wire but also
  * locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 5:
+ *     - Network timeout checking protocol
+ *
  * New in version 4:
  *     - Remove i_generation from lock names for better stat performance.
  *
  *     - full 64 bit i_size in the metadata lock lvbs
  *     - introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 4ULL
+#define O2NET_PROTOCOL_VERSION 5ULL
 struct o2net_handshake {
        __be64  protocol_version;
        __be64  connector_id;
+       __be32  o2hb_heartbeat_timeout_ms;
+       __be32  o2net_idle_timeout_ms;
+       __be32  o2net_keepalive_delay_ms;
+       __be32  o2net_reconnect_delay_ms;
 };
 
 struct o2net_node {
index 69fba16efbd1e08241ebe46551d12b506a442b15..e6220137bf691600be76ac97d86868347269d4d1 100644 (file)
@@ -770,7 +770,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
                             int dlm_flags)
 {
        int ret = 0;
-       enum dlm_status status;
+       enum dlm_status status = DLM_NORMAL;
        unsigned long flags;
 
        mlog_entry_void();
@@ -1138,6 +1138,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)
 {
        int status, level;
        struct ocfs2_lock_res *lockres;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        BUG_ON(!inode);
 
@@ -1147,6 +1148,9 @@ int ocfs2_rw_lock(struct inode *inode, int write)
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             write ? "EXMODE" : "PRMODE");
 
+       if (ocfs2_mount_local(osb))
+               return 0;
+
        lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
        level = write ? LKM_EXMODE : LKM_PRMODE;
@@ -1164,6 +1168,7 @@ void ocfs2_rw_unlock(struct inode *inode, int write)
 {
        int level = write ? LKM_EXMODE : LKM_PRMODE;
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        mlog_entry_void();
 
@@ -1171,7 +1176,8 @@ void ocfs2_rw_unlock(struct inode *inode, int write)
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             write ? "EXMODE" : "PRMODE");
 
-       ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
        mlog_exit_void();
 }
@@ -1182,6 +1188,7 @@ int ocfs2_data_lock_full(struct inode *inode,
 {
        int status = 0, level;
        struct ocfs2_lock_res *lockres;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        BUG_ON(!inode);
 
@@ -1201,6 +1208,9 @@ int ocfs2_data_lock_full(struct inode *inode,
                goto out;
        }
 
+       if (ocfs2_mount_local(osb))
+               goto out;
+
        lockres = &OCFS2_I(inode)->ip_data_lockres;
 
        level = write ? LKM_EXMODE : LKM_PRMODE;
@@ -1269,6 +1279,7 @@ void ocfs2_data_unlock(struct inode *inode,
 {
        int level = write ? LKM_EXMODE : LKM_PRMODE;
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        mlog_entry_void();
 
@@ -1276,7 +1287,8 @@ void ocfs2_data_unlock(struct inode *inode,
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             write ? "EXMODE" : "PRMODE");
 
-       if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
+       if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
+           !ocfs2_mount_local(osb))
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
        mlog_exit_void();
@@ -1467,8 +1479,9 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 {
        int status = 0;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
-       struct ocfs2_lock_res *lockres;
+       struct ocfs2_lock_res *lockres = NULL;
        struct ocfs2_dinode *fe;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        mlog_entry_void();
 
@@ -1483,10 +1496,12 @@ static int ocfs2_meta_lock_update(struct inode *inode,
        }
        spin_unlock(&oi->ip_lock);
 
-       lockres = &oi->ip_meta_lockres;
+       if (!ocfs2_mount_local(osb)) {
+               lockres = &oi->ip_meta_lockres;
 
-       if (!ocfs2_should_refresh_lock_res(lockres))
-               goto bail;
+               if (!ocfs2_should_refresh_lock_res(lockres))
+                       goto bail;
+       }
 
        /* This will discard any caching information we might have had
         * for the inode metadata. */
@@ -1496,7 +1511,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
         * map (directories, bitmap files, etc) */
        ocfs2_extent_map_trunc(inode, 0);
 
-       if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
+       if (lockres && ocfs2_meta_lvb_is_trustable(inode, lockres)) {
                mlog(0, "Trusting LVB on inode %llu\n",
                     (unsigned long long)oi->ip_blkno);
                ocfs2_refresh_inode_from_lvb(inode);
@@ -1543,7 +1558,8 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 
        status = 0;
 bail_refresh:
-       ocfs2_complete_lock_res_refresh(lockres, status);
+       if (lockres)
+               ocfs2_complete_lock_res_refresh(lockres, status);
 bail:
        mlog_exit(status);
        return status;
@@ -1585,7 +1601,7 @@ int ocfs2_meta_lock_full(struct inode *inode,
                         int arg_flags)
 {
        int status, level, dlm_flags, acquired;
-       struct ocfs2_lock_res *lockres;
+       struct ocfs2_lock_res *lockres = NULL;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
        struct buffer_head *local_bh = NULL;
 
@@ -1607,6 +1623,9 @@ int ocfs2_meta_lock_full(struct inode *inode,
                goto bail;
        }
 
+       if (ocfs2_mount_local(osb))
+               goto local;
+
        if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
                wait_event(osb->recovery_event,
                           ocfs2_node_map_is_empty(osb, &osb->recovery_map));
@@ -1636,6 +1655,7 @@ int ocfs2_meta_lock_full(struct inode *inode,
                wait_event(osb->recovery_event,
                           ocfs2_node_map_is_empty(osb, &osb->recovery_map));
 
+local:
        /*
         * We only see this flag if we're being called from
         * ocfs2_read_locked_inode(). It means we're locking an inode
@@ -1644,7 +1664,8 @@ int ocfs2_meta_lock_full(struct inode *inode,
         */
        if (inode->i_state & I_NEW) {
                status = 0;
-               ocfs2_complete_lock_res_refresh(lockres, 0);
+               if (lockres)
+                       ocfs2_complete_lock_res_refresh(lockres, 0);
                goto bail;
        }
 
@@ -1767,6 +1788,7 @@ void ocfs2_meta_unlock(struct inode *inode,
 {
        int level = ex ? LKM_EXMODE : LKM_PRMODE;
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        mlog_entry_void();
 
@@ -1774,7 +1796,8 @@ void ocfs2_meta_unlock(struct inode *inode,
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             ex ? "EXMODE" : "PRMODE");
 
-       if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
+       if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
+           !ocfs2_mount_local(osb))
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
        mlog_exit_void();
@@ -1783,7 +1806,7 @@ void ocfs2_meta_unlock(struct inode *inode,
 int ocfs2_super_lock(struct ocfs2_super *osb,
                     int ex)
 {
-       int status;
+       int status = 0;
        int level = ex ? LKM_EXMODE : LKM_PRMODE;
        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
        struct buffer_head *bh;
@@ -1794,6 +1817,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
        if (ocfs2_is_hard_readonly(osb))
                return -EROFS;
 
+       if (ocfs2_mount_local(osb))
+               goto bail;
+
        status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
        if (status < 0) {
                mlog_errno(status);
@@ -1832,7 +1858,8 @@ void ocfs2_super_unlock(struct ocfs2_super *osb,
        int level = ex ? LKM_EXMODE : LKM_PRMODE;
        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 
-       ocfs2_cluster_unlock(osb, lockres, level);
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, level);
 }
 
 int ocfs2_rename_lock(struct ocfs2_super *osb)
@@ -1843,6 +1870,9 @@ int ocfs2_rename_lock(struct ocfs2_super *osb)
        if (ocfs2_is_hard_readonly(osb))
                return -EROFS;
 
+       if (ocfs2_mount_local(osb))
+               return 0;
+
        status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
        if (status < 0)
                mlog_errno(status);
@@ -1854,7 +1884,8 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
 {
        struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
 
-       ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
 }
 
 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
@@ -1869,6 +1900,9 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)
        if (ocfs2_is_hard_readonly(osb))
                return -EROFS;
 
+       if (ocfs2_mount_local(osb))
+               return 0;
+
        ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
        if (ret < 0)
                mlog_errno(ret);
@@ -1882,7 +1916,8 @@ void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
        struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
        struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
-       ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
 }
 
 /* Reference counting of the dlm debug structure. We want this because
@@ -2145,12 +2180,15 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
 
 int ocfs2_dlm_init(struct ocfs2_super *osb)
 {
-       int status;
+       int status = 0;
        u32 dlm_key;
-       struct dlm_ctxt *dlm;
+       struct dlm_ctxt *dlm = NULL;
 
        mlog_entry_void();
 
+       if (ocfs2_mount_local(osb))
+               goto local;
+
        status = ocfs2_dlm_init_debug(osb);
        if (status < 0) {
                mlog_errno(status);
@@ -2178,11 +2216,12 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
                goto bail;
        }
 
+       dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
+
+local:
        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
 
-       dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
-
        osb->dlm = dlm;
 
        status = 0;
index cbfd45a97a636f445246fa9c885b6b9e330269ec..8fc52d6d0ce7827a75f40f40986f9ea8ef07b752 100644 (file)
@@ -154,6 +154,9 @@ int ocfs2_register_hb_callbacks(struct ocfs2_super *osb)
 {
        int status;
 
+       if (ocfs2_mount_local(osb))
+               return 0;
+
        status = o2hb_register_callback(&osb->osb_hb_down);
        if (status < 0) {
                mlog_errno(status);
@@ -172,6 +175,9 @@ void ocfs2_clear_hb_callbacks(struct ocfs2_super *osb)
 {
        int status;
 
+       if (ocfs2_mount_local(osb))
+               return;
+
        status = o2hb_unregister_callback(&osb->osb_hb_down);
        if (status < 0)
                mlog_errno(status);
@@ -186,6 +192,9 @@ void ocfs2_stop_heartbeat(struct ocfs2_super *osb)
        int ret;
        char *argv[5], *envp[3];
 
+       if (ocfs2_mount_local(osb))
+               return;
+
        if (!osb->uuid_str) {
                /* This can happen if we don't get far enough in mount... */
                mlog(0, "No UUID with which to stop heartbeat!\n\n");
index 42e361f3054f5114d8e81fe911ca2fb21c86601d..e4d91493d7d74a9efa29e8e55ead977cee06de70 100644 (file)
@@ -423,7 +423,8 @@ static int ocfs2_read_locked_inode(struct inode *inode,
         * cluster lock before trusting anything anyway.
         */
        can_lock = !(args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
-               && !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK);
+               && !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK)
+               && !ocfs2_mount_local(osb);
 
        /*
         * To maintain backwards compatibility with older versions of
index 1d7f4ab1e5ede4b362a5e8f2bcd4523269c5a456..825cb0ae1b4c812862bbff0d8da5608497611eb7 100644 (file)
@@ -144,8 +144,10 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
                        ocfs2_abort(osb->sb, "Detected aborted journal");
                        handle = ERR_PTR(-EROFS);
                }
-       } else
-               atomic_inc(&(osb->journal->j_num_trans));
+       } else {
+               if (!ocfs2_mount_local(osb))
+                       atomic_inc(&(osb->journal->j_num_trans));
+       }
 
        return handle;
 }
@@ -507,9 +509,23 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
 
        BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
 
-       status = ocfs2_journal_toggle_dirty(osb, 0);
-       if (status < 0)
-               mlog_errno(status);
+       if (ocfs2_mount_local(osb)) {
+               journal_lock_updates(journal->j_journal);
+               status = journal_flush(journal->j_journal);
+               journal_unlock_updates(journal->j_journal);
+               if (status < 0)
+                       mlog_errno(status);
+       }
+
+       if (status == 0) {
+               /*
+                * Do not toggle if the flush was unsuccessful; otherwise
+                * we would leave dirty metadata in a "clean" journal.
+                */
+               status = ocfs2_journal_toggle_dirty(osb, 0);
+               if (status < 0)
+                       mlog_errno(status);
+       }
 
        /* Shutdown the kernel journal system */
        journal_destroy(journal->j_journal);
@@ -549,7 +565,7 @@ static void ocfs2_clear_journal_error(struct super_block *sb,
        }
 }
 
-int ocfs2_journal_load(struct ocfs2_journal *journal)
+int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
 {
        int status = 0;
        struct ocfs2_super *osb;
@@ -576,14 +592,18 @@ int ocfs2_journal_load(struct ocfs2_journal *journal)
        }
 
        /* Launch the commit thread */
-       osb->commit_task = kthread_run(ocfs2_commit_thread, osb, "ocfs2cmt");
-       if (IS_ERR(osb->commit_task)) {
-               status = PTR_ERR(osb->commit_task);
+       if (!local) {
+               osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
+                                              "ocfs2cmt");
+               if (IS_ERR(osb->commit_task)) {
+                       status = PTR_ERR(osb->commit_task);
+                       osb->commit_task = NULL;
+                       mlog(ML_ERROR, "unable to launch ocfs2commit thread, "
+                            "error=%d", status);
+                       goto done;
+               }
+       } else
                osb->commit_task = NULL;
-               mlog(ML_ERROR, "unable to launch ocfs2commit thread, error=%d",
-                    status);
-               goto done;
-       }
 
 done:
        mlog_exit(status);
index 899112ad813679da5035903c57cc79b6c2a97aea..e1216364d191566ba80b276796afb965fe8ebb61 100644 (file)
@@ -157,7 +157,7 @@ int    ocfs2_journal_init(struct ocfs2_journal *journal,
 void   ocfs2_journal_shutdown(struct ocfs2_super *osb);
 int    ocfs2_journal_wipe(struct ocfs2_journal *journal,
                          int full);
-int    ocfs2_journal_load(struct ocfs2_journal *journal);
+int    ocfs2_journal_load(struct ocfs2_journal *journal, int local);
 int    ocfs2_check_journals_nolocks(struct ocfs2_super *osb);
 void   ocfs2_recovery_thread(struct ocfs2_super *osb,
                             int node_num);
@@ -174,6 +174,9 @@ static inline void ocfs2_checkpoint_inode(struct inode *inode)
 {
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
+       if (ocfs2_mount_local(osb))
+               return;
+
        if (!ocfs2_inode_fully_checkpointed(inode)) {
                /* WARNING: This only kicks off a single
                 * checkpoint. If someone races you and adds more
index 69f85ae392dcc9ab0a830a71c576dabda1f779b0..51b02044768360cf7c272bbd1f9cff051c2ad33b 100644 (file)
@@ -83,10 +83,12 @@ static struct vm_operations_struct ocfs2_file_vm_ops = {
 int ocfs2_mmap(struct file *file, struct vm_area_struct *vma)
 {
        int ret = 0, lock_level = 0;
+       struct ocfs2_super *osb = OCFS2_SB(file->f_dentry->d_inode->i_sb);
 
        /* We don't want to support shared writable mappings yet. */
-       if (((vma->vm_flags & VM_SHARED) || (vma->vm_flags & VM_MAYSHARE))
-           && ((vma->vm_flags & VM_WRITE) || (vma->vm_flags & VM_MAYWRITE))) {
+       if (!ocfs2_mount_local(osb) &&
+           ((vma->vm_flags & VM_SHARED) || (vma->vm_flags & VM_MAYSHARE)) &&
+           ((vma->vm_flags & VM_WRITE) || (vma->vm_flags & VM_MAYWRITE))) {
                mlog(0, "disallow shared writable mmaps %lx\n", vma->vm_flags);
                /* This is -EINVAL because generic_file_readonly_mmap
                 * returns it in a similar situation. */
index 21db45ddf144c2ebbfb96237b6d1c9a8970776ae..9637039c263355676716b72b98eaa9f147867dfb 100644 (file)
@@ -587,9 +587,11 @@ static int ocfs2_mknod_locked(struct ocfs2_super *osb,
        }
 
        ocfs2_inode_set_new(osb, inode);
-       status = ocfs2_create_new_inode_locks(inode);
-       if (status < 0)
-               mlog_errno(status);
+       if (!ocfs2_mount_local(osb)) {
+               status = ocfs2_create_new_inode_locks(inode);
+               if (status < 0)
+                       mlog_errno(status);
+       }
 
        status = 0; /* error in ocfs2_create_new_inode_locks is not
                     * critical */
index b767fd7da6ebaeb44ed58a888b80cb2dd99ce545..db8e77cd35d34e1653a9adc841f0ab422f9c994d 100644 (file)
@@ -349,6 +349,11 @@ static inline int ocfs2_is_soft_readonly(struct ocfs2_super *osb)
        return ret;
 }
 
+static inline int ocfs2_mount_local(struct ocfs2_super *osb)
+{
+       return (osb->s_feature_incompat & OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT);
+}
+
 #define OCFS2_IS_VALID_DINODE(ptr)                                     \
        (!strcmp((ptr)->i_signature, OCFS2_INODE_SIGNATURE))
 
index 3330a5dc6be2f9aa5e42b2a3709906bcdd5bba97..b5c68567077ecaab02bfd0dd903d24f12e13eb56 100644 (file)
@@ -86,7 +86,7 @@
        OCFS2_SB(sb)->s_feature_incompat &= ~(mask)
 
 #define OCFS2_FEATURE_COMPAT_SUPP      0
-#define OCFS2_FEATURE_INCOMPAT_SUPP    0
+#define OCFS2_FEATURE_INCOMPAT_SUPP    OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT
 #define OCFS2_FEATURE_RO_COMPAT_SUPP   0
 
 /*
  */
 #define OCFS2_FEATURE_INCOMPAT_HEARTBEAT_DEV   0x0002
 
+/*
+ * tunefs sets this incompat flag before starting the resize and clears it
+ * at the end. This flag protects users from inadvertently mounting the fs
+ * after an aborted run without fsck-ing.
+ */
+#define OCFS2_FEATURE_INCOMPAT_RESIZE_INPROG    0x0004
+
+/* Used to denote a non-clustered volume */
+#define OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT     0x0008
+
+/* Support for sparse allocation in b-trees */
+#define OCFS2_FEATURE_INCOMPAT_SPARSE_ALLOC    0x0010
 
 /*
  * Flags on ocfs2_dinode.i_flags
index 4bf39540e652fb85b99e62045bf3e84646e5f72d..a6d2f8cc165b85da7c3e607b361913a7db65e02f 100644 (file)
@@ -508,6 +508,27 @@ bail:
        return status;
 }
 
+static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
+{
+       const int hb_local = !!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL);
+
+       if (ocfs2_mount_local(osb)) {
+               /* Non-clustered volume: heartbeat must not be requested. */
+               if (!hb_local)
+                       return 0;
+               mlog(ML_ERROR, "Cannot heartbeat on a locally "
+                    "mounted device.\n");
+               return -EINVAL;
+       }
+
+       /* Clustered volume: only a hard readonly mount may skip heartbeat. */
+       if (hb_local || ocfs2_is_hard_readonly(osb))
+               return 0;
+       mlog(ML_ERROR, "Heartbeat has to be started to mount "
+            "a read-write clustered device.\n");
+       return -EINVAL;
+}
+
 static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 {
        struct dentry *root;
@@ -516,16 +537,24 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
        struct inode *inode = NULL;
        struct ocfs2_super *osb = NULL;
        struct buffer_head *bh = NULL;
+       char nodestr[8];
 
        mlog_entry("%p, %p, %i", sb, data, silent);
 
-       /* for now we only have one cluster/node, make sure we see it
-        * in the heartbeat universe */
-       if (!o2hb_check_local_node_heartbeating()) {
+       if (!ocfs2_parse_options(sb, data, &parsed_opt, 0)) {
                status = -EINVAL;
                goto read_super_error;
        }
 
+       /* for now we only have one cluster/node, make sure we see it
+        * in the heartbeat universe */
+       if (parsed_opt & OCFS2_MOUNT_HB_LOCAL) {
+               if (!o2hb_check_local_node_heartbeating()) {
+                       status = -EINVAL;
+                       goto read_super_error;
+               }
+       }
+
        /* probe for superblock */
        status = ocfs2_sb_probe(sb, &bh, &sector_size);
        if (status < 0) {
@@ -541,11 +570,6 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
        }
        brelse(bh);
        bh = NULL;
-
-       if (!ocfs2_parse_options(sb, data, &parsed_opt, 0)) {
-               status = -EINVAL;
-               goto read_super_error;
-       }
        osb->s_mount_opt = parsed_opt;
 
        sb->s_magic = OCFS2_SUPER_MAGIC;
@@ -588,21 +612,16 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
        }
 
        if (!ocfs2_is_hard_readonly(osb)) {
-               /* If this isn't a hard readonly mount, then we need
-                * to make sure that heartbeat is in a valid state,
-                * and that we mark ourselves soft readonly is -oro
-                * was specified. */
-               if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
-                       mlog(ML_ERROR, "No heartbeat for device (%s)\n",
-                            sb->s_id);
-                       status = -EINVAL;
-                       goto read_super_error;
-               }
-
                if (sb->s_flags & MS_RDONLY)
                        ocfs2_set_ro_flag(osb, 0);
        }
 
+       status = ocfs2_verify_heartbeat(osb);
+       if (status < 0) {
+               mlog_errno(status);
+               goto read_super_error;
+       }
+
        osb->osb_debug_root = debugfs_create_dir(osb->uuid_str,
                                                 ocfs2_debugfs_root);
        if (!osb->osb_debug_root) {
@@ -635,9 +654,14 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 
        ocfs2_complete_mount_recovery(osb);
 
-       printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %d, slot %d) "
+       if (ocfs2_mount_local(osb))
+               snprintf(nodestr, sizeof(nodestr), "local");
+       else
+               snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+
+       printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %s, slot %d) "
               "with %s data mode.\n",
-              osb->dev_str, osb->node_num, osb->slot_num,
+              osb->dev_str, nodestr, osb->slot_num,
               osb->s_mount_opt & OCFS2_MOUNT_DATA_WRITEBACK ? "writeback" :
               "ordered");
 
@@ -999,7 +1023,11 @@ static int ocfs2_fill_local_node_info(struct ocfs2_super *osb)
 
        /* XXX hold a ref on the node while mounte?  easy enough, if
         * desirable. */
-       osb->node_num = o2nm_this_node();
+       if (ocfs2_mount_local(osb))
+               osb->node_num = 0;
+       else
+               osb->node_num = o2nm_this_node();
+
        if (osb->node_num == O2NM_MAX_NODES) {
                mlog(ML_ERROR, "could not find this host's node number\n");
                status = -ENOENT;
@@ -1084,6 +1112,9 @@ static int ocfs2_mount_volume(struct super_block *sb)
                goto leave;
        }
 
+       if (ocfs2_mount_local(osb))
+               goto leave;
+
        /* This should be sent *after* we recovered our journal as it
         * will cause other nodes to unmark us as needing
         * recovery. However, we need to send it *before* dropping the
@@ -1114,6 +1145,7 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 {
        int tmp;
        struct ocfs2_super *osb = NULL;
+       char nodestr[8];
 
        mlog_entry("(0x%p)\n", sb);
 
@@ -1177,8 +1209,13 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 
        atomic_set(&osb->vol_state, VOLUME_DISMOUNTED);
 
-       printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %d)\n",
-              osb->dev_str, osb->node_num);
+       if (ocfs2_mount_local(osb))
+               snprintf(nodestr, sizeof(nodestr), "local");
+       else
+               snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+
+       printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %s)\n",
+              osb->dev_str, nodestr);
 
        ocfs2_delete_osb(osb);
        kfree(osb);
@@ -1536,6 +1573,7 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
 {
        int status = 0;
        int dirty;
+       int local;
        struct ocfs2_dinode *local_alloc = NULL; /* only used if we
                                                  * recover
                                                  * ourselves. */
@@ -1563,8 +1601,10 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
                     "recovering volume.\n");
        }
 
+       local = ocfs2_mount_local(osb);
+
        /* will play back anything left in the journal. */
-       ocfs2_journal_load(osb->journal);
+       ocfs2_journal_load(osb->journal, local);
 
        if (dirty) {
                /* recover my local alloc if we didn't unmount cleanly. */
index 5b4dca79990bfb4218e46d307128debe174f0ccb..0315a8b61ed67b221a33361cac82b052d7af46b5 100644 (file)
@@ -1000,6 +1000,9 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
 {
        int status = 0;
 
+       if (ocfs2_mount_local(osb))
+               return 0;
+
        status = o2net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
                                        osb->net_key,
                                        sizeof(struct ocfs2_response_msg),