\file
LockImpl
.d
\brief Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.
Written by Doug Lea with assistance from members of JCP JSR-166
Expert Group and released to the public domain, as explained at
http:
//creativecommons.org/licenses/publicdomain
Ported to D by Ben Hinkle.
Email comments and bug reports to ben.hinkle@gmail.com
revision 2.01
- class
AbstractLock
;
- \class
AbstractLock
\brief Provides a framework for implementing blocking locks and
related synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.
This class is designed to be a useful basis for most kinds of
synchronizers that rely on a single atomic int value to
represent state. Subclasses must define the protected methods that
change this state, and which define what that state means in terms
of this object being acquired or released. Given these, the other
methods in this class carry out all queuing and blocking
mechanics. Subclasses can maintain other state fields, but only the
atomically updated int value manipulated using the getter
and setter for the state property and compareAndSet32 is tracked
with respect to synchronization.
Subclasses should be defined as non-public internal helper
classes that are used to implement the synchronization properties
of their enclosing class. Class
AbstractLock
does not
implement any synchronization interface. Instead it defines
methods such as acquire that can be invoked as appropriate by
concrete locks and related synchronizers to implement their public
methods.
This class supports either or both a default exclusive
mode and a shared mode. When acquired in exclusive mode,
attempted acquires by other threads cannot succeed. Shared mode
acquires by multiple threads may (but need not) succeed. This class
does not "understand" these differences except in the
mechanical sense that when a shared mode acquire succeeds, the next
waiting thread (if one exists) must also determine whether it can
acquire as well. Threads waiting in the different modes share the
same FIFO queue. Usually, implementation subclasses support only
one of these modes, but both can come into play for example in a
ReadWriteLock. Subclasses that support only exclusive or only
shared modes need not define the methods supporting the unused
mode.
This class defines a nested ConditionObject class that can be
used as a Condition implementation by subclasses supporting
exclusive mode for which method isHeldExclusively reports whether
synchronization is exclusively held with respect to the current
thread, method release invoked with the current state value fully
releases this object, and acquire, given this saved state value,
eventually restores this object to its previous acquired state. No
AbstractLock
method otherwise creates such a condition, so
if this constraint cannot be met, do not use it. The behavior of
ConditionObject depends of course on the semantics of its
synchronizer implementation.
This class provides inspection, instrumentation, and monitoring
methods for the internal queue, as well as similar methods for
condition objects. These can be exported as desired into classes
using an
AbstractLock
for their synchronization mechanics.
Usage
To use this class as the basis of a synchronizer, redefine the
following methods, as applicable, by inspecting and/or modifying
the synchronization state using the state getter and setter and/or
compareAndSet32.
- {@link #tryAcquire}
- {@link #tryRelease}
- {@link #tryAcquireShared}
- {@link #tryReleaseShared}
- {@link #isHeldExclusively}
Each of these methods by default throws
UnsupportedOperationException. Implementations of these methods
must be internally thread-safe, and should in general be short and
not block. Defining these methods is the only supported
means of using this class. All other methods are declared
final because they cannot be independently varied.
Even though this class is based on an internal FIFO queue, it
does not automatically enforce FIFO acquisition policies. The core
of exclusive synchronization takes the form:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
(Shared mode is similar but may involve cascading signals.)
Because checks in acquire are invoked before enqueuing, a newly
acquiring thread may barge ahead of others that are
blocked and queued. However, you can, if desired, define
tryAcquire and/or tryAcquireShared to disable
barging by internally invoking one or more of the inspection
methods. In particular, a strict FIFO lock can define
tryAcquire to immediately return false if
getFirstQueuedThread does not return the current thread. A
normally preferable non-strict fair version can immediately return
false only if hasQueuedThreads returns true and
getFirstQueuedThread is not the current thread; or
equivalently, that getFirstQueuedThread is both non-null
and not the current thread. Further variations are possible.
Throughput and scalability are generally highest for the
default barging (also known as greedy,
renouncement, and convoy-avoidance) strategy.
While this is not guaranteed to be fair or starvation-free, earlier
queued threads are allowed to recontend before later queued
threads, and each recontention has an unbiased chance to succeed
against incoming threads. Also, while acquires do not
"spin" in the usual sense, they may perform multiple
invocations of tryAcquire interspersed with other
computations before blocking. This gives most of the benefits of
spins when exclusive synchronization is only briefly held, without
most of the liabilities when it isn't. If so desired, you can
augment this by preceding calls to acquire methods with "fast-path"
checks, possibly prechecking {@link #hasContended} and/or
hasQueuedThreads to only do so if the synchronizer is likely not to
be contended.
Usage Examples
Here is a non-reentrant mutual exclusion lock class that uses
the value zero to represent the unlocked state, and one to
represent the locked state. It also supports conditions and exposes
one of the instrumentation methods:
class Mutex : Lock {
// Our internal helper class
private class Sync : AbstractLock {
// Report whether in locked state
bool isHeldExclusively() {
return state == 1;
}
// Acquire the lock if state is zero
bool tryAcquire(int acquires) {
assert ( acquires == 1 ); // Otherwise unused
return Atomic.compareAndSet32(&state, 0, 1);
}
// Release the lock by setting state to zero
protected bool tryRelease(int releases) {
assert ( releases == 1 ); // Otherwise unused
state = 0;
return true;
}
// Provide a Condition
Condition newCondition() { return new ConditionObject(); }
}
// The sync object does all the hard work. We just forward to it.
private Sync sync = new Sync();
void lock() { sync.acquire(1); }
bool tryLock() { return sync.tryAcquire(1); }
void unlock() { sync.release(1); }
Condition newCondition() { return sync.newCondition(); }
bool isLocked() { return sync.isHeldExclusively(); }
bool hasQueuedThreads() { return sync.hasQueuedThreads(); }
bool tryLock(long timeout, TimeUnit unit) {
return sync.tryAcquireNanos(1, toNanos(timeout,unit));
}
}
Here is a latch class that is like a CountDownLatch except that
it only requires a single notify to fire. Because a latch
is non-exclusive, it uses the shared acquire and release
methods.
class BoolLatch {
private class Sync : AbstractLock {
bool isSignalled() { return state != 0; }
protected int tryAcquireShared(int ignore) {
return isSignalled()? 1 : -1;
}
protected bool tryReleaseShared(int ignore) {
state = 1;
return true;
}
}
private final Sync sync = new Sync();
bool isSignalled() { return sync.isSignalled(); }
void notify() { sync.releaseShared(1); }
void wait() {
sync.acquireShared(1);
}
}
- this();
- Creates a new AbstractLock instance
with initial synchronization state of zero.
- class
Node
;
- Wait queue node class.
The wait queue is a variant of a "CLH" (Craig, Landin, and
Hagersten) lock queue. CLH locks are normally used for spinlocks.
We instead use them for blocking synchronizers, but use the same
basic tactic of holding some of the control information about a
thread in the predecessor of its node. A "status" field in each
node keeps track of whether a thread should block. A node is
signalled when its predecessor releases. Each node of the queue
otherwise serves as a specific-notification-style monitor holding
a single waiting thread. The status field does NOT control
whether threads are granted locks etc though. A thread may try
to acquire if it is first in the queue. But being first does not
guarantee success; it only gives the right to contend. So the
currently released contender thread may need to rewait.
To enqueue into a CLH lock, you atomically splice it in as new
tail. To dequeue, you just set the head field.
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
Insertion into a CLH queue requires only a single atomic
operation on "tail", so there is a simple atomic point of
demarcation from unqueued to queued. Similarly, dequeing
involves only updating the "head". However, it takes a bit
more work for nodes to determine who their successors are,
in part to deal with possible cancellation due to timeouts.
The "prev" links (not used in original CLH locks), are mainly
needed to handle cancellation. If a node is cancelled, its
successor is (normally) relinked to a non-cancelled
predecessor. For explanation of similar mechanics in the case
of spin locks, see the papers by Scott and Scherer at
http:
//www.cs.rochester.edu/u/scott/synchronization/
We also use "next" links to implement blocking mechanics.
The thread id for each node is kept in its own node, so a
predecessor signals the next node to wake up by traversing
next link to determine which thread it is. Determination of
successor must avoid races with newly queued nodes to set
the "next" fields of their predecessors. This is solved
when necessary by checking backwards from the atomically
updated "tail" when a node's successor appears to be null.
(Or, said differently, the next-links are an optimization
so that we don't usually need a backward scan.)
Cancellation introduces some conservatism to the basic
algorithms. Since we must poll for cancellation of other nodes,
we can miss noticing whether a cancelled node is ahead or behind
us. This is dealt with by always unparking successors upon
cancellation, allowing them to stabilize on a new predecessor.
CLH queues need a dummy header node to get started. But we
don't create them on construction, because it would be wasted
effort if there is never contention. Instead, the node is
constructed and head and tail pointers are set upon first
contention.
Threads waiting on Conditions use the same nodes, but use an
additional link. Conditions only need to link nodes in simple
(non-concurrent) linked queues because they are only accessed
when exclusively held. Upon await, a node is inserted into a
condition queue. Upon signal, the node is transferred to the
main queue. A special value of status field is used to mark
which queue a node is on.
Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
Scherer and Michael Scott, along with members of JSR-166
expert group, for helpful ideas, discussions, and critiques
on the design of this class.
- const final int
CANCELLED
;
- waitStatus value to indicate thread has cancelled
- const final int
SIGNAL
;
- waitStatus value to indicate thread needs unparking
- const final int
CONDITION
;
- waitStatus value to indicate thread is waiting on condition
- static final Node
SHARED
;
- Marker to indicate a node is waiting in shared mode
- static final Node
EXCLUSIVE
;
- Marker to indicate a node is waiting in exclusive mode
- final int
waitStatus
;
- Status field, taking on only the values:
SIGNAL:
The successor of this node is (or will soon be)
blocked (via park), so the current node must
unpark its successor when it releases or
cancels. To avoid races, acquire methods must
first indicate they need a signal,
then retry the atomic acquire, and then,
on failure, block.
CANCELLED:
Node is cancelled due to timeout
Nodes never leave this state. In particular,
a thread with cancelled node never again blocks.
CONDITION:
Node is currently on a condition queue
It will not be used as a sync queue node until
transferred. (Use of this value here
has nothing to do with the other uses
of the field, but simplifies mechanics.)
0: None of the above
The values are arranged numerically to simplify use.
Non-negative values mean that a node doesn't need to
signal. So, most code doesn't need to check for particular
values, just for sign.
The field is initialized to 0 for normal sync nodes, and
CONDITION for condition nodes. It is modified only using
CAS.
- final Node
prev
;
- Link to predecessor node that current node/thread relies on for
checking waitStatus. Assigned during enqueing, and nulled out
(for sake of GC) only upon dequeuing. Also, upon cancellation
of a predecessor, we short-circuit while finding a
non-cancelled one, which will always exist because the head
node is never cancelled: A node becomes head only as a result
of successful acquire. A cancelled thread never succeeds in
acquiring, and a thread only cancels itself, not any other
node.
- final Node
next
;
- Link to the successor node that the current node/thread unparks
upon release. Assigned once during enqueuing, and nulled out
(for sake of GC) when no longer needed. Upon cancellation, we
cannot adjust this field, but can notice status and bypass the
node if cancelled. The enq operation does not assign
next
field of a predecessor until after attachment, so seeing a null
next
field does not necessarily mean that node is at end of
queue. However, if a
next
field appears to be null, we can scan
prev's from the tail to double-check.
- final Thread
thread
;
- The
thread
that enqueued this node. Initialized on
construction and nulled out after use.
- final Node
nextWaiter
;
- Link to next node waiting on condition, or the special value
SHARED. Because condition queues are accessed only when
holding in exclusive mode, we just need a simple linked queue
to hold nodes while they are waiting on conditions. They are
then transferred to the queue to re-acquire. And because
conditions can only be exclusive, we save a field by using
special value to indicate shared mode.
- final bool
isShared
();
- Returns true if node is waiting in shared mode
- final Node
predecessor
();
- Returns previous node. Use when
predecessor
cannot be null.
\return the
predecessor
of this node
- protected int
state_
;
- The synchronization state.
- protected final int
state
();
- Returns the current value of synchronization
state
.
This operation has memory semantics of a volatile read.
\return current
state
value
- protected final void
state
(int newState);
- Sets the value of synchronization
state
.
This operation has memory semantics of a volatile write.
\param newState the new
state
value
- final void
acquireQueued
(Node node, int arg);
- Acquire in exclusive uninterruptible mode for thread already in
queue. Used by condition wait methods as well as acquire.
\param node the node
\param arg the acquire argument
- protected bool
tryAcquire
(int arg);
- Attempts to acquire in exclusive mode. This method should query
if the state of the object permits it to be acquired in the
exclusive mode, and if so to acquire it.
This method is always invoked by the thread performing
acquire. If this method reports failure, the acquire method may
queue the thread, if it is not already queued, until it is
signalled by a release from some other thread. This can be used
to implement method tryLock().
The default implementation throws
UnsupportedOperationException
\param arg the acquire argument. This value is always the one
passed to an acquire method, or is the value saved on entry to a
condition wait. The value is otherwise uninterpreted and can
represent anything you like.
\return true if successful. Upon success, this object has been
acquired.
- protected bool
tryRelease
(int arg);
- Attempts to set the state to reflect a release in exclusive mode.
This method is always invoked by the thread performing
release.
The default implementation throws
UnsupportedOperationException
\param arg the release argument. This value is always the one
passed to a release method, or the current state value upon entry
to a condition wait. The value is otherwise uninterpreted and
can represent anything you like.
\return true if this object is now in a fully released state,
so that any waiting threads may attempt to acquire; and false
otherwise.
- protected int
tryAcquireShared
(int arg);
- Attempts to acquire in shared mode. This method should query if
the state of the object permits it to be acquired in the shared
mode, and if so to acquire it.
This method is always invoked by the thread performing
acquire. If this method reports failure, the acquire method
may queue the thread, if it is not already queued, until it is
signalled by a release from some other thread.
The default implementation throws
UnsupportedOperationException
\param arg the acquire argument. This value is always the one
passed to an acquire method, or is the value saved on entry to a
condition wait. The value is otherwise uninterpreted and can
represent anything you like.
\return a negative value on failure, zero on exclusive success,
and a positive value if non-exclusively successful, in which case
a subsequent waiting thread must check availability. (Support for
three different return values enables this method to be used in
contexts where acquires only sometimes act exclusively.) Upon
success, this object has been acquired.
- protected bool
tryReleaseShared
(int arg);
- Attempts to set the state to reflect a release in shared mode.
This method is always invoked by the thread performing release.
The default implementation throws
UnsupportedOperationException.
\param arg the release argument. This value is always the one
passed to a release method, or the current state value upon entry
to a condition wait. The value is otherwise uninterpreted and
can represent anything you like.
\return true if this object is now in a fully released state,
so that any waiting threads may attempt to acquire; and false
otherwise.
- bool
isHeldExclusively
();
- Returns true if synchronization is held exclusively with respect
to the current (calling) thread. This method is invoked upon
each call to a non-waiting ConditionObject method. (Waiting
methods instead invoke release.)
The default implementation throws
UnsupportedOperationException. This method is invoked internally
only within ConditionObject methods, so need not be defined if
conditions are not used.
\return true if synchronization is held exclusively; else false
- final void
acquire
(int arg);
- Acquires in exclusive mode. Implemented by invoking at least
once tryAcquire, returning on success. Otherwise the thread is
queued, possibly repeatedly blocking and unblocking, invoking
tryAcquire until success. This method can be used to implement
method Lock.lock
\param arg the
acquire
argument. This value is conveyed to
tryAcquire but is otherwise uninterpreted and can represent
anything you like.
- final bool
tryAcquireNanos
(int arg, long nanosTimeout);
- Attempts to acquire in exclusive mode and failing if the given
timeout elapses. Implemented by first checking interrupt status,
then invoking at least once tryAcquire, returning on success.
Otherwise, the thread is queued, possibly repeatedly blocking and
unblocking, invoking tryAcquire until success or the timeout
elapses. This method can be used to implement method
Lock.tryLock(long, TimeUnit).
\param arg the acquire argument. This value is conveyed to
tryAcquire but is otherwise uninterpreted and can represent
anything you like.
\param nanosTimeout the maximum number of nanoseconds to wait
\return true if acquired; false if timed out
- final bool
release
(int arg);
- Releases in exclusive mode. Implemented by unblocking one or
more threads if tryRelease returns true. This method can be used
to implement method Lock.unlock
\param arg the
release
argument. This value is conveyed to
tryRelease but is otherwise uninterpreted and can represent
anything you like.
\return the value returned from tryRelease
- final void
acquireShared
(int arg);
- Acquires in shared mode. Implemented by first invoking at least
once tryAcquireShared, returning on success. Otherwise the
thread is queued, possibly repeatedly blocking and unblocking,
invoking tryAcquireShared until success.
\param arg the acquire argument. This value is conveyed to
tryAcquireShared but is otherwise uninterpreted and can represent
anything you like.
- final bool
tryAcquireSharedNanos
(int arg, long nanosTimeout);
- Attempts to acquire in shared mode and failing if the given
timeout elapses. Implemented by invoking at least once
tryAcquireShared, returning on success. Otherwise, the thread is
queued, possibly repeatedly blocking and unblocking, invoking
tryAcquireShared until success or the timeout elapses.
\param arg the acquire argument. This value is conveyed to
tryAcquireShared but is otherwise uninterpreted and can
represent anything you like.
\param nanosTimeout the maximum number of nanoseconds to wait
\return true if acquired; false if timed out
- final bool
releaseShared
(int arg);
- Releases in shared mode. Implemented by unblocking one or more
threads if tryReleaseShared returns true.
\param arg the release argument. This value is conveyed to
{@link #tryReleaseShared} but is otherwise uninterpreted and can
represent anything you like.
\return the value returned from tryReleaseShared
- final bool
hasQueuedThreads
();
- Queries whether any threads are waiting to acquire. Note that
because cancellations due to interrupts and timeouts may occur
at any time, a true return does not guarantee that any
other thread will ever acquire.
In this implementation, this operation returns in
constant time.
\return true if there may be other threads waiting to acquire
the lock.
- final bool
hasContended
();
- Queries whether any threads have ever contended to acquire this
synchronizer; that is if an acquire method has ever blocked.
In this implementation, this operation returns in
constant time.
\return true if there has ever been contention
- final Thread
getFirstQueuedThread
();
- Returns the first (longest-waiting) thread in the queue, or
null if no threads are currently queued.
In this implementation, this operation normally returns in
constant time, but may iterate upon contention if other threads are
concurrently modifying the queue.
\return the first (longest-waiting) thread in the queue, or
null if no threads are currently queued.
- final bool
isQueued
(Thread thread);
- Returns true if the given thread is currently queued.
This implementation traverses the queue to determine
presence of the given thread.
\param thread the thread
\return true if the given thread in on the queue
- final int
getQueueLength
();
- Returns an estimate of the number of threads waiting to acquire.
The value is only an estimate because the number of threads may
change dynamically while this method traverses internal data
structures. This method is designed for use in monitoring system
state, not for synchronization control.
\return the estimated number of threads waiting for this lock
- final Thread[]
getQueuedThreads
();
- Returns a collection containing threads that may be waiting to
acquire. Because the actual set of threads may change
dynamically while constructing this result, the returned
collection is only a best-effort estimate. The elements of the
returned collection are in no particular order. This method is
designed to facilitate construction of subclasses that provide
more extensive monitoring facilities.
\return the collection of threads
- final Thread[]
getExclusiveQueuedThreads
();
- Returns a collection containing threads that may be waiting to
acquire in exclusive mode. This has the same properties as
getQueuedThreads except that it only returns those threads
waiting due to an exclusive acquire.
\return the collection of threads
- final Thread[]
getSharedQueuedThreads
();
- Returns a collection containing threads that may be waiting to
acquire in shared mode. This has the same properties as
getQueuedThreads except that it only returns those threads
waiting due to a shared acquire.
\return the collection of threads
- char[]
toString
();
- Returns a string identifying this synchronizer, as well as its
state. The state, in brackets, includes the String "State
=" followed by the current value of state(), and
either "nonempty" or "empty" depending on
whether the queue is empty.
\return a string identifying this synchronizer, as well as its state.
- final bool
isOnSyncQueue
(Node node);
- Returns true if a node, always one that was initially placed on
a condition queue, is now waiting to reacquire on sync queue.
\param node the node
\return true if is reacquiring
- final bool
transferForNotify
(Node node);
- Transfers a node from a condition queue onto sync queue.
Returns true if successful.
\param node the node
\return true if successfully transferred (else the node was
cancelled before signal).
- final bool
transferAfterCancelledWait
(Node node);
- Transfers node, if necessary, to sync queue after a cancelled
wait. Returns true if thread was cancelled before being
signalled.
\param current the waiting thread
\param node its node
\return true if cancelled before the node was signalled.
- final int
fullyRelease
(Node node);
- Invoke release with current state value; return saved state.
Cancel node and throw exception on failure.
\param node the condition node for this wait
\return previous sync state
- final bool
owns
(ConditionObject condition);
- Queries whether the given ConditionObject
uses this synchronizer as its lock.
\param condition the condition
\return true if owned
- final bool
hasWaiters
(ConditionObject condition);
- Queries whether any threads are waiting on the given condition
associated with this synchronizer. Note that because timeouts
and interrupts may occur at any time, a true return
does not guarantee that a future signal will awaken
any threads. This method is designed primarily for use in
monitoring of the system state.
\param condition the condition
\return true if there are any waiting threads.
- final int
getWaitQueueLength
(ConditionObject condition);
- Returns an estimate of the number of threads waiting on the given
condition associated with this synchronizer. Note that because
timeouts may occur at any time, the estimate serves only as an
upper bound on the actual number of waiters. This method is
designed for use in monitoring of the system state, not for
synchronization control.
\param condition the condition
\return the estimated number of waiting threads.
- final Thread[]
getWaitingThreads
(ConditionObject condition);
- Returns a collection containing those threads that may be waiting
on the given condition associated with this synchronizer.
Because the actual set of threads may change dynamically while
constructing this result, the returned collection is only a
best-effort estimate. The elements of the returned collection are
in no particular order.
\param condition the condition
\return the collection of threads
- class
ConditionObject
: mango.locks.Condition.Condition;
- Condition implementation for a AbstractLock serving as the basis
of a Lock implementation.
Method documentation for this class describes mechanics, not
behavioral specifications from the point of view of Lock and
Condition users. Exported versions of this class will in general
need to be accompanied by documentation describing condition
semantics that rely on those of the associated
AbstractLock.
This class is Serializable, but all fields are transient,
so deserialized conditions have no waiters.
- this(AbstractLock parent);
- Creates a new ConditionObject instance.
- final void
notify
();
- Moves the longest-waiting thread, if one exists, from the
wait queue for this condition to the wait queue for the
owning lock.
- final void
notifyAll
();
- Moves all threads from the wait queue for this condition to
the wait queue for the owning lock.
- final long
waitNanos
(long nanosTimeout);
- Implements timed condition wait.
- Save lock state returned by state()
- Invoke release with saved state as argument
- Block until signalled or timed out
- Reacquire by invoking specialized version of
acquire with saved state as argument.
- final bool
wait
(long time, TimeUnit unit);
- Implements timed condition
wait
.
- Save lock state returned by state()
- Invoke release with saved state as argument
- Block until signalled, interrupted, or timed out
- Reacquire by invoking specialized version of
{@link #acquire} with saved state as argument.
- If interrupted while blocked in step 4, throw InterruptedException
- If timed out while blocked in step 4, return false, else true
- final bool
isOwnedBy
(AbstractLock sync);
- Returns true if this condition was created by the given
synchronization object
\return true if owned
- final bool
hasWaiters
();
- Queries whether any threads are waiting on this condition.
Implements AbstractLock.
hasWaiters
\return true if there are any waiting threads.
- final int
getWaitQueueLength
();
- Returns an estimate of the number of threads waiting on
this condition.
Implements AbstractLock.
getWaitQueueLength
\return the estimated number of waiting threads.
- final Thread[]
getWaitingThreads
();
- Returns a collection containing those threads that may be
waiting on this Condition.
Implements AbstractLock.
getWaitingThreads
\return the collection of threads
|