From a2b144a59054bc31e71ee0b90ef7ba458f92f543 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 21 Nov 2024 21:51:41 +0800 Subject: [PATCH 1/5] init --- .../MQTTAuthenticationService.java | 4 +- .../common/event/PulsarEventCenterImpl.java | 6 +- .../mqtt/proxy/MQTTProxyProtocolHandler.java | 105 +++++++++++++----- .../handlers/mqtt/proxy/MQTTProxyService.java | 27 +++-- .../mqtt/proxy/MQTTProxyServiceConfig.java | 25 +++++ .../handler/PulsarServiceLookupHandler.java | 53 +-------- pom.xml | 24 ++++ 7 files changed, 152 insertions(+), 92 deletions(-) create mode 100644 mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java index c4cfa26c..a37b4f3d 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java @@ -45,8 +45,8 @@ public class MQTTAuthenticationService { @Getter private final Map authenticationProviders; - public MQTTAuthenticationService(BrokerService brokerService, List authenticationMethods) { - this.authenticationService = brokerService.getAuthenticationService(); + public MQTTAuthenticationService(AuthenticationService authenticationService, List authenticationMethods) { + this.authenticationService = authenticationService; this.authenticationProviders = getAuthenticationProviders(authenticationMethods); } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java index 687700b0..5cf0b7be 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java @@ -21,6 +21,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; @Slf4j @@ -29,13 +30,12 @@ public class PulsarEventCenterImpl implements Consumer, PulsarEven private final OrderedExecutor callbackExecutor; @SuppressWarnings("UnstableApiUsage") - public PulsarEventCenterImpl(BrokerService brokerService, int poolThreadNum) { + public PulsarEventCenterImpl(MetadataStore configurationMetadataStore, int poolThreadNum) { this.listeners = new CopyOnWriteArrayList<>(); this.callbackExecutor = OrderedExecutor.newBuilder() .numThreads(poolThreadNum) .name("mqtt-notification-workers").build(); - brokerService.getPulsar() - .getConfigurationMetadataStore().registerListener(this); + configurationMetadataStore.registerListener(this); } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java index 6d86fc9f..1170a805 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java @@ -18,6 +18,8 @@ import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROTOCOL_PROXY_NAME; import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROXY_PREFIX; import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getProxyListenerPort; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + import com.google.common.collect.ImmutableMap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -30,32 +32,41 @@ import java.util.concurrent.ScheduledExecutorService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; -import org.apache.pulsar.broker.protocol.ProtocolHandler; -import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.proxy.extensions.ProxyExtension; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; /** * MQTT Protocol Handler load and run by Pulsar Service. */ @Slf4j -public class MQTTProxyProtocolHandler implements ProtocolHandler { +public class MQTTProxyProtocolHandler implements ProxyExtension { @Getter private MQTTProxyConfiguration proxyConfig; @Getter - private BrokerService brokerService; + private ProxyService proxyService; @Getter private String bindAddress; - private MQTTProxyService proxyService; + @Getter + private String advertisedAddress; + + private MQTTProxyService mqttProxyService; private ScheduledExecutorService sslContextRefresher; + private MQTTProxyServiceConfig proxyServiceConfig; + @Override - public String protocolName() { + public String extensionName() { return PROTOCOL_PROXY_NAME; } @@ -65,40 +76,81 @@ public boolean accept(String protocol) { } @Override - public void initialize(ServiceConfiguration conf) throws Exception { - // init config + public void initialize(ProxyConfiguration conf) throws Exception { proxyConfig = ConfigurationUtils.create(conf.getProperties(), MQTTProxyConfiguration.class); // We have to enable ack batch message individual. proxyConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getBindAddress()); + this.advertisedAddress = + ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress()); } @Override - public String getProtocolDataToAdvertise() { - if (log.isDebugEnabled()) { - log.debug("Get configured listener: {}", proxyConfig.getMqttProxyListeners()); - } - return proxyConfig.getMqttProxyListeners(); - } - - @Override - public void start(BrokerService brokerService) { - this.brokerService = brokerService; + public void start(ProxyService service) { + this.proxyService = service; try { - proxyService = new MQTTProxyService(brokerService, proxyConfig); - proxyService.start0(); + this.proxyServiceConfig = initProxyConfig(); + this.mqttProxyService = new MQTTProxyService(proxyServiceConfig); + this.mqttProxyService.start0(); log.info("Start MQTT proxy service "); } catch (Exception ex) { log.error("Failed to start MQTT proxy service.", ex); } } + private MQTTProxyServiceConfig initProxyConfig() { + MQTTProxyServiceConfig config = new MQTTProxyServiceConfig(); + config.setBindAddress(bindAddress); + config.setAdvertisedAddress(advertisedAddress); + config.setProxyConfiguration(proxyConfig); +// config.setLocalMetadataStore(proxyService.); + config.setPulsarClient(getClient()); + return config; + } + + private PulsarClientImpl getClient() { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl(proxyConfig.isTlsEnabled() + ? pulsarService.getBrokerServiceUrlTls() : pulsarService.getBrokerServiceUrl()); + conf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection()); + conf.setTlsTrustCertsFilePath(proxyConfig.getTlsCertificateFilePath()); + + if (proxyConfig.isBrokerClientTlsEnabled()) { + if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { + conf.setUseKeyStoreTls(true); + conf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType()); + conf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore()); + conf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword()); + } else { + conf.setTlsTrustCertsFilePath( + isNotBlank(proxyConfig.getBrokerClientTrustCertsFilePath()) + ? proxyConfig.getBrokerClientTrustCertsFilePath() + : proxyConfig.getTlsCertificateFilePath()); + } + } + + try { + if (isNotBlank(proxyConfig.getBrokerClientAuthenticationPlugin())) { + conf.setAuthPluginClassName(proxyConfig.getBrokerClientAuthenticationPlugin()); + conf.setAuthParams(proxyConfig.getBrokerClientAuthenticationParameters()); + conf.setAuthParamMap(null); + conf.setAuthentication(AuthenticationFactory.create( + proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters())); + } + return new PulsarClientImpl(conf); + } catch (PulsarClientException e) { + log.error("Failed to create PulsarClient", e); + throw new IllegalArgumentException(e); + } + } + @Override public Map> newChannelInitializers() { try { checkArgument(proxyConfig != null); checkArgument(proxyConfig.getMqttProxyListeners() != null); - checkArgument(brokerService != null); + checkArgument(proxyService != null); this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor( new DefaultThreadFactory("mop-ssl-context-refresher")); @@ -111,10 +163,9 @@ public Map> newChannelIniti for (String listener: parts) { if (listener.startsWith(PROXY_PREFIX)) { builder.put( - new InetSocketAddress(brokerService.pulsar().getBindAddress(), - getProxyListenerPort(listener)), + new InetSocketAddress(bindAddress, getProxyListenerPort(listener)), new MQTTProxyChannelInitializer( - proxyService, proxyConfig, false, false, sslContextRefresher)); + mqttProxyService, proxyConfig, false, false, sslContextRefresher)); } } return builder.build(); @@ -129,8 +180,8 @@ public void close() { if (sslContextRefresher != null) { sslContextRefresher.shutdownNow(); } - if (proxyService != null) { - proxyService.close(); + if (mqttProxyService != null) { + mqttProxyService.close(); } } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index 7651a046..084b33c1 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -43,6 +43,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; /** * This service is used for redirecting MQTT client request to proper MQTT protocol handler Broker. @@ -53,8 +54,6 @@ public class MQTTProxyService implements Closeable { @Getter private final MQTTProxyConfiguration proxyConfig; @Getter - private final PulsarService pulsarService; - @Getter private final MQTTAuthenticationService authenticationService; @Getter private final MQTTConnectionManager connectionManager; @@ -68,6 +67,8 @@ public class MQTTProxyService implements Closeable { private final PSKConfiguration pskConfiguration; @Getter private final MQTTProxyAdapter proxyAdapter; + @Getter + private final MQTTProxyServiceConfig proxyServiceConfig; private Channel listenChannel; private Channel listenChannelTls; @@ -79,25 +80,27 @@ public class MQTTProxyService implements Closeable { private DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("mqtt-redirect-io"); private ScheduledExecutorService sslContextRefresher; - public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration proxyConfig) { - configValid(proxyConfig); - this.pulsarService = brokerService.getPulsar(); - this.proxyConfig = proxyConfig; + public MQTTProxyService(MQTTProxyServiceConfig proxyServiceConfig) { + this.proxyServiceConfig = proxyServiceConfig; + configValid(proxyServiceConfig.getProxyConfiguration()); + this.proxyConfig = proxyServiceConfig.getProxyConfiguration(); + this.pskConfiguration = new PSKConfiguration(proxyConfig.getMqttTlsPskIdentityHint(), proxyConfig.getMqttTlsPskIdentity(), proxyConfig.getMqttTlsPskIdentityFile(), proxyConfig.getMqttTlsProtocols(), proxyConfig.getMqttTlsCiphers()); this.authenticationService = proxyConfig.isMqttAuthenticationEnabled() - ? new MQTTAuthenticationService(brokerService, proxyConfig.getMqttAuthenticationMethods()) : null; + ? new MQTTAuthenticationService(proxyServiceConfig.getAuthenticationService() + , proxyConfig.getMqttAuthenticationMethods()) : null; if (authenticationService != null && proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { AuthenticationProviderMTls providerMTls = new AuthenticationProviderMTls(); try { - providerMTls.initialize(brokerService.pulsar().getLocalMetadataStore()); + providerMTls.initialize(proxyServiceConfig.getLocalMetadataStore()); authenticationService.getAuthenticationProviders().put(AUTH_MTLS, providerMTls); } catch (Exception e) { log.error("Failed to initialize MQTT authentication method {} ", AUTH_MTLS, e); } } - this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress()); + this.connectionManager = new MQTTConnectionManager(proxyServiceConfig.getAdvertisedAddress()); this.eventService = proxyConfig.isSystemEventEnabled() ? new SystemTopicBasedSystemEventService(pulsarService) : new DisabledSystemEventService(); @@ -107,7 +110,7 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox false, acceptorThreadFactory); this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getMqttProxyNumIOThreads(), false, workerThreadFactory); - this.eventCenter = new PulsarEventCenterImpl(brokerService, + this.eventCenter = new PulsarEventCenterImpl(proxyServiceConfig.getConfigMetadataStore(), proxyConfig.getEventCenterCallbackPoolThreadNum()); this.proxyAdapter = new MQTTProxyAdapter(this); this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor( @@ -166,7 +169,7 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } } - this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); + this.lookupHandler = new PulsarServiceLookupHandler(proxyServiceConfig); this.eventService.start(); } @@ -207,7 +210,7 @@ public void start0() throws MQTTProxyException { throw new MQTTProxyException(e); } } - this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); + this.lookupHandler = new PulsarServiceLookupHandler(proxyServiceConfig); this.eventService.start(); } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java new file mode 100644 index 00000000..e4c16885 --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java @@ -0,0 +1,25 @@ +package io.streamnative.pulsar.handlers.mqtt.proxy; + + +import lombok.Data; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +@Data +public class MQTTProxyServiceConfig { + + private MQTTProxyConfiguration proxyConfiguration; + + private MetadataStoreExtended localMetadataStore; + + private MetadataStoreExtended configMetadataStore; + + private AuthenticationService authenticationService; + + private PulsarClientImpl pulsarClient; + + private String bindAddress; + + private String advertisedAddress; +} diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java index 7d0497f4..15335c87 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt.proxy.handler; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; import java.net.InetSocketAddress; @@ -26,15 +25,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.naming.TopicName; @@ -53,18 +50,15 @@ public class PulsarServiceLookupHandler implements LookupHandler { private final String protocolHandlerName = "mqtt"; private final PulsarClientImpl pulsarClient; private final MetadataCache localBrokerDataCache; - private final PulsarService pulsarService; private final MQTTProxyConfiguration proxyConfig; private final ExecutorProvider executorProvider; - public PulsarServiceLookupHandler(PulsarService pulsarService, MQTTProxyConfiguration proxyConfig) { - this.pulsarService = pulsarService; - this.proxyConfig = proxyConfig; + public PulsarServiceLookupHandler(MQTTProxyServiceConfig proxyServiceConfig) { + this.proxyConfig = proxyServiceConfig.getProxyConfiguration(); this.executorProvider = new ScheduledExecutorProvider(proxyConfig.getLookupThreadPoolNum(), "mop-lookup-thread"); - this.localBrokerDataCache = pulsarService - .getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); - this.pulsarClient = getClient(proxyConfig); + this.localBrokerDataCache = proxyServiceConfig.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); + this.pulsarClient = proxyServiceConfig.getPulsarClient(); } private void findBroker(TopicName topicName, @@ -173,41 +167,4 @@ public void close() { } catch (PulsarClientException ignore) { } } - - private PulsarClientImpl getClient(MQTTProxyConfiguration proxyConfig) { - ClientConfigurationData conf = new ClientConfigurationData(); - conf.setServiceUrl(proxyConfig.isTlsEnabled() - ? pulsarService.getBrokerServiceUrlTls() : pulsarService.getBrokerServiceUrl()); - conf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection()); - conf.setTlsTrustCertsFilePath(proxyConfig.getTlsCertificateFilePath()); - - if (proxyConfig.isBrokerClientTlsEnabled()) { - if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { - conf.setUseKeyStoreTls(true); - conf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType()); - conf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore()); - conf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword()); - } else { - conf.setTlsTrustCertsFilePath( - isNotBlank(proxyConfig.getBrokerClientTrustCertsFilePath()) - ? proxyConfig.getBrokerClientTrustCertsFilePath() - : proxyConfig.getTlsCertificateFilePath()); - } - } - - try { - if (isNotBlank(proxyConfig.getBrokerClientAuthenticationPlugin())) { - conf.setAuthPluginClassName(proxyConfig.getBrokerClientAuthenticationPlugin()); - conf.setAuthParams(proxyConfig.getBrokerClientAuthenticationParameters()); - conf.setAuthParamMap(null); - conf.setAuthentication(AuthenticationFactory.create( - proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters())); - } - return new PulsarClientImpl(conf); - } catch (PulsarClientException e) { - log.error("Failed to create PulsarClient", e); - throw new IllegalArgumentException(e); - } - } } diff --git a/pom.xml b/pom.xml index ee82464b..a878eeed 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,30 @@ + + io.streamnative + pulsar-proxy + 4.0.0.0 + + + io.grpc + grpc-all + + + io.grpc + grpc-core + + + io.grpc + grpc-testing + + + io.grpc + grpc-auth + + + + io.grpc From 33ecfeefd5a469789f2846472df6cdbca4cb82a4 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 21 Nov 2024 22:02:03 +0800 Subject: [PATCH 2/5] 1 --- .../mqtt/proxy/MQTTProxyProtocolHandler.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java index 1170a805..06edd792 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java @@ -19,7 +19,6 @@ import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROXY_PREFIX; import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getProxyListenerPort; import static org.apache.commons.lang3.StringUtils.isNotBlank; - import com.google.common.collect.ImmutableMap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -33,10 +32,13 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.proxy.extensions.ProxyExtension; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -50,6 +52,9 @@ public class MQTTProxyProtocolHandler implements ProxyExtension { @Getter private MQTTProxyConfiguration proxyConfig; + @Getter + private ProxyConfiguration conf; + @Getter private ProxyService proxyService; @@ -77,12 +82,13 @@ public boolean accept(String protocol) { @Override public void initialize(ProxyConfiguration conf) throws Exception { - proxyConfig = ConfigurationUtils.create(conf.getProperties(), MQTTProxyConfiguration.class); + this.conf = conf; + this.proxyConfig = ConfigurationUtils.create(conf.getProperties(), MQTTProxyConfiguration.class); // We have to enable ack batch message individual. - proxyConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); - this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getBindAddress()); + this.proxyConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); + this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(conf.getBindAddress()); this.advertisedAddress = - ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress()); + ServiceConfigurationUtils.getDefaultOrConfiguredAddress(conf.getAdvertisedAddress()); } @Override @@ -98,16 +104,29 @@ public void start(ProxyService service) { } } - private MQTTProxyServiceConfig initProxyConfig() { + private MQTTProxyServiceConfig initProxyConfig() throws Exception { MQTTProxyServiceConfig config = new MQTTProxyServiceConfig(); config.setBindAddress(bindAddress); config.setAdvertisedAddress(advertisedAddress); config.setProxyConfiguration(proxyConfig); -// config.setLocalMetadataStore(proxyService.); + config.setLocalMetadataStore(createLocalMetadataStore()); + config.setConfigMetadataStore(createConfigurationMetadataStore()); config.setPulsarClient(getClient()); return config; } + public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { + return PulsarResources.createLocalMetadataStore(conf.getMetadataStoreUrl(), + conf.getMetadataStoreSessionTimeoutMillis(), + conf.isMetadataStoreAllowReadOnlyOperations()); + } + + public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { + return PulsarResources.createConfigMetadataStore(conf.getConfigurationMetadataStoreUrl(), + conf.getMetadataStoreSessionTimeoutMillis(), + conf.isMetadataStoreAllowReadOnlyOperations()); + } + private PulsarClientImpl getClient() { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(proxyConfig.isTlsEnabled() From f328dc539930d1e57cf7c1ed9fd1f7379a70ebd7 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 21 Nov 2024 22:26:46 +0800 Subject: [PATCH 3/5] commit --- .../mqtt/broker/MQTTProtocolHandler.java | 31 +++++++++++++++---- .../handlers/mqtt/broker/MQTTService.java | 4 +-- .../mqtt/proxy/MQTTProxyServiceConfig.java | 10 +++--- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java index 4b42c2bd..5d47a135 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java @@ -35,6 +35,8 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; + +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; @@ -57,6 +59,9 @@ public class MQTTProtocolHandler implements ProtocolHandler { @Getter private String bindAddress; + @Getter + private String advertisedAddress; + private MQTTProxyService proxyService; @Getter @@ -77,10 +82,12 @@ public boolean accept(String protocol) { @Override public void initialize(ServiceConfiguration conf) throws Exception { // init config - mqttConfig = ConfigurationUtils.create(conf.getProperties(), MQTTServerConfiguration.class); + this. mqttConfig = ConfigurationUtils.create(conf.getProperties(), MQTTServerConfiguration.class); // We have to enable ack batch message individual. - mqttConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); + this.mqttConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(mqttConfig.getBindAddress()); + this.advertisedAddress = + ServiceConfigurationUtils.getDefaultOrConfiguredAddress(mqttConfig.getAdvertisedAddress()); } @Override @@ -97,11 +104,10 @@ public void start(BrokerService brokerService) { mqttService = new MQTTService(brokerService, mqttConfig); if (mqttConfig.isMqttProxyEnabled() || mqttConfig.isMqttProxyEnable()) { try { - MQTTProxyConfiguration proxyConfig = - ConfigurationUtils.create(mqttConfig.getProperties(), MQTTProxyConfiguration.class); - proxyService = new MQTTProxyService(brokerService, proxyConfig); + final MQTTProxyServiceConfig config = initProxyConfig(); + proxyService = new MQTTProxyService(config); proxyService.start(); - log.info("Start MQTT proxy service at port: {}", proxyConfig.getMqttProxyPort()); + log.info("Start MQTT proxy service at port: {}", config.getProxyConfiguration().getMqttProxyPort()); } catch (Exception ex) { log.error("Failed to start MQTT proxy service.", ex); } @@ -114,6 +120,19 @@ public void start(BrokerService brokerService) { MopVersion.getBuildTime()); } + private MQTTProxyServiceConfig initProxyConfig() throws Exception { + MQTTProxyConfiguration proxyConfig = + ConfigurationUtils.create(mqttConfig.getProperties(), MQTTProxyConfiguration.class); + MQTTProxyServiceConfig config = new MQTTProxyServiceConfig(); + config.setBindAddress(bindAddress); + config.setAdvertisedAddress(advertisedAddress); + config.setProxyConfiguration(proxyConfig); + config.setLocalMetadataStore(brokerService.pulsar().getLocalMetadataStore()); + config.setConfigMetadataStore(brokerService.pulsar().getConfigurationMetadataStore()); + config.setPulsarClient(brokerService.pulsar().getClient()); + return config; + } + @Override public Map> newChannelInitializers() { checkArgument(mqttConfig != null); diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java index a341c27d..d9c1e15e 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java @@ -102,14 +102,14 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo this.metricsProvider = new MQTTMetricsProvider(metricsCollector); this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider); this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled() - ? new MQTTAuthenticationService(brokerService, + ? new MQTTAuthenticationService(brokerService.getAuthenticationService(), serverConfiguration.getMqttAuthenticationMethods()) : null; this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress()); this.subscriptionManager = new MQTTSubscriptionManager(); if (getServerConfiguration().isMqttProxyEnabled()) { this.eventCenter = new DisableEventCenter(); } else { - this.eventCenter = new PulsarEventCenterImpl(brokerService, + this.eventCenter = new PulsarEventCenterImpl(brokerService.pulsar().getConfigurationMetadataStore(), serverConfiguration.getEventCenterCallbackPoolThreadNum()); } this.eventService = new DisabledSystemEventService(); diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java index e4c16885..9dc58c68 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java @@ -3,21 +3,21 @@ import lombok.Data; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.metadata.api.MetadataStore; @Data public class MQTTProxyServiceConfig { private MQTTProxyConfiguration proxyConfiguration; - private MetadataStoreExtended localMetadataStore; + private MetadataStore localMetadataStore; - private MetadataStoreExtended configMetadataStore; + private MetadataStore configMetadataStore; private AuthenticationService authenticationService; - private PulsarClientImpl pulsarClient; + private PulsarClient pulsarClient; private String bindAddress; From fc5cf34f9187f40de70f567003b4bfebe7572166 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 22 Nov 2024 09:55:33 +0800 Subject: [PATCH 4/5] fix --- .../mqtt/broker/MQTTProtocolHandler.java | 6 ++-- .../MQTTAuthenticationService.java | 1 - .../common/event/PulsarEventCenterImpl.java | 1 - .../SystemTopicBasedSystemEventService.java | 30 ++++++++++--------- .../mqtt/proxy/MQTTProxyProtocolHandler.java | 14 +++++++-- .../handlers/mqtt/proxy/MQTTProxyService.java | 6 ++-- .../mqtt/proxy/MQTTProxyServiceConfig.java | 3 ++ .../handler/PulsarServiceLookupHandler.java | 4 +-- 8 files changed, 38 insertions(+), 27 deletions(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java index 5d47a135..f16bb838 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java @@ -31,12 +31,11 @@ import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; - -import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; @@ -82,7 +81,7 @@ public boolean accept(String protocol) { @Override public void initialize(ServiceConfiguration conf) throws Exception { // init config - this. mqttConfig = ConfigurationUtils.create(conf.getProperties(), MQTTServerConfiguration.class); + this.mqttConfig = ConfigurationUtils.create(conf.getProperties(), MQTTServerConfiguration.class); // We have to enable ack batch message individual. this.mqttConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true); this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(mqttConfig.getBindAddress()); @@ -130,6 +129,7 @@ private MQTTProxyServiceConfig initProxyConfig() throws Exception { config.setLocalMetadataStore(brokerService.pulsar().getLocalMetadataStore()); config.setConfigMetadataStore(brokerService.pulsar().getConfigurationMetadataStore()); config.setPulsarClient(brokerService.pulsar().getClient()); + config.setPulsarResources(brokerService.pulsar().getPulsarResources()); return config; } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java index a37b4f3d..ef63396f 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/authentication/MQTTAuthenticationService.java @@ -31,7 +31,6 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.broker.service.BrokerService; /** * MQTT authentication service. diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java index 5cf0b7be..4da9b287 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarEventCenterImpl.java @@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.commons.collections4.CollectionUtils; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java index 0a836aec..1a9063d2 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java @@ -16,19 +16,22 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.Beta; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.JsonUtil; -import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.naming.NamespaceName; @@ -48,22 +51,19 @@ public class SystemTopicBasedSystemEventService implements SystemEventService { private static final long CACHE_EXPIRE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(10); private static final String WRITER_KEY = "writer"; - private final PulsarService pulsarService; + private final PulsarResources pulsarResources; private final SystemTopicClient systemTopicClient; private final List listeners; private volatile SystemTopicClient.Reader reader; private final AtomicBoolean initReader = new AtomicBoolean(false); private final AtomicInteger maxRetry = new AtomicInteger(0); + private final ScheduledExecutorService executor; private final AsyncLoadingCache> writerCaches; - public SystemTopicBasedSystemEventService(PulsarService pulsarService) { - this.pulsarService = pulsarService; - try { - this.systemTopicClient = new MQTTEventSystemTopicClient(pulsarService.getClient(), SYSTEM_EVENT_TOPIC); - } catch (PulsarServerException e) { - throw new IllegalStateException(e); - } + public SystemTopicBasedSystemEventService(PulsarResources pulsarResources, PulsarClient client) { + this.pulsarResources = pulsarResources; + this.systemTopicClient = new MQTTEventSystemTopicClient(client, SYSTEM_EVENT_TOPIC); this.listeners = new ArrayList<>(); writerCaches = Caffeine.newBuilder() .expireAfterAccess(CACHE_EXPIRE_TIME_MILLIS, TimeUnit.MILLISECONDS) @@ -71,6 +71,8 @@ public SystemTopicBasedSystemEventService(PulsarService pulsarService) { ((SystemTopicClient.Writer) v).closeAsync(); }) .buildAsync((key, executor) -> systemTopicClient.newWriterAsync()); + executor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("mop_system_topic_client")); } @Override @@ -134,14 +136,13 @@ protected CompletableFuture> createReader() Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); - RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result); + RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, executor, result); return result; } @Override public void start() { - CompletableFuture checkNamespaceFuture = pulsarService - .getPulsarResources() + CompletableFuture checkNamespaceFuture = pulsarResources .getNamespaceResources() .namespaceExistsAsync(NamespaceName.SYSTEM_NAMESPACE); checkNamespaceFuture.thenAccept(ret -> { @@ -149,7 +150,7 @@ public void start() { startReader(); } else { if (maxRetry.incrementAndGet() < 10) { - pulsarService.getExecutor().schedule(this::start, 1, TimeUnit.SECONDS); + executor.schedule(this::start, 1, TimeUnit.SECONDS); } } }).exceptionally(ex -> { @@ -161,6 +162,7 @@ public void start() { @Override public void close() { closeReader(); + executor.shutdownNow(); } private void startReader() { diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java index 06edd792..69bb9f6f 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolHandler.java @@ -26,11 +26,13 @@ import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.mqtt.proxy.channel.MQTTProxyChannelInitializer; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -39,6 +41,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.apache.pulsar.proxy.extensions.ProxyExtension; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -111,6 +114,7 @@ private MQTTProxyServiceConfig initProxyConfig() throws Exception { config.setProxyConfiguration(proxyConfig); config.setLocalMetadataStore(createLocalMetadataStore()); config.setConfigMetadataStore(createConfigurationMetadataStore()); + config.setPulsarResources(new PulsarResources(config.getLocalMetadataStore(), config.getConfigMetadataStore())); config.setPulsarClient(getClient()); return config; } @@ -127,10 +131,16 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS conf.isMetadataStoreAllowReadOnlyOperations()); } - private PulsarClientImpl getClient() { + private PulsarClientImpl getClient() throws Exception { + final List availableBrokers = + proxyService.getDiscoveryProvider().getAvailableBrokers(); + if (availableBrokers.isEmpty()) { + throw new PulsarServerException("No active broker is available"); + } + final ServiceLookupData serviceLookupData = availableBrokers.get(0); ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(proxyConfig.isTlsEnabled() - ? pulsarService.getBrokerServiceUrlTls() : pulsarService.getBrokerServiceUrl()); + ? serviceLookupData.getPulsarServiceUrlTls() : serviceLookupData.getPulsarServiceUrl()); conf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection()); conf.setTlsTrustCertsFilePath(proxyConfig.getTlsCertificateFilePath()); diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index 084b33c1..31c5e6ac 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -40,10 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.util.netty.EventLoopUtil; -import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; /** * This service is used for redirecting MQTT client request to proper MQTT protocol handler Broker. @@ -102,7 +99,8 @@ public MQTTProxyService(MQTTProxyServiceConfig proxyServiceConfig) { } this.connectionManager = new MQTTConnectionManager(proxyServiceConfig.getAdvertisedAddress()); this.eventService = proxyConfig.isSystemEventEnabled() - ? new SystemTopicBasedSystemEventService(pulsarService) + ? new SystemTopicBasedSystemEventService(proxyServiceConfig.getPulsarResources(), + proxyServiceConfig.getPulsarClient()) : new DisabledSystemEventService(); this.eventService.addListener(connectionManager.getEventListener()); this.eventService.addListener(new RetainedMessageHandler(eventService).getEventListener()); diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java index 9dc58c68..5dc9378d 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyServiceConfig.java @@ -3,6 +3,7 @@ import lombok.Data; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.metadata.api.MetadataStore; @@ -19,6 +20,8 @@ public class MQTTProxyServiceConfig { private PulsarClient pulsarClient; + private PulsarResources pulsarResources; + private String bindAddress; private String advertisedAddress; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java index 15335c87..fcc56981 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/handler/PulsarServiceLookupHandler.java @@ -15,6 +15,7 @@ import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -25,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyServiceConfig; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -58,7 +58,7 @@ public PulsarServiceLookupHandler(MQTTProxyServiceConfig proxyServiceConfig) { this.executorProvider = new ScheduledExecutorProvider(proxyConfig.getLookupThreadPoolNum(), "mop-lookup-thread"); this.localBrokerDataCache = proxyServiceConfig.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); - this.pulsarClient = proxyServiceConfig.getPulsarClient(); + this.pulsarClient = (PulsarClientImpl) proxyServiceConfig.getPulsarClient(); } private void findBroker(TopicName topicName, From 5aa107882d7de162f445447fd9ba920e236c6cd7 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 22 Nov 2024 10:18:46 +0800 Subject: [PATCH 5/5] 1 --- .../handlers/mqtt/common/utils/PulsarTopicUtils.java | 6 ++++-- .../proxy/impl/MQTTProxyProtocolMethodProcessor.java | 10 +++++----- pom.xml | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/PulsarTopicUtils.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/PulsarTopicUtils.java index 6b14e02a..88eb126a 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/PulsarTopicUtils.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/PulsarTopicUtils.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -197,14 +198,15 @@ public static TopicFilter getTopicFilter(String mqttTopicFilter) { } public static CompletableFuture> asyncGetTopicListFromTopicSubscription(String topicFilter, - String defaultTenant, String defaultNamespace, PulsarService pulsarService, String defaultTopicDomain) { + String defaultTenant, String defaultNamespace, + NamespaceService namespaceService, String defaultTopicDomain) { if (MqttUtils.isRegexFilter(topicFilter)) { TopicFilter filter = PulsarTopicUtils.getTopicFilter(topicFilter); Pair domainNamespacePair = PulsarTopicUtils .getTopicDomainAndNamespaceFromTopicFilter(topicFilter, defaultTenant, defaultNamespace, defaultTopicDomain); - return pulsarService.getNamespaceService().getListOfTopics( + return namespaceService.getListOfTopics( domainNamespacePair.getRight(), domainNamespacePair.getLeft() == TopicDomain.persistent ? CommandGetTopicsOfNamespace.Mode.PERSISTENT : CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT).thenCompose(topics -> diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index 9d495418..1996180f 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -77,8 +77,6 @@ @Slf4j public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMethodProcessor { - private final PulsarService pulsarService; - @Getter private Connection connection; private final LookupHandler lookupHandler; @@ -94,6 +92,7 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth private final MQTTConnectionManager connectionManager; private final SystemEventService eventService; private final MQTTProxyAdapter proxyAdapter; + private final MQTTProxyService proxyService; private final AtomicBoolean isDisconnected = new AtomicBoolean(false); private final AutoSubscribeHandler autoSubscribeHandler; @@ -106,7 +105,7 @@ public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHa super(proxyService.getAuthenticationService(), proxyService.getProxyConfig().isMqttAuthenticationEnabled(), ctx); - pulsarService = proxyService.getPulsarService(); + this.proxyService = proxyService; this.lookupHandler = proxyService.getLookupHandler(); this.proxyConfig = proxyService.getProxyConfig(); this.connectionManager = proxyService.getConnectionManager(); @@ -152,7 +151,7 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole, ConnectEvent connectEvent = ConnectEvent.builder() .clientId(connection.getClientId()) - .address(pulsarService.getAdvertisedAddress()) + .address(proxyService.getProxyServiceConfig().getAdvertisedAddress()) .build(); eventService.sendConnectEvent(connectEvent); } @@ -365,7 +364,8 @@ private CompletableFuture doSubscribe(final MqttAdapterMessage adapter, fi final int packetId = message.variableHeader().messageId(); List> futures = message.payload().topicSubscriptions().stream() .map(subscription -> PulsarTopicUtils.asyncGetTopicListFromTopicSubscription(subscription.topicName(), - proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(), pulsarService, + proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(), + proxyService.getProxyServiceConfig().getPulsarResources().getNamespaceResources(), proxyConfig.getDefaultTopicDomain()) .thenCompose(pulsarTopicNames -> { if (CollectionUtils.isEmpty(pulsarTopicNames)) { diff --git a/pom.xml b/pom.xml index a878eeed..d4ffdb52 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ io.streamnative pulsar-proxy - 4.0.0.0 + ${pulsar.version} io.grpc