diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Disposer.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Disposer.java index 9e9ec445..b33016e2 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Disposer.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Disposer.java @@ -17,8 +17,7 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -39,13 +38,13 @@ public class Disposer extends Thread { private AtomicBoolean started = new AtomicBoolean(); - private Queue> futureResponseQueue = new ArrayDeque<>(); + private ConcurrentLinkedQueue> futureResponseQueue = new ConcurrentLinkedQueue<>(); - private Queue serverResourceQueue = new ArrayDeque<>(); + private ConcurrentLinkedQueue serverResourceQueue = new ConcurrentLinkedQueue<>(); private AtomicBoolean empty = new AtomicBoolean(); - private final AtomicReference shutdown = new AtomicReference<>(); + private ConcurrentLinkedQueue shutdownQueue = new ConcurrentLinkedQueue<>(); private final AtomicReference close = new AtomicReference<>(); @@ -85,10 +84,7 @@ public void run() { boolean shutdownProcessed = false; while (true) { - ForegroundFutureResponse futureResponse; - synchronized (futureResponseQueue) { - futureResponse = futureResponseQueue.poll(); - } + var futureResponse = futureResponseQueue.poll(); if (futureResponse != null) { try { var obj = futureResponse.retrieve(); @@ -119,10 +115,7 @@ public void run() { continue; } } - DelayedClose serverResource; - synchronized (serverResourceQueue) { - serverResource = serverResourceQueue.poll(); - } + var serverResource = serverResourceQueue.poll(); if (serverResource != null) { try { serverResource.delayedClose(); @@ -134,18 +127,17 @@ public void run() { notifyEmpty(); if (!shutdownProcessed) { try { - var sh = shutdown.get(); - if (sh != null) { - sh.shutdown(); - shutdownProcessed = true; + while (!shutdownQueue.isEmpty()) { + shutdownQueue.poll().shutdown(); } + shutdownProcessed = true; } catch (IOException e) { exception = addSuppressed(exception, e); } } var cl = close.get(); if (cl != null) { - if (shutdownProcessed || shutdown.get() == null) { + if (shutdownProcessed || shutdownQueue.isEmpty()) { try { cl.delayedClose(); } catch (ServerException | IOException | InterruptedException e) { @@ -162,8 +154,13 @@ public void run() { } if (exception != null) { - LOG.info(exception.getMessage()); - throw new UncheckedIOException(new IOException(exception)); + LOG.error(exception.getMessage()); + exception.printStackTrace(); + if (exception instanceof IOException) { + throw new UncheckedIOException((IOException) exception); + } else { + throw new UncheckedIOException(new IOException(exception)); + } } } @@ -177,12 +174,10 @@ private Exception addSuppressed(Exception exception, Exception e) { } synchronized void add(ForegroundFutureResponse futureResponse) { - if (close.get() != null) { + if (close.get() != null || !shutdownQueue.isEmpty()) { throw new AssertionError("Session already closed"); } - synchronized (futureResponseQueue) { - futureResponseQueue.add(futureResponse); - } + futureResponseQueue.add(futureResponse); if (!started.getAndSet(true)) { this.start(); } @@ -194,12 +189,10 @@ synchronized void add(ForegroundFutureResponse futureResponse) { * @param resource the DelayedClose to be added */ public synchronized void add(DelayedClose resource) { - if (close.get() != null) { + if (close.get() != null || !shutdownQueue.isEmpty()) { throw new AssertionError("Session already closed"); } - synchronized (serverResourceQueue) { - serverResourceQueue.add(resource); - } + serverResourceQueue.add(resource); if (!started.getAndSet(true)) { this.start(); } @@ -212,13 +205,15 @@ public synchronized void add(DelayedClose resource) { * @param c the clean up procesure to be registered * @throws IOException An error was occurred in c.shoutdown() execution. */ - public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOException { - if (!started.getAndSet(true)) { - empty.set(true); - c.shutdown(); - return; + public void registerDelayedShutdown(DelayedShutdown c) throws IOException { + synchronized (this) { + if (started.getAndSet(true)) { + shutdownQueue.add(c); + return; + } } - shutdown.set(c); + empty.set(true); + c.shutdown(); } /** @@ -231,22 +226,22 @@ public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOExc * @throws InterruptedException if interrupted while disposing the session */ public synchronized void registerDelayedClose(DelayedClose c) throws ServerException, IOException, InterruptedException { - if (!started.getAndSet(true)) { - empty.set(true); - c.delayedClose(); - return; - } - if (futureResponseQueue.isEmpty() && serverResourceQueue.isEmpty()) { - c.delayedClose(); - close.set(new DelayedClose() { - @Override - public void delayedClose() { - // do nothing + synchronized (this) { + if (started.getAndSet(true)) { + if (!futureResponseQueue.isEmpty() || !serverResourceQueue.isEmpty()) { + close.set(c); + return; } - }); - return; + close.set(new DelayedClose() { + @Override + public void delayedClose() { + // do nothing + } + }); + } } - close.set(c); + empty.set(true); + c.delayedClose(); } /** diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/PreparedStatementImpl.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/PreparedStatementImpl.java index 2158fafd..b39b15fb 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/PreparedStatementImpl.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/PreparedStatementImpl.java @@ -44,7 +44,7 @@ public class PreparedStatementImpl implements PreparedStatement { private final SqlService service; private final ServerResource.CloseHandler closeHandler; - private final AtomicBoolean added = new AtomicBoolean(); + private final AtomicBoolean addedToDisposer = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private long timeout = 0; private TimeUnit unit; @@ -101,7 +101,7 @@ public void setCloseTimeout(long t, TimeUnit u) { @Override public void close() throws IOException, ServerException, InterruptedException { if (disposer != null) { - if (!added.getAndSet(true)) { + if (!addedToDisposer.getAndSet(true)) { disposer.add(new Disposer.DelayedClose() { @Override public void delayedClose() throws ServerException, IOException, InterruptedException { diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/TransactionImpl.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/TransactionImpl.java index 0bbf6dd5..2e534a6c 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/TransactionImpl.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/sql/impl/TransactionImpl.java @@ -249,26 +249,24 @@ public FutureResponse executeLoad( } @Override - public FutureResponse commit(@Nonnull SqlRequest.CommitStatus status) throws IOException { + public synchronized FutureResponse commit(@Nonnull SqlRequest.CommitStatus status) throws IOException { Objects.requireNonNull(status); - synchronized (this) { - switch (state.get()) { - case INITIAL: - commitResult = service.send(SqlRequest.Commit.newBuilder() - .setTransactionHandle(transaction.getTransactionHandle()) - .setNotificationType(status) - .setAutoDispose(true) - .build()); - state.set(State.COMMITTED); - return commitResult; - case COMMITTED: - throw new IOException("transaction already committed"); - case ROLLBACKED: - throw new IOException("transaction already rollbacked"); - default: - throw new IOException("transaction already closed"); - } + switch (state.get()) { + case INITIAL: + commitResult = service.send(SqlRequest.Commit.newBuilder() + .setTransactionHandle(transaction.getTransactionHandle()) + .setNotificationType(status) + .setAutoDispose(true) + .build()); + state.set(State.COMMITTED); + return commitResult; + case COMMITTED: + return commitResult; + case ROLLBACKED: + throw new IOException("transaction already rollbacked"); + default: + throw new IOException("transaction already closed"); } } @@ -290,30 +288,26 @@ public FutureResponse rollback() throws IOException { } @Override - public void setCloseTimeout(long t, TimeUnit u) { - synchronized (this) { - timeout = t; - unit = u; - } + public synchronized void setCloseTimeout(long t, TimeUnit u) { + timeout = t; + unit = u; } @Override - public FutureResponse getSqlServiceException() throws IOException { - synchronized (this) { - if (state.get() == State.CLOSED) { - throw new IOException("transaction already closed"); - } - var cr = commitResult; - if (cr != null && cr.isDone()) { - try { - cr.get(); - return FutureResponse.returns(null); - } catch (IOException | ServerException | InterruptedException e) { - return sendAndGetSqlServiceException(); - } + public synchronized FutureResponse getSqlServiceException() throws IOException { + if (state.get() == State.CLOSED) { + throw new IOException("transaction already closed"); + } + var cr = commitResult; + if (cr != null && cr.isDone()) { + try { + cr.get(); + return FutureResponse.returns(null); + } catch (IOException | ServerException | InterruptedException e) { + return sendAndGetSqlServiceException(); } - return sendAndGetSqlServiceException(); } + return sendAndGetSqlServiceException(); } private FutureResponse sendAndGetSqlServiceException() throws IOException { return service.send(SqlRequest.GetErrorInfo.newBuilder() @@ -326,6 +320,37 @@ public String getTransactionId() { return transaction.getTransactionId().getId(); } + @Override + public synchronized void close() throws IOException, ServerException, InterruptedException { + switch (state.get()) { + case INITIAL: + case ROLLBACKED: + break; + case COMMITTED: + if (commitResult.isDone()) { + doClose(); + return; + } + break; + case TO_BE_CLOSED: + case TO_BE_CLOSED_WITH_COMMIT: + case TO_BE_CLOSED_WITH_ROLLBACK: + case CLOSED: + return; + } + if (disposer != null) { + disposer.add(new Disposer.DelayedClose() { + @Override + public void delayedClose() throws ServerException, IOException, InterruptedException { + doClose(); + } + }); + state.set(toBeClosed(state.get())); + return; + } + doClose(); + } + private State toBeClosed(State s) { switch (s) { case INITIAL: @@ -339,100 +364,65 @@ private State toBeClosed(State s) { } } - @Override - public void close() throws IOException, ServerException, InterruptedException { - synchronized (this) { - switch (state.get()) { - case INITIAL: - case ROLLBACKED: - break; - case COMMITTED: - if (commitResult.isDone()) { - doClose(); - return; - } - break; - case TO_BE_CLOSED: - case TO_BE_CLOSED_WITH_COMMIT: - case TO_BE_CLOSED_WITH_ROLLBACK: - case CLOSED: - return; - } - if (disposer != null) { - disposer.add(new Disposer.DelayedClose() { - @Override - public void delayedClose() throws ServerException, IOException, InterruptedException { - doClose(); - } - }); - state.set(toBeClosed(state.get())); - return; - } - doClose(); - } - } - - private void doClose() throws IOException, ServerException, InterruptedException { - synchronized (this) { - boolean needDispose = true; - boolean needRollback = false; + private synchronized void doClose() throws IOException, ServerException, InterruptedException { + boolean needDispose = true; + boolean needRollback = false; - switch (state.get()) { - case INITIAL: - case TO_BE_CLOSED: - needRollback = true; - break; - case COMMITTED: - case TO_BE_CLOSED_WITH_COMMIT: - try { - commitResult.get(); - needDispose = false; - } catch (IOException | ServerException | InterruptedException e) { - needDispose = true; - } - break; - case ROLLBACKED: - case TO_BE_CLOSED_WITH_ROLLBACK: - break; - case CLOSED: - return; - } + switch (state.get()) { + case INITIAL: + case TO_BE_CLOSED: + needRollback = true; + break; + case COMMITTED: + case TO_BE_CLOSED_WITH_COMMIT: try { - if (needRollback) { - // FIXME need to consider rollback is suitable here - try (var rollback = submitRollback()) { - if (timeout == 0) { - rollback.get(); - } else { - rollback.get(timeout, unit); - } - } catch (TimeoutException e) { - LOG.warn("timeout occurred in the transaction rollback", e); - throw new ResponseTimeoutException(e.getMessage(), e); + commitResult.get(); + needDispose = false; + } catch (IOException | ServerException | InterruptedException e) { + needDispose = true; + } + break; + case ROLLBACKED: + case TO_BE_CLOSED_WITH_ROLLBACK: + break; + case CLOSED: + return; + } + try { + if (needRollback) { + // FIXME need to consider rollback is suitable here + try (var rollback = submitRollback()) { + if (timeout == 0) { + rollback.get(); + } else { + rollback.get(timeout, unit); } + } catch (TimeoutException e) { + LOG.warn("timeout occurred in the transaction rollback", e); + throw new ResponseTimeoutException(e.getMessage(), e); } - } finally { - if (closeHandler != null) { - Lang.suppress( - e -> LOG.warn("error occurred while collecting garbage", e), - () -> closeHandler.onClosed(this)); - } - if (needDispose) { - try (var futureResponse = service.send(SqlRequest.DisposeTransaction.newBuilder() - .setTransactionHandle(transaction.getTransactionHandle()) - .build())) { - if (timeout == 0) { - futureResponse.get(); - } else { - futureResponse.get(timeout, unit); - } - } catch (TimeoutException e) { - LOG.warn("timeout occurred in the transaction disposal", e); - throw new ResponseTimeoutException(e.getMessage(), e); + } + } finally { + if (closeHandler != null) { + Lang.suppress( + e -> LOG.warn("error occurred while collecting garbage", e), + () -> closeHandler.onClosed(this)); + } + if (needDispose) { + try (var futureResponse = service.send(SqlRequest.DisposeTransaction.newBuilder() + .setTransactionHandle(transaction.getTransactionHandle()) + .build())) { + if (timeout == 0) { + futureResponse.get(); + } else { + futureResponse.get(timeout, unit); } + } catch (TimeoutException e) { + LOG.warn("timeout occurred in the transaction disposal", e); + throw new ResponseTimeoutException(e.getMessage(), e); } - state.set(State.CLOSED); } + state.set(State.CLOSED); } }