From 74aacb07ef008a1374978a79e51ed1c4b575f768 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 2 Jul 2026 17:18:02 +0800 Subject: [PATCH 1/8] Fix master compile issue --- .../SystemTopicBasedSystemEventService.java | 2 +- .../handlers/mqtt/common/utils/RetryUtil.java | 61 +++++++++++++++++++ mqtt-proxy/pom.xml | 5 +- .../mqtt/proxy/web/admin/Devices.java | 12 ++-- .../mqtt/proxy/web/admin/WebResource.java | 10 +-- pom.xml | 1 - 6 files changed, 75 insertions(+), 16 deletions(-) create mode 100644 mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/RetryUtil.java diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java index 072bb4ff2..352995d3f 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import io.streamnative.pulsar.handlers.mqtt.common.utils.RetryUtil; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.JsonUtil; import org.apache.pulsar.broker.PulsarServerException; @@ -31,7 +32,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Backoff; diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/RetryUtil.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/RetryUtil.java new file mode 100644 index 000000000..919d68223 --- /dev/null +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/RetryUtil.java @@ -0,0 +1,61 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.common.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.pulsar.common.util.Backoff; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RetryUtil { + + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + public static void retryAsynchronously(Supplier> supplier, Backoff backoff, + ScheduledExecutorService scheduledExecutorService, + CompletableFuture callback) { + if (backoff.getMax().isZero() || backoff.getMax().isNegative()) { + throw new IllegalArgumentException("Illegal max retry time"); + } + if (backoff.getInitial().isZero() || backoff.getInitial().isNegative()) { + throw new IllegalArgumentException("Illegal initial time"); + } + scheduledExecutorService.execute(() -> + executeWithRetry(supplier, backoff, scheduledExecutorService, callback)); + } + + private static void executeWithRetry(Supplier> supplier, Backoff backoff, + ScheduledExecutorService scheduledExecutorService, + CompletableFuture callback) { + supplier.get().whenComplete((result, e) -> { + if (e != null) { + long next = backoff.next().toMillis(); + boolean isMandatoryStop = backoff.isMandatoryStopMade(); + if (isMandatoryStop) { + callback.completeExceptionally(e); + } else { + log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next); + scheduledExecutorService.schedule(() -> + executeWithRetry(supplier, backoff, scheduledExecutorService, callback), + next, TimeUnit.MILLISECONDS); + } + return; + } + callback.complete(result); + }); + } +} diff --git a/mqtt-proxy/pom.xml b/mqtt-proxy/pom.xml index 4de7cb39f..94849fc63 100644 --- a/mqtt-proxy/pom.xml +++ b/mqtt-proxy/pom.xml @@ -64,9 +64,8 @@ ${swagger-annotations.version} - javax.ws.rs - javax.ws.rs-api - ${javax.ws.rs.version} + jakarta.ws.rs + jakarta.ws.rs-api javax.servlet diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java index b1ef0bf73..d4180cda6 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java @@ -18,12 +18,12 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.util.Collection; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.Suspended; -import javax.ws.rs.core.MediaType; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.MediaType; import org.apache.pulsar.broker.web.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java index 5dd439188..8e0ccbfb1 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java @@ -27,11 +27,11 @@ import java.util.function.Supplier; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.UriInfo; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response.Status; +import jakarta.ws.rs.core.UriInfo; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.AuthenticationFilter; diff --git a/pom.xml b/pom.xml index 48adc7d86..4775f2b90 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,6 @@ 2.0.1.Final 2.42 1.6.15 - 2.1.1 2.5.2 From bb560889b19016859d08bd8f521d43858c21de48 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 2 Jul 2026 17:23:57 +0800 Subject: [PATCH 2/8] fix checkstyle --- .../SystemTopicBasedSystemEventService.java | 2 +- .../pulsar/handlers/mqtt/proxy/web/admin/Devices.java | 2 +- .../handlers/mqtt/proxy/web/admin/WebResource.java | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java index 352995d3f..d2be6a1d1 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java @@ -16,6 +16,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.Beta; +import io.streamnative.pulsar.handlers.mqtt.common.utils.RetryUtil; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -23,7 +24,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.streamnative.pulsar.handlers.mqtt.common.utils.RetryUtil; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.JsonUtil; import org.apache.pulsar.broker.PulsarServerException; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java index d4180cda6..ab0bc036e 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java @@ -17,13 +17,13 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import java.util.Collection; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.container.AsyncResponse; import jakarta.ws.rs.container.Suspended; import jakarta.ws.rs.core.MediaType; +import java.util.Collection; import org.apache.pulsar.broker.web.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java index 8e0ccbfb1..9c99fd5f1 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java @@ -20,6 +20,11 @@ import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response.Status; +import jakarta.ws.rs.core.UriInfo; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -27,11 +32,6 @@ import java.util.function.Supplier; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.container.AsyncResponse; -import jakarta.ws.rs.core.Context; -import jakarta.ws.rs.core.Response.Status; -import jakarta.ws.rs.core.UriInfo; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.AuthenticationFilter; From a6205e7e8bfcc4ad4157ae1c714ce9fb1d575b1e Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 2 Jul 2026 20:16:13 +0800 Subject: [PATCH 3/8] fix checkstyle --- pom.xml | 2 -- tests/pom.xml | 1 - 2 files changed, 3 deletions(-) diff --git a/pom.xml b/pom.xml index 4775f2b90..e5fc8b492 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,6 @@ 3.1.2 3.1.8 0.8.7 - 1.56.0 2.0.1.Final 2.42 1.6.15 @@ -132,7 +131,6 @@ io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi - ${opentelemetry.version} org.slf4j diff --git a/tests/pom.xml b/tests/pom.xml index d8d0b9215..2e9e1dad9 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -84,7 +84,6 @@ io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi - ${opentelemetry.version} test From 6ac6cf27851f5acfe1794fd8ccd7f7f4cd71da5f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:33:14 +0000 Subject: [PATCH 4/8] test: stabilize ProxyHttpTest admin endpoint assertion --- .../mqtt3/fusesource/proxy/ProxyHttpTest.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java index ebb9db1f8..c8ad2516d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java @@ -23,11 +23,13 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.awaitility.Awaitility; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; @@ -48,8 +50,8 @@ protected MQTTCommonConfiguration initConfig() throws Exception { @Test public void testGetDeviceList() throws Exception { - int index = random.nextInt(mqttProxyPortList.size()); List mqttProxyPortList = getMqttProxyPortList(); + int index = random.nextInt(mqttProxyPortList.size()); List mqttProxyHttpPortList = getMqttProxyHttpPortList(); MQTT mqttProducer = new MQTT(); int port = mqttProxyPortList.get(index); @@ -62,19 +64,21 @@ public void testGetDeviceList() throws Exception { Thread.sleep(4000); HttpClient httpClient = HttpClientBuilder.create().build(); final String mopEndPoint = "http://localhost:" + mqttProxyHttpPortList.get(index) + "/admin/devices/list"; - HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint)); - InputStream inputStream = response.getEntity().getContent(); - InputStreamReader isReader = new InputStreamReader(inputStream); - BufferedReader reader = new BufferedReader(isReader); - StringBuffer buffer = new StringBuffer(); - String str; - while ((str = reader.readLine()) != null){ - buffer.append(str); - } - String ret = buffer.toString(); - ArrayList deviceList = new Gson().fromJson(ret, ArrayList.class); - Assert.assertEquals(deviceList.size(), 1); - Assert.assertTrue(deviceList.contains(clientId)); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint)); + InputStream inputStream = response.getEntity().getContent(); + InputStreamReader isReader = new InputStreamReader(inputStream); + BufferedReader reader = new BufferedReader(isReader); + StringBuffer buffer = new StringBuffer(); + String str; + while ((str = reader.readLine()) != null){ + buffer.append(str); + } + String ret = buffer.toString(); + ArrayList deviceList = new Gson().fromJson(ret, ArrayList.class); + Assert.assertEquals(deviceList.size(), 1); + Assert.assertTrue(deviceList.contains(clientId)); + }); } } From 6fda08aa9f4a349090ebdbc32e78548bb42f59d0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:51:00 +0000 Subject: [PATCH 5/8] Fix ProxyHttpTest: add ignoreExceptions() to Awaitility to retry on connection refused --- .../handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java index c8ad2516d..4c36edb6c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java @@ -64,7 +64,7 @@ public void testGetDeviceList() throws Exception { Thread.sleep(4000); HttpClient httpClient = HttpClientBuilder.create().build(); final String mopEndPoint = "http://localhost:" + mqttProxyHttpPortList.get(index) + "/admin/devices/list"; - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint)); InputStream inputStream = response.getEntity().getContent(); InputStreamReader isReader = new InputStreamReader(inputStream); From 51f101a9e71a0fd9f2218cb3100e23c6dbf1d6de Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Jul 2026 13:08:08 +0000 Subject: [PATCH 6/8] Fix ProxyHttpTest: call webService.start() in MQTTProxyService.start0() --- .../pulsar/handlers/mqtt/proxy/MQTTProxyService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index cd4b903f3..05f89eec5 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -264,6 +264,7 @@ public void start0() throws MQTTProxyException { } this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); + this.webService.start(); } @Override @@ -294,5 +295,10 @@ public void close() { if (sslContextRefresher != null) { sslContextRefresher.shutdownNow(); } + try { + this.webService.close(); + } catch (Exception e) { + log.error("Failed to close web service.", e); + } } } From 751b145e732631f0643a1cc9f1431ee8c7c4638a Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 2 Jul 2026 21:25:48 +0800 Subject: [PATCH 7/8] Fix proxy web servlet Jakarta migration --- .../handlers/mqtt/broker/rest/MQTTServiceServlet.java | 8 ++++---- mqtt-proxy/pom.xml | 9 +++++---- .../pulsar/handlers/mqtt/proxy/web/WebService.java | 6 +++--- .../handlers/mqtt/proxy/web/admin/WebResource.java | 4 ++-- pom.xml | 11 ++++++++++- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java index 6f201ac17..603c4fe5b 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java @@ -15,16 +15,16 @@ import com.google.common.base.Splitter; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.PSKEvent; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import java.io.BufferedReader; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; diff --git a/mqtt-proxy/pom.xml b/mqtt-proxy/pom.xml index 94849fc63..9e4ed8d69 100644 --- a/mqtt-proxy/pom.xml +++ b/mqtt-proxy/pom.xml @@ -41,8 +41,9 @@ jetty-server - org.eclipse.jetty.ee8 - jetty-ee8-servlet + org.eclipse.jetty.ee10 + jetty-ee10-servlet + ${jetty.ee10.version} org.glassfish.jersey.core @@ -68,8 +69,8 @@ jakarta.ws.rs-api - javax.servlet - javax.servlet-api + jakarta.servlet + jakarta.servlet-api com.google.protobuf diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java index edb1e93b9..731d14c7e 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java @@ -32,8 +32,8 @@ import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.apache.pulsar.common.util.PulsarSslFactory; import org.apache.pulsar.jetty.metrics.JettyStatisticsCollector; -import org.eclipse.jetty.ee8.servlet.ServletContextHandler; -import org.eclipse.jetty.ee8.servlet.ServletHolder; +import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.ForwardedRequestCustomizer; import org.eclipse.jetty.server.Handler; @@ -174,7 +174,7 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require if (attributeMap != null) { attributeMap.forEach(servletContextHandler::setAttribute); } - handlers.add(servletContextHandler.get()); + handlers.add(servletContextHandler); } public void addStaticResources(String basePath, String resourcePath) { diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java index 9c99fd5f1..da50906a1 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java @@ -20,6 +20,8 @@ import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService; +import jakarta.servlet.ServletContext; +import jakarta.servlet.http.HttpServletRequest; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.container.AsyncResponse; import jakarta.ws.rs.core.Context; @@ -30,8 +32,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.AuthenticationFilter; diff --git a/pom.xml b/pom.xml index e5fc8b492..24d27c979 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,8 @@ 3.1.8 0.8.7 2.0.1.Final - 2.42 + 3.1.10 + 12.1.10 1.6.15 2.5.2 @@ -85,6 +86,14 @@ netty-codec-http2 io.netty + + jetty-ee8-servlet + org.eclipse.jetty.ee8 + + + javax.servlet-api + javax.servlet + From ee1e9fce89894695ef7d1f3a76b0267da8a23356 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 2 Jul 2026 21:47:18 +0800 Subject: [PATCH 8/8] Fix broker additional servlet stats endpoint --- mqtt-broker/pom.xml | 5 +++ .../mqtt/broker/rest/MQTTServiceServlet.java | 42 +++++++++++++++---- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/mqtt-broker/pom.xml b/mqtt-broker/pom.xml index 4c09fd1be..98d8319ff 100644 --- a/mqtt-broker/pom.xml +++ b/mqtt-broker/pom.xml @@ -33,6 +33,11 @@ pulsar-protocol-handler-mqtt-proxy ${project.version} + + javax.servlet + javax.servlet-api + provided + diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java index 603c4fe5b..be069f2f2 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTServiceServlet.java @@ -15,16 +15,16 @@ import com.google.common.base.Splitter; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.PSKEvent; -import jakarta.servlet.ServletException; -import jakarta.servlet.http.HttpServlet; -import jakarta.servlet.http.HttpServletRequest; -import jakarta.servlet.http.HttpServletResponse; import java.io.BufferedReader; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -42,7 +42,7 @@ public class MQTTServiceServlet extends HttpServlet { // Define transient by spotbugs private final transient PulsarService pulsar; - private static volatile Pair metricsCollectorRef; + private volatile Pair metricsCollectorRef; private volatile Object mqttService; @@ -56,7 +56,8 @@ public MQTTServiceServlet(PulsarService pulsar) { protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType("text/plain"); - if ("/stats".equals(getRequestPath(request))) { + String requestPath = getRequestPath(request); + if ("/stats".equals(requestPath)) { response.setStatus(HttpStatus.OK_200); response.getOutputStream().write(getJsonStats()); } else { @@ -125,6 +126,7 @@ private byte[] getJsonStats() { Pair metricsCollector = getMetricsCollector(); return (byte[]) metricsCollector.getRight().invoke(metricsCollector.getLeft()); } catch (Throwable ex) { + log.warn("Failed to get MQTT JSON stats", ex); return ex.getMessage().getBytes(StandardCharsets.UTF_8); } } @@ -134,7 +136,7 @@ private Object getMqttService() throws IllegalAccessException, InvocationTargetE if (mqttService == null) { synchronized (LOCK) { if (mqttService == null) { - ProtocolHandler protocolHandler = pulsar.getProtocolHandlers().protocol("mqtt"); + ProtocolHandler protocolHandler = getProtocolHandler(); Method mqttServiceMethod = getMethod(protocolHandler.getClass(), "getMqttService"); mqttService = mqttServiceMethod.invoke(protocolHandler); } @@ -143,6 +145,17 @@ private Object getMqttService() throws IllegalAccessException, InvocationTargetE return mqttService; } + private ProtocolHandler getProtocolHandler() throws IllegalAccessException, InvocationTargetException, + NoSuchMethodException { + ProtocolHandler protocolHandler = pulsar.getProtocolHandlers().protocol("mqtt"); + Method mqttServiceMethod = getOptionalMethod(protocolHandler.getClass(), "getMqttService"); + if (mqttServiceMethod != null) { + return protocolHandler; + } + Method getHandlerMethod = getMethod(protocolHandler.getClass(), "getHandler"); + return (ProtocolHandler) getHandlerMethod.invoke(protocolHandler); + } + private Pair getMetricsCollector() throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { if (metricsCollectorRef == null) { @@ -160,7 +173,20 @@ private Pair getMetricsCollector() throws IllegalAccessException private Method getMethod(Class clazz, String methodName, Class... parameterTypes) throws NoSuchMethodException { - Method method = clazz.getMethod(methodName, parameterTypes); + Method method = getOptionalMethod(clazz, methodName, parameterTypes); + if (method == null) { + throw new NoSuchMethodException(clazz.getName() + "." + methodName); + } + return method; + } + + private Method getOptionalMethod(Class clazz, String methodName, Class... parameterTypes) { + Method method; + try { + method = clazz.getMethod(methodName, parameterTypes); + } catch (NoSuchMethodException e) { + return null; + } method.setAccessible(true); return method; }