Merge branches 'urgent.2012.10.27a', 'doc.2012.11.16a', 'fixes.2012.11.13a', 'srcu...
[linux-3.10.git] / kernel / rcutree.c
index 24b21cb..5ffadcc 100644 (file)
@@ -68,9 +68,9 @@ static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
        .level = { &sname##_state.node[0] }, \
        .call = cr, \
        .fqs_state = RCU_GP_IDLE, \
-       .gpnum = -300, \
-       .completed = -300, \
-       .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.onofflock), \
+       .gpnum = 0UL - 300UL, \
+       .completed = 0UL - 300UL, \
+       .orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
        .orphan_nxttail = &sname##_state.orphan_nxtlist, \
        .orphan_donetail = &sname##_state.orphan_donelist, \
        .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
@@ -212,13 +212,13 @@ DEFINE_PER_CPU(struct rcu_dynticks, rcu_dynticks) = {
 #endif
 };
 
-static int blimit = 10;                /* Maximum callbacks per rcu_do_batch. */
-static int qhimark = 10000;    /* If this many pending, ignore blimit. */
-static int qlowmark = 100;     /* Once only this many pending, use blimit. */
+static long blimit = 10;       /* Maximum callbacks per rcu_do_batch. */
+static long qhimark = 10000;   /* If this many pending, ignore blimit. */
+static long qlowmark = 100;    /* Once only this many pending, use blimit. */
 
-module_param(blimit, int, 0444);
-module_param(qhimark, int, 0444);
-module_param(qlowmark, int, 0444);
+module_param(blimit, long, 0444);
+module_param(qhimark, long, 0444);
+module_param(qlowmark, long, 0444);
 
 int rcu_cpu_stall_suppress __read_mostly; /* 1 = suppress stall warnings. */
 int rcu_cpu_stall_timeout __read_mostly = CONFIG_RCU_CPU_STALL_TIMEOUT;
@@ -313,7 +313,7 @@ static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
        return *rdp->nxttail[RCU_DONE_TAIL +
-                            ACCESS_ONCE(rsp->completed) != rdp->completed] &&
+                            (ACCESS_ONCE(rsp->completed) != rdp->completed)] &&
               !rcu_gp_in_progress(rsp);
 }
 
@@ -1436,15 +1436,37 @@ rcu_start_gp(struct rcu_state *rsp, unsigned long flags)
            !cpu_needs_another_gp(rsp, rdp)) {
                /*
                 * Either we have not yet spawned the grace-period
-                * task or this CPU does not need another grace period.
+                * task, this CPU does not need another grace period,
+                * or a grace period is already in progress.
                 * Either way, don't start a new grace period.
                 */
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
                return;
        }
 
+       /*
+        * Because there is no grace period in progress right now,
+        * any callbacks we have up to this point will be satisfied
+        * by the next grace period.  So promote all callbacks to be
+        * handled after the end of the next grace period.  If the
+        * CPU is not yet aware of the end of the previous grace period,
+        * we need to allow for the callback advancement that will
+        * occur when it does become aware.  Deadlock prevents us from
+        * making it aware at this point: We cannot acquire a leaf
+        * rcu_node ->lock while holding the root rcu_node ->lock.
+        */
+       rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+       if (rdp->completed == rsp->completed)
+               rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
        rsp->gp_flags = RCU_GP_FLAG_INIT;
-       raw_spin_unlock_irqrestore(&rnp->lock, flags);
+       raw_spin_unlock(&rnp->lock); /* Interrupts remain disabled. */
+
+       /* Ensure that CPU is aware of completion of last grace period. */
+       rcu_process_gp_end(rsp, rdp);
+       local_irq_restore(flags);
+
+       /* Wake up rcu_gp_kthread() to start the grace period. */
        wake_up(&rsp->gp_wq);
 }
 
@@ -1605,7 +1627,7 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 /*
  * Send the specified CPU's RCU callbacks to the orphanage.  The
  * specified CPU must be offline, and the caller must hold the
- * ->onofflock.
+ * ->orphan_lock.
  */
 static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
@@ -1613,8 +1635,8 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 {
        /*
         * Orphan the callbacks.  First adjust the counts.  This is safe
-        * because ->onofflock excludes _rcu_barrier()'s adoption of
-        * the callbacks, thus no memory barrier is required.
+        * because _rcu_barrier() excludes CPU-hotplug operations, so it
+        * cannot be running now.  Thus no memory barrier is required.
         */
        if (rdp->nxtlist != NULL) {
                rsp->qlen_lazy += rdp->qlen_lazy;
@@ -1655,7 +1677,7 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 
 /*
  * Adopt the RCU callbacks from the specified rcu_state structure's
- * orphanage.  The caller must hold the ->onofflock.
+ * orphanage.  The caller must hold the ->orphan_lock.
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
 {
@@ -1734,7 +1756,7 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
        /* Exclude any attempts to start a new grace period. */
        mutex_lock(&rsp->onoff_mutex);
-       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       raw_spin_lock_irqsave(&rsp->orphan_lock, flags);
 
        /* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
        rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
@@ -1761,10 +1783,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
        /*
         * We still hold the leaf rcu_node structure lock here, and
         * irqs are still disabled.  The reason for this subterfuge is
-        * because invoking rcu_report_unblock_qs_rnp() with ->onofflock
+        * because invoking rcu_report_unblock_qs_rnp() with ->orphan_lock
         * held leads to deadlock.
         */
-       raw_spin_unlock(&rsp->onofflock); /* irqs remain disabled. */
+       raw_spin_unlock(&rsp->orphan_lock); /* irqs remain disabled. */
        rnp = rdp->mynode;
        if (need_report & RCU_OFL_TASKS_NORM_GP)
                rcu_report_unblock_qs_rnp(rnp, flags);
@@ -1801,7 +1823,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
        unsigned long flags;
        struct rcu_head *next, *list, **tail;
-       int bl, count, count_lazy, i;
+       long bl, count, count_lazy;
+       int i;
 
        /* If no callbacks are ready, just return.*/
        if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
@@ -2237,10 +2260,28 @@ static inline int rcu_blocking_is_gp(void)
  * rcu_read_lock_sched().
  *
  * This means that all preempt_disable code sequences, including NMI and
- * hardware-interrupt handlers, in progress on entry will have completed
- * before this primitive returns.  However, this does not guarantee that
- * softirq handlers will have completed, since in some kernels, these
- * handlers can run in process context, and can block.
+ * non-threaded hardware-interrupt handlers, in progress on entry will
+ * have completed before this primitive returns.  However, this does not
+ * guarantee that softirq handlers will have completed, since in some
+ * kernels, these handlers can run in process context, and can block.
+ *
+ * Note that this guarantee implies further memory-ordering guarantees.
+ * On systems with more than one CPU, when synchronize_sched() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since the
+ * end of its last RCU-sched read-side critical section whose beginning
+ * preceded the call to synchronize_sched().  In addition, each CPU having
+ * an RCU read-side critical section that extends beyond the return from
+ * synchronize_sched() is guaranteed to have executed a full memory barrier
+ * after the beginning of synchronize_sched() and before the beginning of
+ * that RCU read-side critical section.  Note that these guarantees include
+ * CPUs that are offline, idle, or executing in user mode, as well as CPUs
+ * that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_sched(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_sched() -- even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
  *
  * This primitive provides the guarantees made by the (now removed)
  * synchronize_kernel() API.  In contrast, synchronize_rcu() only
@@ -2256,7 +2297,10 @@ void synchronize_sched(void)
                           "Illegal synchronize_sched() in RCU-sched read-side critical section");
        if (rcu_blocking_is_gp())
                return;
-       wait_rcu_gp(call_rcu_sched);
+       if (rcu_expedited)
+               synchronize_sched_expedited();
+       else
+               wait_rcu_gp(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(synchronize_sched);
 
@@ -2268,6 +2312,9 @@ EXPORT_SYMBOL_GPL(synchronize_sched);
  * read-side critical sections have completed.  RCU read-side critical
  * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
  * and may be nested.
+ *
+ * See the description of synchronize_sched() for more detailed information
+ * on memory ordering guarantees.
  */
 void synchronize_rcu_bh(void)
 {
@@ -2277,13 +2324,13 @@ void synchronize_rcu_bh(void)
                           "Illegal synchronize_rcu_bh() in RCU-bh read-side critical section");
        if (rcu_blocking_is_gp())
                return;
-       wait_rcu_gp(call_rcu_bh);
+       if (rcu_expedited)
+               synchronize_rcu_bh_expedited();
+       else
+               wait_rcu_gp(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
-static atomic_t sync_sched_expedited_started = ATOMIC_INIT(0);
-static atomic_t sync_sched_expedited_done = ATOMIC_INIT(0);
-
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
        /*
@@ -2340,10 +2387,32 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
  */
 void synchronize_sched_expedited(void)
 {
-       int firstsnap, s, snap, trycount = 0;
+       long firstsnap, s, snap;
+       int trycount = 0;
+       struct rcu_state *rsp = &rcu_sched_state;
 
-       /* Note that atomic_inc_return() implies full memory barrier. */
-       firstsnap = snap = atomic_inc_return(&sync_sched_expedited_started);
+       /*
+        * If we are in danger of counter wrap, just do synchronize_sched().
+        * By allowing sync_sched_expedited_started to advance no more than
+        * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
+        * that more than 3.5 billion CPUs would be required to force a
+        * counter wrap on a 32-bit system.  Quite a few more CPUs would of
+        * course be required on a 64-bit system.
+        */
+       if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
+                        (ulong)atomic_long_read(&rsp->expedited_done) +
+                        ULONG_MAX / 8)) {
+               synchronize_sched();
+               atomic_long_inc(&rsp->expedited_wrap);
+               return;
+       }
+
+       /*
+        * Take a ticket.  Note that atomic_inc_return() implies a
+        * full memory barrier.
+        */
+       snap = atomic_long_inc_return(&rsp->expedited_start);
+       firstsnap = snap;
        get_online_cpus();
        WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
@@ -2355,48 +2424,65 @@ void synchronize_sched_expedited(void)
                             synchronize_sched_expedited_cpu_stop,
                             NULL) == -EAGAIN) {
                put_online_cpus();
+               atomic_long_inc(&rsp->expedited_tryfail);
+
+               /* Check to see if someone else did our work for us. */
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_workdone1);
+                       return;
+               }
 
                /* No joy, try again later.  Or just synchronize_sched(). */
                if (trycount++ < 10) {
                        udelay(trycount * num_online_cpus());
                } else {
-                       synchronize_sched();
+                       wait_rcu_gp(call_rcu_sched);
+                       atomic_long_inc(&rsp->expedited_normal);
                        return;
                }
 
-               /* Check to see if someone else did our work for us. */
-               s = atomic_read(&sync_sched_expedited_done);
-               if (UINT_CMP_GE((unsigned)s, (unsigned)firstsnap)) {
-                       smp_mb(); /* ensure test happens before caller kfree */
+               /* Recheck to see if someone else did our work for us. */
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_workdone2);
                        return;
                }
 
                /*
                 * Refetching sync_sched_expedited_started allows later
-                * callers to piggyback on our grace period.  We subtract
-                * 1 to get the same token that the last incrementer got.
-                * We retry after they started, so our grace period works
-                * for them, and they started after our first try, so their
-                * grace period works for us.
+                * callers to piggyback on our grace period.  We retry
+                * after they started, so our grace period works for them,
+                * and they started after our first try, so their grace
+                * period works for us.
                 */
                get_online_cpus();
-               snap = atomic_read(&sync_sched_expedited_started);
+               snap = atomic_long_read(&rsp->expedited_start);
                smp_mb(); /* ensure read is before try_stop_cpus(). */
        }
+       atomic_long_inc(&rsp->expedited_stoppedcpus);
 
        /*
         * Everyone up to our most recent fetch is covered by our grace
         * period.  Update the counter, but only if our work is still
         * relevant -- which it won't be if someone who started later
-        * than we did beat us to the punch.
+        * than we did already did their update.
         */
        do {
-               s = atomic_read(&sync_sched_expedited_done);
-               if (UINT_CMP_GE((unsigned)s, (unsigned)snap)) {
-                       smp_mb(); /* ensure test happens before caller kfree */
+               atomic_long_inc(&rsp->expedited_done_tries);
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_done_lost);
                        break;
                }
-       } while (atomic_cmpxchg(&sync_sched_expedited_done, s, snap) != s);
+       } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
+       atomic_long_inc(&rsp->expedited_done_exit);
 
        put_online_cpus();
 }