Skip to content

Commit

Permalink
[CELEBORN-1401] Add SSL support for ratis communication
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

When SSL is enabled for master, secure the Ratis communication as well with TLS

### Why are the changes needed?

Currently, when TLS is enabled for RPC, Ratis comms still goes in the clear - add support for TLS.
Note that currently this only supports GRPC, and not netty.

### Does this PR introduce _any_ user-facing change?
Secures ratis communication when TLS is enabled at master for rpc.

### How was this patch tested?
Local tests and additional unit tests added

Closes apache#2515 from mridulm/CELEBORN-1401-add-ratis-ssl-support.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
Mridul Muralidharan authored and RexXiong committed May 17, 2024
1 parent 8a10a2d commit a13d167
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TransportContext(
this.conf = conf;
this.msgHandler = msgHandler;
this.closeIdleConnections = closeIdleConnections;
this.sslFactory = createSslFactory();
this.sslFactory = SSLFactory.createSslFactory(conf);
this.channelsLimiter = channelsLimiter;
this.enableHeartbeat = enableHeartbeat;
this.source = source;
Expand Down Expand Up @@ -216,37 +216,6 @@ public TransportChannelHandler initializePipeline(
}
}

private SSLFactory createSslFactory() {
if (conf.sslEnabled()) {

if (conf.sslEnabledAndKeysAreValid()) {
return new SSLFactory.Builder()
.requestedProtocol(conf.sslProtocol())
.requestedCiphers(conf.sslRequestedCiphers())
.autoSslEnabled(conf.autoSslEnabled())
.keyStore(conf.sslKeyStore(), conf.sslKeyStorePassword())
.trustStore(
conf.sslTrustStore(),
conf.sslTrustStorePassword(),
conf.sslTrustStoreReloadingEnabled(),
conf.sslTrustStoreReloadIntervalMs())
.build();
} else {
logger.error(
"SSL encryption enabled but keyStore is not configured for "
+ conf.getModuleName()
+ "! Please ensure the configured keys are present.");
throw new IllegalArgumentException(
conf.getModuleName()
+ " SSL encryption enabled for "
+ conf.getModuleName()
+ " but keyStore not configured !");
}
} else {
return null;
}
}

private TransportChannelHandler createChannelHandler(
Channel channel, BaseMessageHandler msgHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(conf, channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -47,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.util.JavaUtils;

/**
Expand Down Expand Up @@ -96,6 +98,18 @@ private void initJdkSslContext(final Builder b) throws IOException, GeneralSecur
this.jdkSslContext = createSSLContext(requestedProtocol, keyManagers, trustManagers);
}

public List<KeyManager> getKeyManagers() {
return null != keyManagers
? Collections.unmodifiableList(Arrays.asList(keyManagers))
: Collections.emptyList();
}

public List<TrustManager> getTrustManagers() {
return null != trustManagers
? Collections.unmodifiableList(Arrays.asList(trustManagers))
: Collections.emptyList();
}

/*
* As b.trustStore is null, credulousTrustStoreManagers will be used - and so all
* certs will be accepted - and hence self-signed cert from lifecycle manager will
Expand All @@ -119,7 +133,7 @@ private void configureAutoSsl(Builder b) {
}

public boolean hasKeyManagers() {
return null != keyManagers;
return null != keyManagers && keyManagers.length > 0;
}

public void destroy() {
Expand Down Expand Up @@ -327,7 +341,7 @@ private static TrustManager[] trustStoreManagers(
}
}

private static TrustManager[] defaultTrustManagers(File trustStore, String trustStorePassword)
public static TrustManager[] defaultTrustManagers(File trustStore, String trustStorePassword)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException {
try (InputStream input = Files.asByteSource(trustStore).openStream()) {
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
Expand Down Expand Up @@ -436,4 +450,35 @@ private static List<String> addIfSupported(String[] supported, String... names)
}
return enabled;
}

public static SSLFactory createSslFactory(TransportConf conf) {
if (conf.sslEnabled()) {

if (conf.sslEnabledAndKeysAreValid()) {
return new SSLFactory.Builder()
.requestedProtocol(conf.sslProtocol())
.requestedCiphers(conf.sslRequestedCiphers())
.autoSslEnabled(conf.autoSslEnabled())
.keyStore(conf.sslKeyStore(), conf.sslKeyStorePassword())
.trustStore(
conf.sslTrustStore(),
conf.sslTrustStorePassword(),
conf.sslTrustStoreReloadingEnabled(),
conf.sslTrustStoreReloadIntervalMs())
.build();
} else {
logger.error(
"SSL encryption enabled but keyStore is not configured for "
+ conf.getModuleName()
+ "! Please ensure the configured keys are present.");
throw new IllegalArgumentException(
conf.getModuleName()
+ " SSL encryption enabled for "
+ conf.getModuleName()
+ " but keyStore not configured !");
}
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,34 @@
import java.nio.file.StandardCopyOption;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.security.auth.x500.X500Principal;
import java.util.*;
import java.util.stream.Stream;

import org.apache.commons.io.FileUtils;
import org.bouncycastle.x509.X509V1CertificateGenerator;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.asn1.x509.Extension;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.asn1.x509.GeneralNames;
import org.bouncycastle.cert.X509v3CertificateBuilder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.cert.jcajce.JcaX509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SslSampleConfigs {

private static final Logger LOG = LoggerFactory.getLogger(SslSampleConfigs.class);

static {
Security.addProvider(new BouncyCastleProvider());
}

public static final String DEFAULT_KEY_STORE_PATH = getResourceAsAbsolutePath("/ssl/server.jks");
public static final String SECOND_KEY_STORE_PATH =
getResourceAsAbsolutePath("/ssl/server_another.jks");
Expand Down Expand Up @@ -113,34 +128,95 @@ public static <T extends Certificate> void createTrustStore(
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @return the self-signed certificate
*/
public static X509Certificate generateCertificate(
String dn, KeyPair pair, int days, String algorithm) throws Exception {
return generateCertificate(dn, pair, days, algorithm, false, null, null, null);
}

/**
* Create a self-signed X.509 Certificate.
*
* @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
* @param pair the KeyPair for the server
* @param days how many days from now the Certificate is valid for
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @param generateCaCert Is this request to generate a CA cert
* @param altNames Optional: Alternate names to be added to the cert - we add them as both
* hostnames and ip's.
* @param caKeyPair Optional: the KeyPair of the CA, to be used to sign this certificate. caCert
* should also be specified to use it
* @param caCert Optional: the CA cert, to be used to sign this certificate. caKeyPair should also
* be specified to use it
* @return the signed certificate (signed using ca if provided, else self-signed)
*/
@SuppressWarnings("deprecation")
public static X509Certificate generateCertificate(
String dn, KeyPair pair, int days, String algorithm)
throws CertificateEncodingException, InvalidKeyException, IllegalStateException,
NoSuchAlgorithmException, SignatureException {
String dn,
KeyPair pair,
int days,
String algorithm,
boolean generateCaCert,
String[] altNames,
KeyPair caKeyPair,
X509Certificate caCert)
throws Exception {

Date from = new Date();
Date to = new Date(from.getTime() + days * 86400000L);
BigInteger sn = new BigInteger(64, new SecureRandom());
KeyPair keyPair = pair;
X509V1CertificateGenerator certGen = new X509V1CertificateGenerator();
X500Principal dnName = new X500Principal(dn);

certGen.setSerialNumber(sn);
certGen.setIssuerDN(dnName);
certGen.setNotBefore(from);
certGen.setNotAfter(to);
certGen.setSubjectDN(dnName);
certGen.setPublicKey(keyPair.getPublic());
certGen.setSignatureAlgorithm(algorithm);

X509Certificate cert = certGen.generate(pair.getPrivate());
return cert;
X500Name subjectName = new X500Name(dn);

X500Name issuerName;
KeyPair signingKeyPair;

if (caKeyPair != null && caCert != null) {
issuerName = new JcaX509CertificateHolder(caCert).getSubject();
signingKeyPair = caKeyPair;
} else {
issuerName = subjectName;
// self signed
signingKeyPair = pair;
}

X509v3CertificateBuilder certBuilder =
new JcaX509v3CertificateBuilder(
issuerName, sn, from, to, new X500Name(dn), pair.getPublic());

if (null != altNames) {
Stream<GeneralName> dnsStream =
Arrays.stream(altNames).map(h -> new GeneralName(GeneralName.dNSName, h));
Stream<GeneralName> ipStream =
Arrays.stream(altNames)
.map(
h -> {
try {
return new GeneralName(GeneralName.iPAddress, h);
} catch (Exception ex) {
return null;
}
})
.filter(Objects::nonNull);

GeneralName[] arr = Stream.concat(dnsStream, ipStream).toArray(GeneralName[]::new);
GeneralNames names = new GeneralNames(arr);

certBuilder.addExtension(Extension.subjectAlternativeName, false, names);
LOG.info("Added subjectAlternativeName extension for hosts : " + Arrays.toString(altNames));
}

if (generateCaCert) {
certBuilder.addExtension(Extension.basicConstraints, true, new BasicConstraints(true));
LOG.info("Added CA cert extension");
}

ContentSigner signer =
new JcaContentSignerBuilder(algorithm).build(signingKeyPair.getPrivate());
return new JcaX509CertificateConverter().getCertificate(certBuilder.build(signer));
}

public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
keyGen.initialize(1024);
keyGen.initialize(4096);
return keyGen.genKeyPair();
}

Expand Down Expand Up @@ -178,7 +254,7 @@ public static void createKeyStore(
}

private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(null, null); // initialize
return ks;
}
Expand Down
2 changes: 2 additions & 0 deletions docs/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ start="<!--begin-include-->"
end="<!--end-include-->"
!}

When SSL is enabled for `rpc_service`, Raft communication between masters are secured **only when** `celeborn.master.ha.ratis.raft.rpc.type` is set to `grpc`.

Note that `celeborn.ssl`, **without any module**, can be used to set SSL default values which applies to all modules.

Also note that `data` module at application side, maps to `push` and `fetch` at worker - hence, for SSL configuration, worker configuration for `push` and `fetch` should be compatible with each other and with `data` at application side.
Expand Down
7 changes: 7 additions & 0 deletions master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@
<artifactId>jersey-test-framework-provider-jetty</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit a13d167

Please sign in to comment.