diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc
new file mode 100644
index 00000000..54f6e221
--- /dev/null
+++ b/doc/design/LockService.adoc
@@ -0,0 +1,98 @@
+= Lock Service Design
+Zhang Yifei
+
+The lock service maintains the holder and the waiters of a given lockId. A lockId can be seen as the identity of a
+lock, and a lock has at most one holder and any number of waiters at the same time.
+
+Waiters are queued; the first waiter becomes the holder when the current holder unlocks.
+
+== Holder Identity
+The identity of a holder or waiter has to be a member of the RAFT cluster, because the service sends server-initiated
+messages. Clients are currently stateless to the server once a reply has been sent, so there is no way to deliver a
+server-initiated message to a client.
+
+I considered creating a new protocol to maintain sessions for clients, but that would be a lot of work: for example,
+session creation and destruction would need to be recorded in the RAFT log, sessions would have to remain available on
+the new leader after a leadership change, and the client would actually have to keep connections to all members.
+
+Holders and waiters are represented in the server by the address (UUID) of the channel; the advantage of doing so is
+that the server can clear disconnected holders and waiters based on the view of the cluster.
+
+== Holding Status
+The holding status only applies to connected members. A disconnected member can assume that it has released all of its
+locks, because the leader of the cluster clears leaving members from the locking status when the view change event
+arrives.
+
+In a partition, members in a minority subgroup are also cleared by the leader, as long as a majority subgroup is still
+present. If all subgroups are minorities, the newly elected leader force-clears all previous locking status after the
+cluster resumes. A newly started cluster clears all previous locking status as well.
+
+Since the locking status has the same lifecycle as the cluster, the log storage can be an in-memory implementation.
+
+== Waiting Status
+Waiting status is treated the same as holding status with respect to disconnection and partitioning.
+The tricky part is how to let a waiter know that it has become the holder; this is the server-initiated message
+mentioned earlier. Since waiters are members of the cluster, the leader can send messages to any lock service, but in
+what way? Those messages must be ordered and must not be lost or duplicated. Suppose a dedicated message were used: the
+leader would send it after the logs are applied, and the sending could be asynchronous, but if the leader left, the new
+leader could not guarantee that those messages were neither lost nor duplicated.
+
+Basing the notification on the log-applying process of each member is a reliable choice, although it's not perfect.
+
+== Commands
+LOCK::
+With the UUID of the member and the lockId. Hold the lock if possible, otherwise join the waiting queue.
+TRY_LOCK::
+With the UUID of the member and the lockId. Hold the lock if possible; do not join the waiting queue.
+UNLOCK::
+With the UUID of the member and the lockId. If the member is the holder, remove it from the holder status and make the
+first waiter the next holder; if the member is a waiter, remove it from the waiting queue.
+UNLOCK_ALL::
+With the UUID of the member. Remove the member from all holding and waiting status.
+RESET::
+With the UUIDs of the members that are currently connected.
+Check every holder and waiter against this list; if it
+is not in the list, remove it from all holding and waiting status. Note that a waiter promoted to holder during this
+unlocking must also be in the list. This is an internal command and is not exposed to users.
+QUERY::
+With the UUID of the member and the lockId. A read-only command that returns the current lock status.
+
+=== Reset
+A member resigns from its holder and waiter status when it is disconnected or ends up in a minority subgroup of a
+partition: it notifies its listeners that it has unlocked all of its locks, but in the state machine the unlocking
+hasn't really happened yet. The unlocking happens immediately via a reset command if a leader is still present, or
+eventually after a new leader is elected.
+
+There are two types of reset: one resets with the list of current members, the other resets with an empty list, which
+means all state is cleared.
+
+The first is used when the leader finds members leaving, or when a new leader is elected because the previous leader
+left.
+
+The second is used when a new leader is elected for a reason other than the previous leader leaving.
+
+Scenarios for electing a new leader::
+. Majority has just been reached.
+.. A new member connected
+.. A disconnected member reconnected
+.. Views merged (no subgroup had a majority of members)
+. The leader left, and a majority is still present.
+. There is a leader in a majority subgroup, but a view merge changed the coordinator, and the new coordinator started
+a new term of voting before learning of the existing leader.
+.. The new coordinator is elected as the new leader.
+.. The existing leader is re-elected as the leader of the next term.
+
+Scenario 1 resets to an empty list, because potentially all members have resigned.
+
+Scenario 2 resets to the current members, because the cluster has had a majority of members the whole time, and these
+members won't resign.
+
+Scenario 3.1 should not happen, because the existing leader will always have the longer log thanks to the reset
+command.
+Scenario 3.2 resets to the current members.
+
+== Listener
+Listeners can be registered to observe lock status changes. On the leader node, listeners are notified by the RAFT
+working thread; on followers, they are notified by the thread that delivers the response message.
+
+== Mutex
+Combining the lock service with a ReentrantLock implements an exclusive lock across JVMs.
+
+=== Command executing
+The mutex's methods execute commands in the lock service; a RaftException is thrown when a command fails to execute.
+
+Command execution is uninterruptible to avoid inconsistent state, but a timeout can be set to bound the waiting time.
+
+=== Unexpected status
+Many factors can cause an unexpected unlocking or locking status, for example a disconnected channel, a network
+partition, or even calling the lock service directly with the same lockId. Handlers can therefore be registered to
+handle the unexpected status, so users know the risks and can decide how to deal with them; the RaftException comes
+from the same idea.
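+
+=== Example
+A minimal usage sketch of the mutex; the channel configuration `raft.xml`, the cluster name `lock-cluster` and the
+lockId `42` are only illustrative, and the service must be created before the channel is connected.
+
+[source,java]
+----
+JChannel channel = new JChannel("raft.xml");      // stack that includes the RAFT protocols
+LockService service = new LockService(channel);   // must be created before connecting the channel
+channel.connect("lock-cluster");
+
+LockService.Mutex mutex = service.mutex(42L);     // one Mutex instance per lockId
+mutex.setTimeout(5_000);                          // bound command execution, in milliseconds
+mutex.setUnexpectedUnlockHandler(m -> System.err.println("lock 42 was unlocked unexpectedly"));
+
+mutex.lock();                                     // may block while WAITING in the lock service
+try {
+    // critical section, exclusive across JVMs
+} finally {
+    mutex.unlock();
+}
+----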
diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java new file mode 100644 index 00000000..5b91a1f4 --- /dev/null +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -0,0 +1,845 @@ +package org.jgroups.raft.blocks; + +import static org.jgroups.raft.blocks.LockService.LockStatus.HOLDING; +import static org.jgroups.raft.blocks.LockService.LockStatus.NONE; +import static org.jgroups.raft.blocks.LockService.LockStatus.WAITING; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import org.jgroups.Address; +import org.jgroups.ChannelListener; +import org.jgroups.Event; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.UpHandler; +import org.jgroups.View; +import org.jgroups.logging.Log; +import org.jgroups.logging.LogFactory; +import org.jgroups.protocols.raft.RAFT; +import org.jgroups.protocols.raft.Role; +import org.jgroups.raft.RaftHandle; +import org.jgroups.raft.StateMachine; +import org.jgroups.util.ByteArrayDataInputStream; +import org.jgroups.util.ByteArrayDataOutputStream; +import org.jgroups.util.ExtendedUUID; +import org.jgroups.util.UUID; + +/** + * A state machine that maintains the holder and waiters for a specified lockId. For a lockId, it has only one holder + * in same time, other acquirers will be queued as waiters, when the holder unlock from the lockId, the next holder will + * be polled from the waiting queue if there is a waiter. + *

+ * The {@link Address} of the member is used to identify the acquirers (holders and waiters), which means a newly + * connected member has a new identity, and a disconnected member automatically unlocks from all related lockIds. + * If members leave the cluster, the leader unlocks on their behalf in the state machine. + * A newly started cluster, or a cluster resuming from multiple minority partitions, unlocks for all previous members + * in the state machine. + *

+ * The {@link LockStatus} represents the member's locking status. + *
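+ * {@code HOLDING} means the member holds the lock, {@code WAITING} means it is queued as a waiter, and
+ * {@code NONE} means it neither holds nor waits for the lock.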

+ *

+ * Listeners can be registered to get notifications of lock status changes. The order of notifications is the same + * as the order in which commands are executed. Don't do any heavy work or block the calling thread in a listener. + *
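+ * A minimal sketch of registering a listener and acquiring a lock asynchronously (the configuration, cluster name
+ * and lockId are illustrative):
+ * <pre>{@code
+ * JChannel channel = new JChannel("raft.xml");
+ * LockService service = new LockService(channel);  // the channel must not be connected yet
+ * service.addListener((lockId, prev, curr) -> System.out.printf("lock %d: %s -> %s%n", lockId, prev, curr));
+ * channel.connect("lock-cluster");
+ * service.lock(42L).thenAccept(System.out::println);  // completes with HOLDING or WAITING
+ * }</pre>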

+ * The {@link Mutex} is a distributed implementation of {@link Lock}. It based on the lock service, a thread is holding + * the mutex also means the member is holding the lock in the lock service. There is only one {@link Mutex} instance + * for each lockId in a given lock service, {@link LockService#mutex(long)} method will create the instance if absent, + * otherwise return the existing one. + * + * @author Zhang Yifei + */ +public class LockService { + protected static final Log log = LogFactory.getLog(LockService.class); + + protected static final byte LOCK = 1, TRY_LOCK = 2, UNLOCK = 3, UNLOCK_ALL = 4, RESET = 5, QUERY = 6; + + protected final RaftHandle raft; + protected final Map locks = new HashMap<>(); + protected final Map> memberLocks = new HashMap<>(); + + protected volatile View view; + protected volatile Address lastLeader; + protected volatile ExtendedUUID address; + + protected final ConcurrentMap lockStatus = new ConcurrentHashMap<>(); + protected final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + protected final ConcurrentMap mutexes = new ConcurrentHashMap<>(); // could be weak value reference + + public LockService(JChannel channel) { + if (channel.isConnecting() || channel.isConnected()) { + throw new IllegalStateException("Illegal channel state " + channel.getState()); + } + Hook hook = createHook(); + raft = createRaft(channel, hook); + channel.setUpHandler(hook).addChannelListener(hook); + raft.addRoleListener(hook); + } + + protected Hook createHook() { + return new Hook(); + } + + protected RaftHandle createRaft(JChannel ch, StateMachine sm) { + return new RaftHandle(ch, sm); + } + + protected static class LockEntry { + public final long id; + public final LinkedHashSet waiters = new LinkedHashSet<>(); + public UUID holder; + + protected LockEntry(long id) {this.id = id;} + + protected UUID unlock() { + // make sure it's a consistent result for all nodes + Iterator i = waiters.iterator(); + if (!i.hasNext()) return holder = null; + UUID v = i.next(); i.remove(); return holder = v; + } + } + + protected class Hook implements StateMachine, RAFT.RoleChange, UpHandler, ChannelListener { + + @Override + public void readContentFrom(DataInput in) { + Map tmp = new HashMap<>(); + locks.clear(); memberLocks.clear(); + for (int i = 0, l = readInt(in); i < l; i++) { + long id = readLong(in); + LockEntry lock = new LockEntry(id); + lock.holder = readUuid(in); + for (int t = 0, m = readInt(in); t < m; t++) { + lock.waiters.add(readUuid(in)); + } + locks.put(id, lock); + bind(lock.holder, lock); + if (address.equals(lock.holder)) tmp.put(lock.id, HOLDING); + for (UUID waiter : lock.waiters) { + bind(waiter, lock); + if (address.equals(waiter)) tmp.put(lock.id, WAITING); + } + } + tmp.forEach((k, v) -> onCommit(k, null, v, false)); + } + + @Override + public void writeContentTo(DataOutput out) { + writeInt((int) locks.values().stream().filter(t -> t.holder != null).count(), out); + for (LockEntry lock : locks.values()) { + if (lock.holder == null) continue; + writeLong(lock.id, out); + writeUuid(lock.holder, out); + writeInt(lock.waiters.size(), out); + for (UUID t : lock.waiters) { + writeUuid(t, out); + } + } + } + + @Override + public byte[] apply(byte[] data, int offset, int length, boolean response) throws Exception { + var in = new ByteArrayDataInputStream(data, offset, length); + LockStatus status = null; + switch (in.readByte()) { + case LOCK: + status = doLock(readLong(in), readUuid(in), false); break; + case TRY_LOCK: + status = 
doLock(readLong(in), readUuid(in), true); break; + case UNLOCK: + doUnlock(readLong(in), readUuid(in)); break; + case UNLOCK_ALL: + doUnlock(readUuid(in), null); break; + case RESET: + int len = readInt(in); + List members = new ArrayList<>(len); + for (int i = 0; i < len; i++) { + members.add(readUuid(in)); + } + doReset(members); break; + case QUERY: + if (response) status = doQuery(readLong(in), readUuid(in)); break; + } + return response && status != null ? new byte[] {(byte) status.ordinal()} : null; + } + + @Override + public void roleChanged(Role role) { + if (role == Role.Leader) { + try { + reset(lastLeader != null ? view : null); + } catch (Throwable e) { + log.error("Fail to send reset command", e); + } + } + } + + @Override + public UpHandler setLocalAddress(Address a) { + address = (ExtendedUUID) a; return this; + } + + @Override + public Object up(Event evt) { + if (evt.getType() == Event.VIEW_CHANGE) { + handleView(evt.arg()); + } + return null; + } + + @Override + public Object up(Message msg) {return null;} + + @Override + public void channelDisconnected(JChannel channel) { + resign(); view = null; lastLeader = null; address = null; + } + } + + protected LockStatus doLock(long lockId, UUID member, boolean trying) { + LockEntry lock = locks.computeIfAbsent(lockId, LockEntry::new); + LockStatus prev = NONE, next = HOLDING; + if (lock.holder == null) { + lock.holder = member; + } else if (lock.holder.equals(member)) { + prev = HOLDING; + } else if (trying) { + prev = next = lock.waiters.contains(member) ? WAITING : NONE; + } else { + if (!lock.waiters.add(member)) prev = WAITING; + next = WAITING; + } + if (prev != next) bind(member, lock); + if (address.equals(member)) { + onCommit(lockId, prev, next, false); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s lock %s, prev: %s, next: %s", address, member, lockId, prev, next); + } + return next; + } + + protected void doUnlock(long lockId, UUID member) { + LockEntry lock = locks.get(lockId); if (lock == null) return; + if (doUnlock(member, lock, null)) unbind(member, lock); + } + + protected void doUnlock(UUID member, Set unlocking) { + Set set = memberLocks.remove(member); if (set == null) return; + for (LockEntry lock : set) doUnlock(member, lock, unlocking); + } + + protected boolean doUnlock(UUID member, LockEntry lock, Set unlocking) { + LockStatus prev = HOLDING; + UUID holder = null; + List waiters = null; + boolean reset = unlocking != null; + if (member.equals(lock.holder)) { + do { + if (holder != null) { + if (waiters == null) waiters = new ArrayList<>(unlocking.size()); + waiters.add(holder); + } + holder = lock.unlock(); + } while (holder != null && unlocking != null && unlocking.contains(holder)); + } else { + prev = lock.waiters.remove(member) ? 
WAITING : NONE; + } + if (address.equals(member)) { + onCommit(lock.id, prev, NONE, reset); + } else if (address.equals(holder)) { + onCommit(lock.id, WAITING, HOLDING, reset); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s unlock %s, prev: %s", address, member, lock.id, prev); + if (holder != null) + log.trace("[%s] %s lock %s, prev: %s, next: %s", address, holder, lock.id, WAITING, HOLDING); + } + if (waiters != null) for (UUID waiter : waiters) { + unbind(waiter, lock); + if (address.equals(waiter)) { + onCommit(lock.id, WAITING, NONE, true); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING); + } + } + return prev != NONE; + } + + protected void doReset(List members) { + Set prev = new HashSet<>(memberLocks.keySet()); + if (log.isTraceEnabled()) { + log.trace("[%s] reset %s to %s", address, prev, members); + } + for (UUID member : members) prev.remove(member); + for (UUID member : prev) doUnlock(member, prev); + } + + protected LockStatus doQuery(Long lockId, UUID member) { + LockEntry lock = locks.get(lockId); + LockStatus status; + if (lock == null || lock.holder == null) status = NONE; + else if (lock.holder.equals(member)) status = HOLDING; + else status = lock.waiters.contains(member) ? WAITING : NONE; + if (log.isTraceEnabled()) { + log.trace("[%s] %s query %s, status: %s", address, member, lockId, status); + } + return status; + } + + protected void bind(UUID member, LockEntry lock) { + memberLocks.computeIfAbsent(member, k -> new LinkedHashSet<>()).add(lock); + } + + protected void unbind(UUID member, LockEntry lock) { + memberLocks.computeIfPresent(member, (k, v) -> { + v.remove(lock); return v.isEmpty() ? null : v; + }); + } + + protected void onCommit(long lockId, LockStatus prev, LockStatus curr, boolean reset) { + if (prev == curr) return; + // In followers, logs and responses of multiple commands can be included in one batch message, since RAFT is + // below REDIRECT in protocol stack, the commands applying may occur before the responses completing. + if (curr == HOLDING && (prev == WAITING || prev == null && lockStatus.get(lockId) != HOLDING)) { + query(lockId); + } else if (reset) { + LockStatus status = lockStatus.get(lockId); + if (curr == NONE && status != null || curr == HOLDING && status != HOLDING) { + if (log.isTraceEnabled()) { + log.trace("[%s] Suspicious status %s, lockId: %s, expected: %s", address, status, lockId, curr); + } + query(lockId); + } + } + } + + protected void handleView(View next) { + View prev = this.view; this.view = next; + Address leader = raft.leader(); lastLeader = leader; + if (log.isTraceEnabled()) { + log.trace("[%s] View accepted: %s, prev: %s, leader: %s", address, next, prev, leader); + } + + if (prev != null) { + int majority = raft.raft().majority(); + if (prev.size() >= majority && next.size() < majority) { // lost majority + // In partition case if majority is still working, it will be unlocked by reset command. 
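+ // resign() only notifies local listeners; the state-machine entries are removed later when a leader applies
+ // a RESET command.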
+ resign(); + } else if (!next.containsMembers(prev.getMembersRaw()) && raft.isLeader()) { // member left + try { + reset(next); + } catch (Throwable e) { + log.error("Fail to send reset command", e); + } + } + } + } + + protected void resign() { + if (log.isTraceEnabled()) { + log.trace("[%s] resign from %s", address, lockStatus); + } + lockStatus.forEach((k, v) -> notifyListeners(k, NONE)); + } + + protected void reset(View view) { + if (log.isTraceEnabled()) { + log.trace("[%s] Send reset command: %s", address, view); + } + Address[] members = view != null ? view.getMembersRaw() : new Address[0]; + int len = members.length; + var out = new ByteArrayDataOutputStream(6 + len * 16); + out.writeByte(RESET); + writeInt(len, out); + for (Address member : members) { + writeUuid((UUID) member, out); + } + assert out.position() <= 6 + len * 16; + invoke(out).exceptionally(e -> { + log.error("Fail to reset to " + view, e); return null; + }); + } + + protected void query(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(QUERY); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + invoke(out).whenComplete((r, e) -> { + if (e != null) log.error("Fail to query on " + lockId, e); + else notifyListeners(lockId, LockStatus.values()[r[0]]); + }); + } + + /** + * Add listener + * @param listener listener for the status change. + * @return true if added, otherwise false. + */ + public boolean addListener(Listener listener) { return listeners.addIfAbsent(listener); } + + /** + * Remove listener + * @param listener listener for removing + * @return true if removed, otherwise false. + */ + public boolean removeListener(Listener listener) { return listeners.remove(listener); } + + /** + * Get this member's lock status from local state. + * @param lockId the lock's id + * @return lock status + */ + public LockStatus lockStatus(long lockId) { + LockStatus v = lockStatus.get(lockId); return v == null ? NONE : v; + } + + /** + * Acquire the lock, will join the waiting queue if the lock is held by another member currently. + * @param lockId the lock's id + * @return HOLDING if hold the lock, WAITING if in the waiting queue. + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + public CompletableFuture lock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(LOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]])); + } + + /** + * Try to acquire the lock, won't join the waiting queue. + * @param lockId the lock's id + * @return HOLDING if hold the lock, NONE if the lock is held by another member. + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + public CompletableFuture tryLock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(TRY_LOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]])); + } + + /** + * Release the lock if it's the holder, and take next waiting member from the queue to be the new holder if there + * is one. Remove from waiting queue if it's waiting. Do nothing if neither of them. 
+ * @param lockId the lock's id + * @return async completion + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + public CompletableFuture unlock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(UNLOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> { notifyListeners(lockId, NONE); return null; }); + } + + /** + * Release all related locks for this member. + * @return async completion + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + public CompletableFuture unlockAll() { + var out = new ByteArrayDataOutputStream(17); + out.writeByte(UNLOCK_ALL); + writeUuid(address(), out); + assert out.position() <= 17; + return invoke(out).thenApply(t -> { resign(); return null; }); + } + + protected UUID address() { + return Objects.requireNonNull(address); + } + + protected CompletableFuture invoke(ByteArrayDataOutputStream out) { + try { + return raft.setAsync(out.buffer(), 0, out.position()); + } catch (Throwable e) { + throw new RaftException("Fail to execute command", e); + } + } + + protected LockStatus notifyListeners(long lockId, LockStatus curr) { + LockStatus prev = curr == NONE ? lockStatus.remove(lockId) : lockStatus.put(lockId, curr); + if (prev == null) prev = NONE; + if (prev == curr) return curr; + Mutex mutex = mutexes.get(lockId); + if (mutex != null) mutex.onStatusChange(prev, curr); + for (Listener listener : listeners) { + try { + listener.onStatusChange(lockId, prev, curr); + } catch (Throwable e) { + log.error("Fail to notify listener, lock: %s, prev: %s, curr: %s", lockId, prev, curr, e); + } + } + return curr; + } + + /** + * Get the mutex for the specified id. + * @param lockId the id related to the mutex + * @return mutex instance + */ + public Mutex mutex(long lockId) { + return mutexes.computeIfAbsent(lockId, Mutex::new); + } + + /** + * The member's lock status + */ + public enum LockStatus { + HOLDING, WAITING, NONE + } + + /** + * Listen on the lock status changes + */ + public interface Listener { + void onStatusChange(long lockId, LockStatus prev, LockStatus curr); + } + + /** + * Exception for the raft cluster errors + */ + public static class RaftException extends RuntimeException { + public RaftException(String message) { super(message); } + public RaftException(Throwable cause) { super(cause); } + public RaftException(String message, Throwable cause) { super(message, cause); } + } + + /** + * A distributed lock that backed on the lock service. + */ + public class Mutex implements Lock { + private final long lockId; + private volatile LockStatus status = NONE; + private volatile Thread holder; + private final AtomicInteger acquirers = new AtomicInteger(); + private final ReentrantLock delegate = new ReentrantLock(); + private final Condition notWaiting = delegate.newCondition(); + private Consumer lockHandler, unlockHandler; + private long timeout = 8000; + + Mutex(long lockId) {this.lockId = lockId;} + + /** + * Set the timeout for the command executing in the lock service. 
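+ * The default is 8000 milliseconds.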
+ * @param timeout in milliseconds + */ + public void setTimeout(long timeout) {this.timeout = timeout;} + + /** + * The lock status in the lock service + * @return lock status of the lockId + */ + public LockStatus getStatus() {return status;} + + /** + * The current holder of this mutex + * @return the thread which holding this mutex + */ + public Thread getHolder() {return holder;} + + /** + * Register a handler for the unexpected unlocking in the lock service. + * @param handler callback with this mutex + */ + public void setUnexpectedUnlockHandler(Consumer handler) {unlockHandler = handler;} + + /** + * Register a handler for the unexpected locking in the lock service. + * @param handler callback with this mutex + */ + public void setUnexpectedLockHandler(Consumer handler) {lockHandler = handler;} + + /** + * Get the lock service + * @return the underlying lock service + */ + public LockService service() {return LockService.this;} + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public void lock() { + delegate.lock(); + acquirers.incrementAndGet(); + while (status != HOLDING) { + try { + if (status == WAITING) notWaiting.awaitUninterruptibly(); + else status = join(LockService.this.lock(lockId)); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + holder = Thread.currentThread(); + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public void lockInterruptibly() throws InterruptedException { + delegate.lockInterruptibly(); + acquirers.incrementAndGet(); + while (status != HOLDING) { + try { + if (status == WAITING) notWaiting.await(); + else status = join(LockService.this.lock(lockId)); + } catch (InterruptedException e) { + throw unlock(e); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + holder = Thread.currentThread(); + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public boolean tryLock() { + if (!delegate.tryLock()) return false; + acquirers.incrementAndGet(); + if (status == NONE) { + try { + status = join(LockService.this.tryLock(lockId)); + } catch (Throwable ignored) { + } + } + if (status == HOLDING) { + holder = Thread.currentThread(); return true; + } + unlock(); return false; + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout), ns; + if (!delegate.tryLock(timeout, unit)) return false; + acquirers.incrementAndGet(); + while (status != HOLDING && (ns = deadline - System.nanoTime()) > 0) { + try { + if (status == WAITING) notWaiting.awaitNanos(ns); + else status = join(LockService.this.lock(lockId)); + } catch (InterruptedException e) { + throw unlock(e); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + if (status == HOLDING) { + holder = Thread.currentThread(); return true; + } + unlock(); return false; + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. 
+ */ + @Override + public void unlock() { + if (!delegate.isHeldByCurrentThread()) return; + assert holder == null || holder == Thread.currentThread(); + if (delegate.getHoldCount() == 1) holder = null; + try { + if (acquirers.decrementAndGet() == 0 && status != NONE) { + join(LockService.this.unlock(lockId)); + status = NONE; + } + } catch (Throwable e) { + rethrow(e); + } finally { + delegate.unlock(); + } + } + + /** + * Unsupported + */ + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + private T unlock(T error) { + try { + unlock(); + } catch (Throwable e) { + error.addSuppressed(e); + } + return error; + } + + private T join(CompletableFuture future) throws ExecutionException, TimeoutException { + long nanos = TimeUnit.MILLISECONDS.toNanos(timeout), deadline = System.nanoTime() + nanos; + boolean interrupted = Thread.interrupted(); + try { + do { + try { + return future.get(nanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + interrupted = true; + } + } while ((nanos = deadline - System.nanoTime()) > 0); + throw new TimeoutException(); + } finally { + if (interrupted) Thread.currentThread().interrupt(); + } + } + + void onStatusChange(LockStatus prev, LockStatus curr) { + if (curr != HOLDING && holder != null) { + status = curr; + var handler = unlockHandler; + if (handler != null) try { + handler.accept(this); + } catch (Throwable e) { + log.error("Error occurred on unlock handler", e); + } + } else if (curr != NONE && acquirers.get() == 0) { + status = curr; + var handler = lockHandler; + if (handler != null) try { + handler.accept(this); + } catch (Throwable e) { + log.error("Error occurred on lock handler", e); + } + } else if (prev == WAITING && acquirers.get() > 0) { + delegate.lock(); + try { + if (status == WAITING) { + status = curr; + notWaiting.signalAll(); + } + } finally { + delegate.unlock(); + } + } + } + } + + private static T rethrow(Throwable e) { + if (e instanceof RaftException) throw (RaftException) e; + if (e instanceof CompletionException) { + Throwable cause = e.getCause(); + throw cause != null ? 
new RaftException(e) : (CompletionException) e; + } + if (e instanceof ExecutionException) throw new RaftException(e.getCause()); + if (e instanceof TimeoutException) throw new RaftException("Execute command timeout", e); + throw new RaftException("Unknown exception", e); + } + + private static void writeInt(int value, DataOutput out) { + try { + for (; (value & ~0x7F) != 0; value >>>= 7) { + out.writeByte(0x80 | (value & 0x7F)); + } + out.writeByte(value); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static int readInt(DataInput in) { + try { + int v = in.readByte(); if (v >= 0) return v; + if ((v ^= in.readByte() << 7) < 0) return v ^ 0xFFFFFF80; + if ((v ^= in.readByte() << 14) >= 0) return v ^ 0x00003F80; + if ((v ^= in.readByte() << 21) < 0) return v ^ 0xFFE03F80; + return v ^ in.readByte() << 28 ^ 0x0FE03F80; + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } + + private static void writeLong(long value, DataOutput out) { + try { + for (int i = 0; i < 8 && (value & ~0x7FL) != 0; i++) { + out.writeByte(0x80 | ((int) value & 0x7F)); + value >>>= 7; + } + out.writeByte((int) value); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static long readLong(DataInput in) { + try { + long v = in.readByte(); if (v >= 0) return v; + if ((v ^= (long) in.readByte() << 7) < 0L) return v ^ 0xFFFFFFFFFFFFFF80L; + if ((v ^= (long) in.readByte() << 14) >= 0L) return v ^ 0x0000000000003F80L; + if ((v ^= (long) in.readByte() << 21) < 0L) return v ^ 0xFFFFFFFFFFE03F80L; + if ((v ^= (long) in.readByte() << 28) >= 0L) return v ^ 0x000000000FE03F80L; + if ((v ^= (long) in.readByte() << 35) < 0L) return v ^ 0xFFFFFFF80FE03F80L; + if ((v ^= (long) in.readByte() << 42) >= 0L) return v ^ 0x000003F80FE03F80L; + if ((v ^= (long) in.readByte() << 49) < 0L) return v ^ 0xFFFE03F80FE03F80L; + return v ^ (long) in.readByte() << 56 ^ 0x00FE03F80FE03F80L; + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } + + private static void writeUuid(UUID id, DataOutput out) { + try { + out.writeLong(id.getMostSignificantBits()); + out.writeLong(id.getLeastSignificantBits()); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static UUID readUuid(DataInput in) { + try { + return new UUID(in.readLong(), in.readLong()); + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } +} diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java new file mode 100644 index 00000000..e0793750 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -0,0 +1,785 @@ +package org.jgroups.tests.blocks; + +import static java.util.Arrays.stream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Stream.concat; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.data.Offset.offset; +import static 
org.jgroups.raft.blocks.LockService.LockStatus.HOLDING; +import static org.jgroups.raft.blocks.LockService.LockStatus.NONE; +import static org.jgroups.raft.blocks.LockService.LockStatus.WAITING; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.assertj.core.data.Offset; +import org.jgroups.Address; +import org.jgroups.Global; +import org.jgroups.JChannel; +import org.jgroups.View; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.raft.RAFT; +import org.jgroups.raft.Options; +import org.jgroups.raft.RaftHandle; +import org.jgroups.raft.StateMachine; +import org.jgroups.raft.blocks.LockService; +import org.jgroups.raft.blocks.LockService.LockStatus; +import org.jgroups.raft.blocks.LockService.Mutex; +import org.jgroups.tests.harness.BaseRaftChannelTest; +import org.jgroups.tests.harness.BaseRaftElectionTest; +import org.jgroups.util.UUID; +import org.jgroups.util.Util; +import org.testng.annotations.Test; + +/** + * @author Zhang Yifei + */ +@Test(groups = Global.FUNCTIONAL, singleThreaded = true) +public class LockServiceTest extends BaseRaftChannelTest { + + protected Service service_a, service_b, service_c, service_d, service_e; + protected Events events_a, events_b, events_c, events_d, events_e; + + { + clusterSize = 5; + recreatePerMethod = true; + } + + protected static class Event { + final long key; final LockStatus prev, curr; + Event(long key, LockStatus prev, LockStatus curr) { this.key = key; this.prev = prev; this.curr = curr; } + + void assertEq(long key, LockStatus prev, LockStatus curr) { + assertThat(this).usingRecursiveComparison().isEqualTo(new Event(key, prev, curr)); + } + } + + protected static class Batch { + final List events; + Batch(List events) {this.events = events;} + + Batch assertContains(long key, LockStatus prev, LockStatus curr) { + assertThat(events).usingRecursiveFieldByFieldElementComparator().contains(new Event(key, prev, curr)); + return this; + } + } + + protected static class Events implements LockService.Listener { + final BlockingQueue queue = new LinkedBlockingQueue<>(); + + @Override + public void onStatusChange(long key, LockStatus prev, LockStatus curr) { + queue.offer(new Event(key, prev, curr)); + } + + protected void assertEmpty() { assertThat(queue).isEmpty(); } + protected Event next(int secs) throws InterruptedException { return queue.poll(secs, SECONDS); } + protected Event next() throws InterruptedException { return next(5); } + protected Batch batch(int count) throws InterruptedException { + List list = new ArrayList<>(count); + for (int i = 0; i < count; i++) list.add(next()); + return new Batch(list); + } + } + + protected static class Service extends LockService { + TestRaft raft; + + static { +// log.setLevel("trace"); + } + + public Service(JChannel channel) { super(channel); } + + 
@Override + protected RaftHandle createRaft(JChannel ch, StateMachine sm) { return raft = new TestRaft(ch, sm); } + + Map> dumpState() { + assert locks.values().stream().filter(t -> t.holder != null) + .flatMap(t -> concat(Stream.of(t.holder), t.waiters.stream()).map(m -> Map.entry(m, t))) + .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet()))).equals(memberLocks); + return locks.values().stream().filter(t -> t.holder != null) + .collect(Collectors.toMap(t -> t.id, t -> { + List list = new ArrayList<>(t.waiters.size() + 1); + list.add(t.holder); list.addAll(t.waiters); return list; + })); + } + } + + protected static class TestRaft extends RaftHandle { + Callable> interceptor; + + public TestRaft(JChannel ch, StateMachine sm) { super(ch, sm); } + + @Override + public CompletableFuture setAsync(byte[] buf, int offset, int length, Options options) throws Exception { + if (interceptor != null) return interceptor.call(); + return super.setAsync(buf, offset, length, options); + } + + void throwingInterceptor(Exception e) { interceptor = () -> { throw e; }; } + void errorInterceptor(Throwable e) { interceptor = () -> CompletableFuture.failedFuture(e); } + void voidInterceptor() { interceptor = CompletableFuture::new; } + void removeInterceptor() { interceptor = null; } + } + + @Override + protected void beforeChannelConnection(JChannel ch) { + switch (ch.name()) { + case "A": service_a = new Service(ch); break; + case "B": service_b = new Service(ch); break; + case "C": service_c = new Service(ch); break; + case "D": service_d = new Service(ch); break; + case "E": service_e = new Service(ch); break; + } + } + + @Override + protected void afterClusterCreation() { + RAFT[] rafts = stream(channels()).map(this::raft).toArray(RAFT[]::new); + BaseRaftElectionTest.waitUntilAllHaveLeaderElected(rafts, 15_000); + } + + protected void enableEvents() { + service_a.addListener(events_a = new Events()); + service_b.addListener(events_b = new Events()); + service_c.addListener(events_c = new Events()); + service_d.addListener(events_d = new Events()); + service_e.addListener(events_e = new Events()); + } + + public void lock() throws Exception { + enableEvents(); + + // lock 101 + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_a.lockStatus(101L), HOLDING); + + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + assertEquals(service_b.lockStatus(101L), WAITING); + + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lockStatus(101L), WAITING); + + // lock 102 + assertEquals(service_b.lock(102L).get(3, SECONDS), HOLDING); + events_b.next().assertEq(102L, NONE, HOLDING); + assertEquals(service_b.lockStatus(102L), HOLDING); + + assertEquals(service_a.lock(102L).get(3, SECONDS), WAITING); + events_a.next().assertEq(102L, NONE, WAITING); + assertEquals(service_a.lockStatus(102L), WAITING); + + assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING); + events_c.next().assertEq(102L, NONE, WAITING); + assertEquals(service_c.lockStatus(102L), WAITING); + + // lock 103 + assertEquals(service_c.lock(103L).get(3, SECONDS), HOLDING); + events_c.next().assertEq(103L, NONE, HOLDING); + assertEquals(service_c.lockStatus(103L), HOLDING); + + assertEquals(service_a.lock(103L).get(3, SECONDS), WAITING); + events_a.next().assertEq(103L, NONE, WAITING); + 
assertEquals(service_a.lockStatus(103L), WAITING); + + assertEquals(service_b.lock(103L).get(3, SECONDS), WAITING); + events_b.next().assertEq(103L, NONE, WAITING); + assertEquals(service_b.lockStatus(103L), WAITING); + + // unlock 101 + service_a.unlock(101L).get(3, SECONDS); + + events_a.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_a.lockStatus(101L), NONE); + + events_b.next().assertEq(101L, WAITING, HOLDING); + assertEquals(service_b.lockStatus(101L), HOLDING); + + // unlock 102 + service_b.unlock(102L).get(3, SECONDS); + + events_b.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_b.lockStatus(102L), NONE); + + events_a.next().assertEq(102L, WAITING, HOLDING); + assertEquals(service_a.lockStatus(102L), HOLDING); + + // unlock 103 + service_c.unlock(103L).get(3, SECONDS); + + events_c.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_c.lockStatus(103L), NONE); + assertEquals(service_c.lockStatus(102L), WAITING); + + events_a.next().assertEq(103L, WAITING, HOLDING); + assertEquals(service_a.lockStatus(103L), HOLDING); + + // unlock a + service_a.unlockAll().get(3, SECONDS); + + events_a.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_a.lockStatus(102L), NONE); + + events_a.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_a.lockStatus(103L), NONE); + + events_c.next().assertEq(102L, WAITING, HOLDING); + assertEquals(service_c.lockStatus(102L), HOLDING); + + events_b.next().assertEq(103L, WAITING, HOLDING); + assertEquals(service_b.lockStatus(103L), HOLDING); + + // unlock b + service_b.unlockAll().get(3, SECONDS); + + events_b.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_b.lockStatus(101L), NONE); + + events_b.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_b.lockStatus(103L), NONE); + + events_c.next().assertEq(101L, WAITING, HOLDING); + assertEquals(service_c.lockStatus(101L), HOLDING); + + // unlock c + service_c.unlockAll().get(3, SECONDS); + + events_c.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_c.lockStatus(101L), NONE); + + events_c.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_c.lockStatus(102L), NONE); + } + + public void tryLock() throws Exception { + enableEvents(); + + assertEquals(service_a.tryLock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_a.lockStatus(101L), HOLDING); + assertEquals(service_a.tryLock(101L).get(3, SECONDS), HOLDING); + + assertEquals(service_b.tryLock(101L).get(3, SECONDS), NONE); + assertNull(events_b.next(1)); + assertEquals(service_b.lockStatus(101L), NONE); + + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lockStatus(101L), WAITING); + assertEquals(service_c.tryLock(101L).get(3, SECONDS), WAITING); + + service_a.unlock(101L); + events_a.next().assertEq(101L, HOLDING, NONE); + events_c.next().assertEq(101L, WAITING, HOLDING); + service_c.unlockAll(); + events_c.next().assertEq(101L, HOLDING, NONE); + assertNull(events_b.next(1)); + } + + public void reset_by_disconnect() throws Exception { + enableEvents(); + + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_d.lock(101L).get(3, 
SECONDS), WAITING); + events_d.next().assertEq(101L, NONE, WAITING); + assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_e.next().assertEq(101L, NONE, WAITING); + + // Disconnect the coordinator/leader/holder + channel(0).disconnect(); // [B,C,D,E] + // Resigned because of disconnection + events_a.next().assertEq(101L, HOLDING, NONE); + // Reset to [B,C,D,E], notified by reset command. + events_b.next().assertEq(101L, WAITING, HOLDING); + + // Disconnect a participant + channel(2).disconnect(); // [B,D,E] + // Resigned because of disconnection + events_c.next().assertEq(101L, WAITING, NONE); + // Reset to [B,D,E], D is next waiter. + service_b.unlock(101L); + events_b.next().assertEq(101L, HOLDING, NONE); + events_d.next().assertEq(101L, WAITING, HOLDING); + + // Disconnect the holder and lost majority + channel(3).disconnect(); // [B,E] + // Resigned because of disconnection + events_d.next().assertEq(101L, HOLDING, NONE); + // Resigned because of lost majority + events_e.next().assertEq(101L, WAITING, NONE); + + // Reconnect the previous holder and reach majority. + service_d = reconnect(channel(3), events_d); // [B,E,D] + waitUntilLeaderElected(1, 3, 4); + // Reset to empty, there is no holder or waiter. + assertEquals(service_b.lock(101L).get(3, SECONDS), HOLDING); + events_b.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_d.lock(101L).get(3, SECONDS), WAITING); + events_d.next().assertEq(101L, NONE, WAITING); + assertEquals(service_e.lockStatus(101L), NONE); + events_e.assertEmpty(); + + // Disconnect to lost majority + channel(4).disconnect(); // [B,D] + // Resigned because of lost majority + events_b.next().assertEq(101L, HOLDING, NONE); + events_d.next().assertEq(101L, WAITING, NONE); + + // Reconnect to reach majority + service_e = reconnect(channel(4), events_e); // [B,D,E] + waitUntilLeaderElected(1, 3, 4); + // Reset to empty, there is no holder or waiter. + assertEquals(service_e.lock(101L).get(3, SECONDS), HOLDING); + events_e.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_b.lockStatus(101L), NONE); + events_b.assertEmpty(); + assertEquals(service_d.lockStatus(101L), NONE); + events_d.assertEmpty(); + } + + public void reset_by_partition() throws Exception { + enableEvents(); + + assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING); + events_d.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_e.lock(102L).get(3, SECONDS), HOLDING); + events_e.next().assertEq(102L, NONE, HOLDING); + + assertEquals(service_a.lock(101L).get(3, SECONDS), WAITING); + assertEquals(service_a.lock(102L).get(3, SECONDS), WAITING); + events_a.next().assertEq(101L, NONE, WAITING); + events_a.next().assertEq(102L, NONE, WAITING); + + // Partition into a majority subgroup and a minority subgroup + // Put the minority subgroup first, otherwise it will receive the AppendEntriesRequest of reset command. + partition(new int[]{3, 4}, new int[]{0, 1, 2}); + + // Resigned because of lost majority + events_d.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(102L, HOLDING, NONE); + + // Reset to [A,B,C], notified by reset command. 
+ events_a.batch(2).assertContains(101L, WAITING, HOLDING).assertContains(102L, WAITING, HOLDING); + + merge(0, 3); + waitUntilLeaderElected(0, 1, 2, 3, 4); + assertTrue(raft(0).isLeader()); // A has longer log + + // Reset to [A,B,C], holder is A all the time + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING); + events_c.next().assertEq(102L, NONE, WAITING); + assertEquals(service_a.lockStatus(101L), HOLDING); + assertEquals(service_a.lockStatus(102L), HOLDING); + assertEquals(service_d.lockStatus(101L), NONE); events_d.assertEmpty(); + assertEquals(service_e.lockStatus(102L), NONE); events_e.assertEmpty(); + + // Partition into subgroups without majority + partition(new int[]{0, 1}, new int[]{2}, new int[]{3, 4}); + + // Resigned because of lost majority + events_a.batch(2).assertContains(101L, HOLDING, NONE).assertContains(102L, HOLDING, NONE); + events_b.next().assertEq(101L, WAITING, NONE); + events_c.next().assertEq(102L, WAITING, NONE); + + merge(0, 2, 3); + waitUntilLeaderElected(0, 1, 2, 3, 4); + + // Reset to empty, there is no holder after reset + assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING); + events_d.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_e.next().assertEq(101L, NONE, WAITING); + assertEquals(service_a.lockStatus(101L), NONE); + assertEquals(service_a.lockStatus(102L), NONE); events_a.assertEmpty(); + assertEquals(service_b.lockStatus(101L), NONE); events_b.assertEmpty(); + assertEquals(service_c.lockStatus(102L), NONE); events_c.assertEmpty(); + + channel(3).disconnect(); + + // Resigned because of disconnection + events_d.next().assertEq(101L, HOLDING, NONE); + + // Reset to [B,C,D,E], notified by reset command. 
+ events_e.next().assertEq(101L, WAITING, HOLDING); + } + + public void snapshot() throws Exception { + enableEvents(); + + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + + partition(new int[]{1}, new int[]{0, 2, 3, 4}); + + events_b.next().assertEq(101, WAITING, NONE); + + service_a.unlock(101).get(3, SECONDS); + assertEquals(service_c.lock(101).get(3, SECONDS), HOLDING); + assertEquals(service_d.lock(101).get(3, SECONDS), WAITING); + service_c.unlock(101).get(3, SECONDS); + service_d.unlock(101).get(3, SECONDS); + + List services = List.of(service_a, service_c, service_d, service_e); + for (int i = 0; i < 100; i++) { + long key = -(i + 1); + for (int t = 0, len = services.size(); t < len; t++) { + services.get((i + t) % len).lock(key).get(3, SECONDS); + } + services.get(i % services.size()).unlock(key).get(3, SECONDS); + } + + service_c.unlockAll(); + + leader().snapshotAsync().get(3, SECONDS); + + merge(0, 1); + + waitUntilNodesApplyAllLogs(); + assertTrue(events_b.queue.isEmpty()); + + Map> state = service_a.dumpState(); + assertEquals(service_b.dumpState(), state); + assertEquals(service_c.dumpState(), state); + assertEquals(service_d.dumpState(), state); + assertEquals(service_e.dumpState(), state); + } + + public void mutex_atomicity() throws Exception { + Lock a = service_a.mutex(101); + Lock b = service_b.mutex(101); + Lock c = service_c.mutex(101); + Lock d = service_d.mutex(101); + Lock e = service_e.mutex(101); + + class MutableInt { + int value; + } + MutableInt count = new MutableInt(); + List threads = Stream.of(a, b, c, d, e).flatMap(t -> Stream.of(t, t)).map(t -> new Thread(() -> { + for (int i = 0; i < 100; i++) { + t.lock(); + try { + int v = count.value; + LockSupport.parkNanos(10); + count.value = v + 1; + } finally { + t.unlock(); + } + } + })).collect(toList()); + + threads.forEach(Thread::start); + for (Thread t : threads) t.join(); + + assertEquals(count.value, 1000); + } + + public void mutex_interruption() throws InterruptedException { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + a.lock(); + List> list; + try { + CompletableFuture.runAsync(() -> { + interruptAfter(1); + assertThrows(InterruptedException.class, a::lockInterruptibly); + + interruptAfter(1); + assertThrows(InterruptedException.class, b::lockInterruptibly); + + interruptAfter(1); + assertThrows(InterruptedException.class, () -> a.tryLock(30, SECONDS)); + + interruptAfter(1); + assertThrows(InterruptedException.class, () -> b.tryLock(30, SECONDS)); + }).join(); + + BlockingQueue> interrupted = new LinkedBlockingQueue<>(); + list = Stream.of(a, b).map(t -> CompletableFuture.runAsync(() -> { + interrupted.add(interruptAfter(1)); + t.lock(); + try { + assertTrue(Thread.currentThread().isInterrupted()); + } finally { + t.unlock(); + } + assertTrue(Thread.currentThread().isInterrupted()); + })).collect(toList()); + for (int i = 0, l = list.size(); i < l; i++) interrupted.take().join(); + } finally { + a.unlock(); + } + list.forEach(CompletableFuture::join); + } + + public void mutex_timeout() { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + a.lock(); + try { + CompletableFuture.runAsync(() -> { + try { + long timeout = SECONDS.toNanos(1); + Offset error = offset(MILLISECONDS.toNanos(100)); + + long begin = System.nanoTime(); + assertFalse(a.tryLock(timeout, NANOSECONDS)); + 
assertThat(System.nanoTime() - begin).isCloseTo(timeout, error); + + begin = System.nanoTime(); + assertFalse(b.tryLock(timeout, NANOSECONDS)); + assertThat(System.nanoTime() - begin).isCloseTo(timeout, error); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).join(); + } finally { + a.unlock(); + } + } + + public void mutex_race() { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + Mutex c = service_c.mutex(101); + + List.of(Stream.of(a, b), Stream.of(b, c)).forEach(stream -> { + stream.map(t -> CompletableFuture.runAsync(() -> { + for (int i = 0; i < 3000; i++) { + t.lock(); t.unlock(); + } + })).collect(toList()).forEach(t -> { + try { + t.get(10, SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + }); + } + + public void mutex_reentrant() throws InterruptedException { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + + for (int i = 0; i < 10; i++) { + b.lock(); + assertEquals(b.getHolder(), Thread.currentThread()); + } + + for (int i = 0; i < 10; i++) { + assertTrue(b.tryLock()); + assertEquals(b.getHolder(), Thread.currentThread()); + } + + for (int i = 0; i < 10; i++) { + assertTrue(b.tryLock(1, SECONDS)); + assertEquals(b.getHolder(), Thread.currentThread()); + } + + for (int i = 0; i < 30; i++) { + assertEquals(b.getHolder(), Thread.currentThread()); + assertFalse(CompletableFuture.supplyAsync(b::tryLock).join()); + assertFalse(a.tryLock()); + b.unlock(); + } + + assertNull(b.getHolder()); + assertTrue(b.tryLock()); + assertEquals(b.getHolder(), Thread.currentThread()); + b.unlock(); + assertNull(b.getHolder()); + } + + public void mutex_inconsistency() throws Exception { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + + for (Mutex mutex : List.of(a, b)) { + CompletableFuture unlocked = new CompletableFuture<>(); + CompletableFuture locked = new CompletableFuture<>(); + mutex.setUnexpectedUnlockHandler(unlocked::complete); + mutex.setUnexpectedLockHandler(locked::complete); + + mutex.lock(); + try { + // Unexpected unlock + mutex.service().unlock(101).join(); + // Callback + assertSame(unlocked.get(3, SECONDS), mutex); + // Inconsistent state + assertSame(mutex.getHolder(), Thread.currentThread()); + assertEquals(mutex.getStatus(), NONE); + } finally { + mutex.unlock(); + } + + // Unexpected lock + mutex.service().lock(101).join(); + // Callback + assertSame(locked.get(3, SECONDS), mutex); + // Inconsistent state + assertEquals(mutex.getStatus(), HOLDING); + // Fix it with mutex instance + if (mutex.tryLock()) mutex.unlock(); + assertEquals(mutex.getStatus(), NONE); + } + + // Unexpected unlock for waiting status, it will retry to lock instead of calling the handler. + a.lock(); CompletableFuture f; + try { + // lock async + f = CompletableFuture.runAsync(() -> { + b.lock(); + try { + assertEquals(service_b.lockStatus(101), HOLDING); + } finally { + b.unlock(); + } + }); + // Make sure the thread is blocked in WAITING status + Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); + // Successfully unlock with lock service + service_b.unlock(101).get(3, SECONDS); + // The thread is awakened by unlocking, and lock again. 
+ Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); + } finally { + a.unlock(); + } + f.get(5, SECONDS); // check error + } + + public void mutex_exception() { + Mutex a = service_a.mutex(101); + + service_a.raft.throwingInterceptor(new Exception("thrown error")); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("thrown error"); + + service_a.raft.errorInterceptor(new Exception("returned error")); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("returned error"); + + service_a.raft.voidInterceptor(); + a.setTimeout(1000); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(TimeoutException.class); + + service_a.raft.removeInterceptor(); + a.lock(); + try { + service_a.raft.throwingInterceptor(new Exception("thrown error")); + } finally { + assertThatThrownBy(a::unlock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("thrown error"); + } + + a.lock(); + service_a.raft.removeInterceptor(); + a.unlock(); + } + + private CompletableFuture interruptAfter(int delay) { + CompletableFuture done = new CompletableFuture<>(); + Thread thread = Thread.currentThread(); + CompletableFuture.delayedExecutor(delay, SECONDS).execute(() -> { + thread.interrupt(); done.complete(null); + }); + return done; + } + + private Service reconnect(JChannel ch, LockService.Listener listener) throws Exception { + Service service = new Service(ch); + if (listener != null) service.addListener(listener); + ch.connect(clusterName()); return service; + } + + private void partition(int[]... partitions) throws TimeoutException { + List> parts = stream(partitions).map(t -> stream(t).mapToObj(this::channel).collect(toList())) + .collect(toList()); + for (List p : parts) { + var s = parts.stream().filter(t -> t != p).flatMap(t -> t.stream().map(JChannel::address)).collect(toList()); + p.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.SUSPECT, s))); + Util.waitUntilAllChannelsHaveSameView(30_000, 1000, p.toArray(JChannel[]::new)); + } + } + + private void merge(int... coordinators) throws TimeoutException { + List coords = stream(coordinators).mapToObj(this::channel).collect(toList()); + Map views = coords.stream().collect(toMap(JChannel::address, JChannel::view)); + coords.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.MERGE, views))); + for (JChannel ch : coords) { + GMS gms = ch.stack().findProtocol(GMS.class); + Util.waitUntil(30_000, 1000, () -> !gms.isMergeTaskRunning()); + } + } + + private void waitUntilNodesApplyAllLogs(int... indexes) throws TimeoutException { + RAFT[] rafts = indexes.length > 0 ? IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new) : + stream(channels()).map(this::raft).toArray(RAFT[]::new); + Util.waitUntil(30_000, 1000, () -> { + long last = -1; + for (RAFT raft : rafts) { + if (last == -1) last = raft.lastAppended(); + else if (raft.lastAppended() != last) return false; + if (raft.commitIndex() != last) return false; + } + return true; + }); + } + + private void waitUntilLeaderElected(int... 
indexes) throws TimeoutException { + RAFT[] rafts = IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new); + Util.waitUntil(30_000, 1000, () -> { + Address leader = null; long term = 0; + for (RAFT raft : rafts) { + Address a = raft.leader(); + if (a == null) return false; + if (leader == null) { + leader = a; term = raft.currentTerm(); + } else if (!leader.equals(a) || term != raft.currentTerm()) { + return false; + } + } + return true; + }); + } +}