Skip to content

Commit

Permalink
added support for verification (MD5 on data only), which required new…
Browse files Browse the repository at this point in the history
… reverseFilter path for plug-ins, which in turn required refactoring SyncObject so it can be originated in either source or target
  • Loading branch information
twincitiesguy committed Apr 3, 2015
1 parent f2f7031 commit 8728793
Show file tree
Hide file tree
Showing 57 changed files with 1,868 additions and 770 deletions.
9 changes: 0 additions & 9 deletions src/main/java/com/emc/vipr/sync/SyncPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,6 @@ public String summarizeConfig() {
return summary.toString();
}

protected void safeClose(Closeable closeable) {
try {
if (closeable != null) closeable.close();
} catch (Throwable t) {
l4j.warn("could not close resource", t);
}

}

protected <T> T time(Function<T> function, String name) {
return TimingUtil.time(this, name, function);
}
Expand Down
75 changes: 50 additions & 25 deletions src/main/java/com/emc/vipr/sync/ViPRSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package com.emc.vipr.sync;

import com.emc.vipr.sync.filter.SyncFilter;
import com.emc.vipr.sync.model.SyncObject;
import com.emc.vipr.sync.model.object.SyncObject;
import com.emc.vipr.sync.source.SyncSource;
import com.emc.vipr.sync.target.SyncTarget;
import com.emc.vipr.sync.util.ConfigurationException;
import com.emc.vipr.sync.util.OptionBuilder;
import com.emc.vipr.sync.util.SyncUtil;
import com.emc.vipr.sync.util.TimingUtil;
import org.apache.commons.cli.*;
import org.apache.log4j.Level;
Expand Down Expand Up @@ -48,6 +49,7 @@
*/
public class ViPRSync implements Runnable {
private static final Logger l4j = Logger.getLogger(ViPRSync.class);
private static final Logger stackTraceLog = Logger.getLogger(ViPRSync.class.getName() + ".stackTrace");

private static final String VERSION_OPTION = "version";
private static final String VERSION_DESC = "Displays package version";
Expand Down Expand Up @@ -115,6 +117,12 @@ public class ViPRSync implements Runnable {
private static final String FORGET_FAILED_OPTION = "forget-failed";
private static final String FORGET_FAILED_DESC = "By default, ViPRSync tracks all failed objects and displays a summary of failures when finished. To save memory in large migrations, this option will disable this summary. If you use this option, be sure your logging is at an appropriate level and that you are capturing failures in a log file.";

private static final String VERIFY_OPTION = "verify";
private static final String VERIFY_DESC = "Supported source plugins will delete each source object once it is successfully synced (does not include directories). Use this option with care! Be sure log levels are appropriate to capture transferred (source deleted) objects.";

private static final String VERIFY_ONLY_OPTION = "verify-only";
private static final String VERIFY_ONLY_DESC = "Supported source plugins will delete each source object once it is successfully synced (does not include directories). Use this option with care! Be sure log levels are appropriate to capture transferred (source deleted) objects.";

private static final String DELETE_SOURCE_OPTION = "delete-source";
private static final String DELETE_SOURCE_DESC = "Supported source plugins will delete each source object once it is successfully synced (does not include directories). Use this option with care! Be sure log levels are appropriate to capture transferred (source deleted) objects.";

Expand Down Expand Up @@ -276,6 +284,9 @@ protected static ViPRSync cliBootstrap(String[] args) throws ParseException {
// configure failed object tracking
if (line.hasOption(FORGET_FAILED_OPTION)) sync.setRememberFailed(false);

if (line.hasOption(VERIFY_OPTION)) sync.setVerify(true);
if (line.hasOption(VERIFY_ONLY_OPTION)) sync.setVerifyOnly(true);

// configure whether to delete source objects after they are successfully synced
if (line.hasOption(DELETE_SOURCE_OPTION)) sync.setDeleteSource(true);

Expand Down Expand Up @@ -370,6 +381,8 @@ protected static Options mainOptions() {
options.addOption(new OptionBuilder().withLongOpt(TIMING_WINDOW_OPTION).withDescription(TIMING_WINDOW_DESC)
.hasArg().withArgName(TIMING_WINDOW_ARG_NAME).create());
options.addOption(new OptionBuilder().withLongOpt(FORGET_FAILED_OPTION).withDescription(FORGET_FAILED_DESC).create());
options.addOption(new OptionBuilder().withLongOpt(VERIFY_OPTION).withDescription(VERIFY_DESC).create());
options.addOption(new OptionBuilder().withLongOpt(VERIFY_ONLY_OPTION).withDescription(VERIFY_ONLY_DESC).create());
options.addOption(new OptionBuilder().withLongOpt(DELETE_SOURCE_OPTION).withDescription(DELETE_SOURCE_DESC).create());

OptionGroup loggingOpts = new OptionGroup();
Expand Down Expand Up @@ -429,6 +442,8 @@ protected static String versionLine() {
protected boolean timingsEnabled = false;
protected int timingWindow = 10000;
protected boolean rememberFailed = true;
protected boolean verify = false;
protected boolean verifyOnly = false;
protected boolean deleteSource = false;
protected String logLevel;

Expand Down Expand Up @@ -573,6 +588,8 @@ public String summarizeConfig() {
summary.append(" - timingsEnabled: ").append(timingsEnabled).append("\n");
summary.append(" - timingWindow: ").append(timingWindow).append("\n");
summary.append(" - rememberFailed: ").append(rememberFailed).append("\n");
summary.append(" - verify: ").append(verify).append("\n");
summary.append(" - verifyOnly: ").append(verifyOnly).append("\n");
summary.append(" - deleteSource: ").append(deleteSource).append("\n");
summary.append(" - logLevel: ").append(logLevel).append("\n");
summary.append("Source: ").append(source.summarizeConfig());
Expand Down Expand Up @@ -627,31 +644,14 @@ protected synchronized void complete(SyncObject syncObject) {
* @param t the error that caused the failure.
*/
protected synchronized void failed(SyncObject syncObject, Throwable t) {
LogMF.warn(l4j, "O--! object {0} failed: {1}", syncObject, getCause(t));
if (l4j.isDebugEnabled()) l4j.debug(summarize(t));
LogMF.warn(l4j, "O--! object {0} failed: {1}", syncObject, SyncUtil.getCause(t));
stackTraceLog.warn(SyncUtil.summarize(t));
failedCount++;
if (rememberFailed) {
failedObjects.add(syncObject);
}
}

protected Throwable getCause(Throwable t) {
Throwable cause = t;
while (cause.getCause() != null) cause = cause.getCause();
return cause;
}

protected String summarize(Throwable t) {
Throwable cause = getCause(t);
StringBuilder summary = new StringBuilder();
summary.append(MessageFormat.format("[{0}] {1}", t, cause));
StackTraceElement[] elements = cause.getStackTrace();
for (int i = 0; i < 15 && i < elements.length; i++) {
summary.append("\n at ").append(elements[i]);
}
return summary.toString();
}

protected void cleanup() {
source.cleanup();
for (SyncFilter filter : filters) {
Expand Down Expand Up @@ -752,6 +752,22 @@ public void setRememberFailed(boolean rememberFailed) {
this.rememberFailed = rememberFailed;
}

public boolean isVerify() {
return verify;
}

public void setVerify(boolean verify) {
this.verify = verify;
}

public boolean isVerifyOnly() {
return verifyOnly;
}

public void setVerifyOnly(boolean verifyOnly) {
this.verifyOnly = verifyOnly;
}

public boolean isDeleteSource() {
return deleteSource;
}
Expand Down Expand Up @@ -801,7 +817,7 @@ public void run() {
LogMF.debug(l4j, "<<<< finished querying children of {0}", syncObject);
}
} catch (Throwable t) {
LogMF.warn(l4j, ">>!! querying children of {0} failed: {1}", syncObject, summarize(t));
LogMF.warn(l4j, ">>!! querying children of {0} failed: {1}", syncObject, SyncUtil.summarize(t));
}
}
}
Expand All @@ -818,11 +834,20 @@ public SyncTask(SyncSource<T> syncSource, T syncObject) {
@Override
public void run() {
try {
LogMF.debug(l4j, "O--+ syncing object {0}", syncObject);
syncSource.sync(syncObject, firstFilter);
if (!verifyOnly) {
LogMF.debug(l4j, "O--+ syncing object {0}", syncObject);
syncSource.sync(syncObject, firstFilter);
LogMF.info(l4j, "O--O finished syncing object {0} ({1} bytes transferred)",
syncObject, syncObject.getBytesRead());
}

if (verify || verifyOnly) {
LogMF.debug(l4j, "O==? verifying object {0}", syncObject);
syncSource.verify(syncObject, firstFilter);
LogMF.info(l4j, "O==O verification successful for {0}", syncObject);
}

complete(syncObject);
LogMF.info(l4j, "O--O finished syncing object {0} ({1} bytes transferred)",
syncObject, syncObject.getBytesRead());

try { // delete object if the source supports deletion (implements the delete() method)
if (deleteSource) {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/emc/vipr/sync/filter/AclMappingFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.emc.vipr.sync.CommonOptions;
import com.emc.vipr.sync.model.SyncAcl;
import com.emc.vipr.sync.model.SyncMetadata;
import com.emc.vipr.sync.model.SyncObject;
import com.emc.vipr.sync.model.object.SyncObject;
import com.emc.vipr.sync.source.SyncSource;
import com.emc.vipr.sync.target.SyncTarget;
import com.emc.vipr.sync.util.ConfigurationException;
Expand Down Expand Up @@ -162,7 +162,7 @@ else if (key.equals(PERMISSION)) {
}

@Override
public void filter(SyncObject<?> obj) {
public void filter(SyncObject obj) {
SyncMetadata meta = obj.getMetadata();
if (meta != null) {
SyncAcl acl = meta.getAcl();
Expand Down Expand Up @@ -253,6 +253,12 @@ public void filter(SyncObject<?> obj) {
getNext().filter(obj);
}

// TODO: if verification ever includes ACLs, reverse the ACL map here
@Override
public SyncObject reverseFilter(SyncObject obj) {
return getNext().reverseFilter(obj);
}

@Override
public String getName() {
return "ACL Mapper";
Expand Down
144 changes: 8 additions & 136 deletions src/main/java/com/emc/vipr/sync/filter/DecryptionFilter.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package com.emc.vipr.sync.filter;

import com.emc.vipr.sync.model.SyncMetadata;
import com.emc.vipr.sync.model.SyncObject;
import com.emc.vipr.sync.model.object.SyncObject;
import com.emc.vipr.sync.model.object.DecryptedSyncObject;
import com.emc.vipr.sync.source.SyncSource;
import com.emc.vipr.sync.target.SyncTarget;
import com.emc.vipr.sync.util.ConfigurationException;
import com.emc.vipr.sync.util.OptionBuilder;
import com.emc.vipr.sync.util.TransformUtil;
import com.emc.vipr.transform.InputTransform;
import com.emc.vipr.transform.TransformConstants;
import com.emc.vipr.transform.TransformFactory;
import com.emc.vipr.transform.encryption.KeyStoreEncryptionFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.log4j.Logger;

import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.Date;
import java.util.Enumeration;
Expand Down Expand Up @@ -102,7 +100,7 @@ public void configure(SyncSource source, Iterator<SyncFilter> filters, SyncTarge
* Decryption is based on the ViPR object SDK encryption standard (https://community.emc.com/docs/DOC-34465).
*/
@Override
public void filter(SyncObject<?> obj) {
public void filter(SyncObject obj) {
if (obj.isDirectory()) {
// we can only decrypt data objects
l4j.debug("skipping directory " + obj);
Expand Down Expand Up @@ -145,6 +143,11 @@ public void filter(SyncObject<?> obj) {
}
}

@Override
public SyncObject reverseFilter(SyncObject obj) {
throw new UnsupportedOperationException(getClass().getSimpleName() + " does not yet support reverse filters (verification)");
}

@Override
public String getName() {
return "Decryption Filter";
Expand Down Expand Up @@ -197,135 +200,4 @@ public boolean isUpdateMtime() {
public void setUpdateMtime(boolean updateMtime) {
this.updateMtime = updateMtime;
}

private class DecryptedSyncObject extends SyncObject {
private SyncObject delegate;
private InputTransform transform;
private boolean metadataComplete = false;

@SuppressWarnings("unchecked")
public DecryptedSyncObject(SyncObject delegate, InputTransform transform) {
super(delegate.getRawSourceIdentifier(), delegate.getSourceIdentifier(),
delegate.getRelativePath(), delegate.isDirectory());
this.delegate = delegate;
this.transform = transform;

// set the decrypted size
String decryptedSize = delegate.getMetadata().getUserMetadataValue(TransformConstants.META_ENCRYPTION_UNENC_SIZE);
if (decryptedSize == null)
throw new RuntimeException("encrypted object missing metadata field: " + TransformConstants.META_ENCRYPTION_UNENC_SIZE);

delegate.getMetadata().setSize(Long.parseLong(decryptedSize));
}

@Override
protected void loadObject() {
// calling getMetadata() will call this method on delegate
}

@Override
protected InputStream createSourceInputStream() {
return transform.getDecodedInputStream();
}

@Override
public Object getRawSourceIdentifier() {
return delegate.getRawSourceIdentifier();
}

@Override
public String getSourceIdentifier() {
return delegate.getSourceIdentifier();
}

@Override
public String getRelativePath() {
return delegate.getRelativePath();
}

@Override
public boolean isDirectory() {
return delegate.isDirectory();
}

@Override
public String getTargetIdentifier() {
return delegate.getTargetIdentifier();
}

@Override
public synchronized SyncMetadata getMetadata() {
if (!metadataComplete) {
try {
Map<String, String> decryptedMetadata = transform.getDecodedMetadata();
SyncMetadata objMetadata = delegate.getMetadata();

// remove metadata keys if necessary
for (String key : objMetadata.getUserMetadata().keySet()) {
if (!decryptedMetadata.containsKey(key)) objMetadata.getUserMetadata().remove(key);
}

// apply decrypted metadata
for (String key : decryptedMetadata.keySet()) {
objMetadata.setUserMetadataValue(key, decryptedMetadata.get(key));
}

// TODO: remove when transforms remove their own metadata fields
for (String key : TransformUtil.ENCRYPTION_METADATA_KEYS) {
objMetadata.getUserMetadata().remove(key);
}

// TODO: remove when transforms automatically modify the transform spec
String transformSpec = objMetadata.getUserMetadataValue(TransformConstants.META_TRANSFORM_MODE);
int pipeIndex = transformSpec.indexOf("|");
if (pipeIndex > 0) {
objMetadata.setUserMetadataValue(TransformConstants.META_TRANSFORM_MODE,
transformSpec.substring(0, pipeIndex));
} else {
objMetadata.getUserMetadata().remove(TransformConstants.META_TRANSFORM_MODE);
}

metadataComplete = true;
} catch (IllegalStateException e) {
l4j.debug("could not get decoded metadata - assuming object has not been streamed", e);
}
}
return delegate.getMetadata();
}

@Override
public boolean requiresPostStreamMetadataUpdate() {
return true;
}

@Override
public void setTargetIdentifier(String targetIdentifier) {
delegate.setTargetIdentifier(targetIdentifier);
}

@Override
public void setMetadata(SyncMetadata metadata) {
delegate.setMetadata(metadata);
}

@Override
public long getBytesRead() {
return delegate.getBytesRead();
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public boolean equals(Object o) {
return delegate.equals(o);
}

@Override
public int hashCode() {
return delegate.hashCode();
}
}
}
Loading

0 comments on commit 8728793

Please sign in to comment.