Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queue message payloads #20749

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) {
}

/**
* Should only prepare resync after the if (locked ||
* Should only prepare resync after the (locked ||
* !isNextExpectedMessage(serverId)) {...} since
* stateTree.repareForResync() will remove the nodes, and if locked is
* true, it will return without handling the message, thus won't adding
* true, it will return without handling the message, thus won't add
* nodes back.
*
* This is related to https://github.com/vaadin/flow/issues/8699 It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package com.vaadin.client.communication;

import java.util.ArrayList;
import java.util.List;

import com.google.gwt.core.client.GWT;
import com.google.gwt.user.client.Timer;

import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Console;
Expand Down Expand Up @@ -67,6 +71,10 @@ public enum ResynchronizationState {

private JsonObject pushPendingMessage;

private List<JsonObject> messageQueue = new ArrayList<>();

private Timer resendMessageTimer;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -119,7 +127,13 @@ private void doSendInvocationsToServer() {
JsonObject payload = pushPendingMessage;
pushPendingMessage = null;
registry.getRequestResponseTracker().startRequest();
send(payload);
sendPayload(payload);
return;
} else if (hasQueuedMessages() && resendMessageTimer == null) {
if (!registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
sendPayload(messageQueue.get(0));
return;
}

Expand All @@ -146,6 +160,8 @@ private void doSendInvocationsToServer() {
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.warn("Resynchronizing from server");
messageQueue.clear();
resetTimer();
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
Expand All @@ -166,7 +182,6 @@ protected void send(final JsonArray reqInvocations,
final JsonObject extraJson) {
registry.getRequestResponseTracker().startRequest();
send(preparePayload(reqInvocations, extraJson));

}

private JsonObject preparePayload(final JsonArray reqInvocations,
Expand All @@ -177,10 +192,6 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
payload.put(ApplicationConstants.CSRF_TOKEN, csrfToken);
}
payload.put(ApplicationConstants.RPC_INVOCATIONS, reqInvocations);
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);
if (extraJson != null) {
for (String key : extraJson.keys()) {
JsonValue value = extraJson.get(key);
Expand All @@ -192,12 +203,43 @@ private JsonObject preparePayload(final JsonArray reqInvocations,

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
* given URI. Adds message to message queue and postpones sending if queue
* not empty.
*
* @param payload
* The contents of the request to send
*/
public void send(final JsonObject payload) {
if (hasQueuedMessages()) {
messageQueue.add(payload);
return;
}
messageQueue.add(payload);
sendPayload(payload);
}

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
*
* @param payload
* The contents of the request to send
*/
private void sendPayload(final JsonObject payload) {
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
if (!payload.hasKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) {
// We are resending the message so we should not up the clientId
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);
}

if (!registry.getRequestResponseTracker().hasActiveRequest()) {
// Direct calls to send from outside probably have not started
// request.
registry.getRequestResponseTracker().startRequest();
}

if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
Expand All @@ -211,6 +253,31 @@ public void send(final JsonObject payload) {
} else {
Console.debug("send XHR");
registry.getXhrConnection().send(payload);

resetTimer();
// resend last payload if response hasn't come in.
resendMessageTimer = new Timer() {
@Override
public void run() {
resendMessageTimer
.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
if (!registry.getRequestResponseTracker()
.hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
registry.getXhrConnection().send(payload);
}
};
resendMessageTimer.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
}
}

private void resetTimer() {
if (resendMessageTimer != null) {
resendMessageTimer.cancel();
resendMessageTimer = null;
}
}

Expand Down Expand Up @@ -289,6 +356,8 @@ public String getCommunicationMethodName() {
*/
public void resynchronize() {
if (requestResynchronize()) {
messageQueue.clear();
resetTimer();
sendInvocationsToServer();
}
}
Expand All @@ -311,12 +380,26 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
}
if (hasQueuedMessages()) {
synchronized (messageQueue) {
// If queued message is the expected one. remove from queue
// and sen next message if any.
if (messageQueue.get(0)
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
+ 1 == nextExpectedId) {
resetTimer();
messageQueue.remove(0);
}
}
}
return;
}
if (force) {
Console.debug(
"Forced update of clientId to " + clientToServerMessageId);
clientToServerMessageId = nextExpectedId;
messageQueue.clear();
resetTimer();
return;
}

Expand Down Expand Up @@ -372,4 +455,8 @@ void clearResynchronizationState() {
ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}

public boolean hasQueuedMessages() {
return !messageQueue.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ public void endRequest() {
hasActiveRequest = false;

if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
&& (registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER
|| registry.getMessageSender().hasQueuedMessages())) {
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
2 changes: 1 addition & 1 deletion flow-client/src/test/frontend/FlowTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ function stubServerRemoteFunction(
handlers.leaveNavigation();
}
}
req.respond(200, { 'content-type': 'application/json' }, 'for(;;);[{}]');
req.respond(200, {'content-type': 'application/json'}, 'for(;;);[{"syncId":' + (payload["syncId"] + 1) + ',"clientId":' + (payload["clientId"] + 1) + '}]');
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
this.csrfToken = csrfToken;
}

if (isSyncIdCheckEnabled) {
if (isSyncIdCheckEnabled && !isUnloadBeaconRequest()) {
syncId = (int) json
.getNumber(ApplicationConstants.SERVER_SYNC_ID);
} else {
Expand All @@ -131,7 +131,10 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
clientToServerMessageId = (int) json
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID);
} else {
getLogger().warn("Server message without client id received");
if (!isUnloadBeaconRequest()) {
getLogger()
.warn("Server message without client id received");
}
clientToServerMessageId = -1;
}
invocations = json.getArray(ApplicationConstants.RPC_INVOCATIONS);
Expand Down
1 change: 1 addition & 0 deletions flow-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@
<module>test-react-adapter</module>
<module>test-react-adapter/pom-production.xml</module>
<module>test-legacy-frontend</module>
<module>test-client-queue</module>
</modules>
</profile>
<profile>
Expand Down
63 changes: 63 additions & 0 deletions flow-tests/test-client-queue/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flow-tests</artifactId>
<groupId>com.vaadin</groupId>
<version>24.7-SNAPSHOT</version>
</parent>
<artifactId>flow-client-queue-test</artifactId>
<name>Test Flow client queue</name>

<packaging>war</packaging>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
<!-- Test checks client log so java.util.logging.Level import is needed -->
<enforcer.skip>true</enforcer.skip>
</properties>

<dependencies>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-test-resources</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>vaadin-dev-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-html-components-testbench</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Run flow plugin to build frontend -->
<plugin>
<groupId>com.vaadin</groupId>
<artifactId>flow-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>prepare-frontend</goal>
</goals>
</execution>
</executions>
<!-- <configuration>-->
<!-- <frontendHotdeploy>true</frontendHotdeploy>-->
<!-- </configuration>-->
</plugin>
<!-- Run jetty before integration tests, and stop after -->
<plugin>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2000-2025 Vaadin Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.vaadin.flow.misc.ui;

import java.util.List;

import com.vaadin.flow.function.DeploymentConfiguration;
import com.vaadin.flow.server.RequestHandler;
import com.vaadin.flow.server.ServiceException;
import com.vaadin.flow.server.VaadinServlet;
import com.vaadin.flow.server.VaadinServletService;
import com.vaadin.flow.server.communication.UidlRequestHandler;

public class CustomService extends VaadinServletService {

public CustomService(VaadinServlet servlet,
DeploymentConfiguration deploymentConfiguration) {
super(servlet, deploymentConfiguration);
}

@Override
protected List<RequestHandler> createRequestHandlers()
throws ServiceException {
List<RequestHandler> requestHandlers = super.createRequestHandlers();
requestHandlers.replaceAll(handler -> {
if (handler instanceof UidlRequestHandler) {
return new CustomUidlRequestHandler();
}
return handler;
});
return requestHandlers;
}
}
Loading
Loading