Skip to content

Commit

Permalink
#205 #206 refactor backend
Browse files Browse the repository at this point in the history
Signed-off-by: Niklas Teschner <[email protected]>
  • Loading branch information
nikkite99 committed Jul 14, 2023
1 parent 2031cbc commit b9053db
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ public ResponseEntity<Void> flush() {
.forEach(cache -> Objects.requireNonNull(cacheManager.getCache(cache)).clear());
return new ResponseEntity<>(HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ public ResponseEntity<ClusterDetailDto> getClusterDetails(@RequestParam String c
public ResponseEntity<ClustersDto> getAll() {
return new ResponseEntity<>(new ClustersDto(clusterService.getAllNames()), HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,4 @@ public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
Set<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class NamespaceController {
public ResponseEntity<NamespacesDto> getAll(@RequestParam(required = false, defaultValue = "") List<String> tenants,
@RequestParam(required = false, defaultValue = "") List<String> namespaces) {
if (!namespaces.isEmpty()) {
return wrapInEntity(getAllForNamespaces(namespaces));
return wrapInEntity(namespaceService.getAllForNamespaces(namespaces));
} else {
return wrapInEntity(getAllForTenants(tenants));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class ClusterService {
@Cacheable("cluster.allNames")
public List<ClusterDto> getAllNames() {
try {
return pulsarAdmin.clusters().getClusters()
.stream().map(ClusterDto::create)
return pulsarAdmin.clusters().getClusters().stream()
.map(ClusterDto::create)
.map(this::enrichWithCardDetails)
.toList();
} catch (PulsarAdminException e) {
Expand All @@ -41,7 +41,6 @@ public ClusterDetailDto getClusterDetails(String clusterName) {
ClusterData clusterData = getClusterData(clusterName);
List<String> activeBrokers = getActiveBrokers(clusterName);
List<String> tenantsAllowedForCluster = getTenantsAllowedForCluster(clusterName);

return ClusterDetailDto.builder()
.name(clusterName)
.serviceUrl(clusterData.getServiceUrl())
Expand Down Expand Up @@ -90,9 +89,9 @@ private List<String> getTenantsAllowedForCluster(String clusterName) throws Puls
}

private ClusterDto enrichWithCardDetails(ClusterDto clusterDto) {
List<String> tenats = getTenantsAllowedForCluster(clusterDto.getName());
long numberOfNamespaces = tenats.stream().mapToLong(t -> namespaceService.getAllOfTenant(t).size()).sum();
clusterDto.setNumberOfTenants(tenats.size());
List<String> tenants = getTenantsAllowedForCluster(clusterDto.getName());
long numberOfNamespaces = tenants.stream().mapToLong(t -> namespaceService.getAllOfTenant(t).size()).sum();
clusterDto.setNumberOfTenants(tenants.size());
clusterDto.setNumberOfNamespaces(numberOfNamespaces);
return clusterDto;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

@Service
Expand All @@ -39,15 +33,13 @@ public Set<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessag
if (!subscriptions.isEmpty()) {
messageDtos = filterBySubscription(messageDtos, numMessages, topic, subscriptions);
}

return messageDtos;
}

private Set<MessageDto> filterBySubscription(Set<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
List<String> messageIds = subscriptions.stream()
.flatMap(s -> peekMessageIds(topic, s, numMessages).stream())
.toList();

return messageDtos.stream()
.filter(m -> messageIds.contains(m.getMessageId()))
.collect(Collectors.toCollection(LinkedHashSet::new));
Expand All @@ -57,15 +49,15 @@ private List<String> peekMessageIds(String topic, String subscription, Integer n
try {
if (pulsarAdmin.topics().getSubscriptions(topic).contains(subscription)) {
return pulsarAdmin.topics().peekMessages(topic, subscription, numMessages).stream()
.map(m -> m.getMessageId().toString()).toList();
.map(m -> m.getMessageId().toString())
.toList();
}
return Collections.emptyList();
} catch (PulsarAdminException e) {
throw new PulsarApiException(String.format("Could not get Messages for subscription %s", subscription), e);
}
}


private Set<MessageDto> filterByProducers(Set<MessageDto> messageDtos, List<String> producers) {
return messageDtos.stream()
.filter(m -> producers.contains(m.getProducer()))
Expand Down Expand Up @@ -107,8 +99,4 @@ private String getSchemaIfExists(String topic) {
return "";
}
}

public boolean inValidTopicName(MessageDto messageDto) {
return !TopicName.isValid(messageDto.getTopic());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ private NamespaceDetailDto enrichWithNamespaceData(NamespaceDetailDto namespace)
try {

Namespaces namespaces = pulsarAdmin.namespaces();

namespace.setBundlesData(namespaces.getBundles(namespace.getId()));
namespace.setMessagesTTL(namespaces.getNamespaceMessageTTL(namespace.getId()));
namespace.setRetentionPolicies(namespaces.getRetention(namespace.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public TenantDetailDto getTenantDetails(String tenantName) {
.build();
}


private TenantInfo getTenantInfo(String tenantName) {
try {
return pulsarAdmin.tenants().getTenantInfo(tenantName);
Expand All @@ -72,7 +71,6 @@ private TenantInfo getTenantInfo(String tenantName) {
}
}


private TenantDto enrichWithCardDetails(TenantDto tenantDto) {
try {
List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenantDto.getName());
Expand Down

This file was deleted.

0 comments on commit b9053db

Please sign in to comment.