From 4eac6941e7b4e60978af39f5e9fb566504521e2f Mon Sep 17 00:00:00 2001 From: Vincent Royer Date: Fri, 3 Aug 2018 10:11:25 +0200 Subject: [PATCH] Fix cassandra discovery for k8s --- .../discovery/CassandraDiscovery.java | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java b/core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java index ad4a1a43132..b6e61f21fe8 100644 --- a/core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java +++ b/core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java @@ -155,8 +155,9 @@ protected void doStart() { for(InetAddress endpoint : StorageService.instance.getTokenMetadata().getAllEndpoints()) { if (!this.localAddress.equals(endpoint) && this.localDc.equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint))) { String hostId = StorageService.instance.getHostId(endpoint).toString(); - UntypedResultSet.Row row = executeInternal("SELECT preferred_ip, rpc_address from system." + SystemKeyspace.PEERS+" WHERE peer = ?", endpoint).one(); - if (row != null) { + UntypedResultSet rs = executeInternal("SELECT preferred_ip, rpc_address from system." + SystemKeyspace.PEERS+" WHERE peer = ?", endpoint); + if (!rs.isEmpty()) { + UntypedResultSet.Row row = rs.one(); EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); clusterGroup.update(hostId, endpoint, @@ -656,7 +657,7 @@ public void onRemove(InetAddress endpoint) { } } else if (isMember(endpoint)) { DiscoveryNode removedNode = this.nodes().findByInetAddress(endpoint); - if (removedNode != null) { + if (removedNode != null && !this.localNode().getId().equals(removedNode.getId())) { logger.warn("Removing node ip={} node={} => disconnecting", endpoint, removedNode); if (this.metaDataVersionAckListener.get() != null) { notifyMetaDataVersionAckListener(Gossiper.instance.getEndpointStateForEndpoint(endpoint)); @@ -824,18 +825,30 @@ public synchronized boolean update(String hostId, InetAddress endpoint, InetAddr } */ return true; - } else if (!dn.getName().equals(buildNodeName(endpoint)) || !dn.getInetAddress().equals(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress)) { - DiscoveryNode dn2 = new DiscoveryNode(buildNodeName(endpoint), - hostId, - new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress, publishPort()), - dn.getAttributes(), - CASSANDRA_ROLES, - Version.CURRENT, - status); - members.replace(hostId, dn, dn2); - logger.debug("Update node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}", - hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status); - return true; + } + if (localNode().getId().equals(hostId)) { + // ignore GOSSIP update related to our self node. + logger.debug("Ignoring GOSSIP update for node id={} ip={} because it's mine", hostId, endpoint); + return false; + } + if (!dn.getName().equals(buildNodeName(endpoint)) || !dn.getInetAddress().equals(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress)) { + if (status.equals(DiscoveryNodeStatus.ALIVE)) { + DiscoveryNode dn2 = new DiscoveryNode(buildNodeName(endpoint), + hostId, + new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress, publishPort()), + dn.getAttributes(), + CASSANDRA_ROLES, + Version.CURRENT, + status); + members.replace(hostId, dn, dn2); + logger.debug("Update node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}", + hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status); + return true; + } else { + logger.debug("Ignoring node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}", + hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status); + return false; + } } else if (!dn.getStatus().equals(status)) { dn.status(status); logger.debug("Update node host_id={} endpoint={} internal_ip={} rpc_address={}, status={}",