Skip to content

Commit

Permalink
[ALLUXIO-3135] Allow ufs sync to have out-of-sync dirs, for mount poi…
Browse files Browse the repository at this point in the history
…nts (#6998)

* Add tests for ufs syncing nested mount point

* Add ufs sync nested mount test

* Implement containsMountPoint to mount table

* Prevent ufs sync from removing mount points

* Improve code and comments

* Clean up code
  • Loading branch information
gpang authored and calvinjia committed Mar 26, 2018
1 parent 03148d7 commit d4985d1
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3127,69 +3127,18 @@ private boolean syncMetadata(JournalContext journalContext, LockedInodePath inod

// Set to true if ufs metadata must be loaded.
boolean loadMetadata = false;

// Set to true if the given inode was deleted.
boolean deletedInode = false;

DeleteOptions syncDeleteOptions =
DeleteOptions.defaults().setRecursive(true).setAlluxioOnly(true).setUnchecked(true);

try {
if (!inodePath.fullPathExists()) {
// The requested path does not exist in Alluxio, so just load metadata.
loadMetadata = true;
} else {
// The requested path already exists in Alluxio.
Inode<?> inode = inodePath.getInode();

if (inode instanceof InodeFile && !((InodeFile) inode).isCompleted()) {
// Do not sync an incomplete file, since the UFS file is expected to not exist.
return false;
}
if (inode.getPersistenceState() == PersistenceState.TO_BE_PERSISTED) {
// Do not sync a file in the process of being persisted, since the UFS file is being
// written.
return false;
}

MountTable.Resolution resolution = mMountTable.resolve(inodePath.getUri());
AlluxioURI ufsUri = resolution.getUri();
UnderFileSystem ufs = resolution.getUfs();
String ufsFingerprint = ufs.getFingerprint(ufsUri.toString());
boolean isMountPoint = mMountTable.isMountPoint(inodePath.getUri());

UfsSyncUtils.SyncPlan syncPlan =
UfsSyncUtils.computeSyncPlan(inode, ufsFingerprint, isMountPoint);

if (syncPlan.toUpdateDirectory()) {
// Fingerprints only consider permissions for directory inodes.
UfsStatus ufsStatus = ufs.getStatus(ufsUri.toString());
SetAttributeOptions options =
SetAttributeOptions.defaults().setOwner(ufsStatus.getOwner())
.setGroup(ufsStatus.getGroup()).setMode(ufsStatus.getMode())
.setUfsFingerprint(ufsFingerprint);
long opTimeMs = System.currentTimeMillis();
// use replayed, since updating UFS is not desired.
setAttributeInternal(inodePath, true, opTimeMs, options);
journalSetAttribute(inodePath, opTimeMs, options, journalContext);
}
if (syncPlan.toDelete()) {
try {
deleteInternal(inodePath, false, System.currentTimeMillis(), syncDeleteOptions,
journalContext, blockDeletionContext);
deletedInode = true;
} catch (DirectoryNotEmptyException | IOException e) {
// Should not happen, since it is an unchecked delete.
LOG.error("Unexpected error for unchecked delete.", e);
}
}
if (syncPlan.toLoadMetadata()) {
loadMetadata = true;
}
if (syncPlan.toSyncChildren()) {
loadMetadata = syncChildrenMetadata(journalContext, inodePath, syncDescendantType,
blockDeletionContext);
}
SyncResult result =
syncInodeMetadata(journalContext, inodePath, syncDescendantType, blockDeletionContext);
deletedInode = result.getDeletedInode();
loadMetadata = result.getLoadMetadata();
}
} catch (Exception e) {
LOG.error("Failed to remove out-of-sync metadata for path: {}", inodePath.getUri(), e);
Expand Down Expand Up @@ -3220,86 +3169,148 @@ private boolean syncMetadata(JournalContext journalContext, LockedInodePath inod
return true;
}

private boolean syncChildrenMetadata(JournalContext journalContext, LockedInodePath inodePath,
DescendantType syncDescendantType, BlockDeletionContext blockDeletionContext)
throws FileDoesNotExistException, InvalidPathException, IOException,
DirectoryNotEmptyException {
if (syncDescendantType == DescendantType.NONE) {
return false;
/**
* This class represents the result of a sync. It carries the following flags:
* - deletedInode: if true, the inode was already deleted as part of the syncing process
* - loadMetadata: if true, load metadata must be called (the last step of the full sync process)
*/
private static class SyncResult {
private boolean mLoadMetadata;
private boolean mDeletedInode;

static SyncResult defaults() {
return new SyncResult(false, false);
}

SyncResult(boolean loadMetadata, boolean deletedInode) {
mLoadMetadata = loadMetadata;
mDeletedInode = deletedInode;
}

boolean getLoadMetadata() {
return mLoadMetadata;
}
MountTable.Resolution resolution = mMountTable.resolve(inodePath.getUri());
AlluxioURI ufsUri = resolution.getUri();
UnderFileSystem ufs = resolution.getUfs();

boolean getDeletedInode() {
return mDeletedInode;
}
}

/**
* Syncs an inode with the UFS.
*
* @param journalContext the journal context
* @param inodePath the Alluxio inode path to sync with UFS
* @param syncDescendantType how to sync descendants
* @param blockDeletionContext the block deletion context
* @return the result of the sync, including if the inode was deleted, and if further load
* metadata is required
*/
private SyncResult syncInodeMetadata(JournalContext journalContext, LockedInodePath inodePath,
DescendantType syncDescendantType, BlockDeletionContext blockDeletionContext)
throws FileDoesNotExistException, InvalidPathException, IOException, AccessControlException {
// Set to true if ufs metadata must be loaded.
boolean loadMetadata = false;
// Set to true if the given inode was deleted.
boolean deletedInode = false;
// The options for deleting.
DeleteOptions syncDeleteOptions =
DeleteOptions.defaults().setRecursive(true).setAlluxioOnly(true).setUnchecked(true);

// The requested path already exists in Alluxio.
Inode<?> inode = inodePath.getInode();

UfsStatus[] listStatus = ufs.listStatus(ufsUri.toString());
if (inode instanceof InodeFile && !((InodeFile) inode).isCompleted()) {
// Do not sync an incomplete file, since the UFS file is expected to not exist.
return SyncResult.defaults();
}
if (inode.getPersistenceState() == PersistenceState.TO_BE_PERSISTED) {
// Do not sync a file in the process of being persisted, since the UFS file is being
// written.
return SyncResult.defaults();
}

if (listStatus != null) {
// maps children name to up-to-date ufs fingerprint
Map<String, String> ufsChildFingerprints = new HashMap<>();
// maps children name to inode
Map<String, Inode<?>> inodeChildren = new HashMap<>();
MountTable.Resolution resolution = mMountTable.resolve(inodePath.getUri());
AlluxioURI ufsUri = resolution.getUri();
UnderFileSystem ufs = resolution.getUfs();
String ufsFingerprint = ufs.getFingerprint(ufsUri.toString());
boolean containsMountPoint = mMountTable.containsMountPoint(inodePath.getUri());

UfsSyncUtils.SyncPlan syncPlan =
UfsSyncUtils.computeSyncPlan(inode, ufsFingerprint, containsMountPoint);

for (UfsStatus ufsChildStatus : listStatus) {
ufsChildFingerprints.put(ufsChildStatus.getName(),
Fingerprint.create(ufs.getUnderFSType(), ufsChildStatus).serialize());
if (syncPlan.toUpdateDirectory()) {
// Fingerprints only consider permissions for directory inodes.
UfsStatus ufsStatus = null;
try {
ufsStatus = ufs.getStatus(ufsUri.toString());
} catch (IOException e) {
// Ignore, since this directory inode could be out of sync (contains a mount point)
}
InodeDirectory inodeDir = (InodeDirectory) inode;
for (Inode<?> child : inodeDir.getChildren()) {
inodeChildren.put(child.getName(), child);
if (ufsStatus != null) {
SetAttributeOptions options =
SetAttributeOptions.defaults().setOwner(ufsStatus.getOwner())
.setGroup(ufsStatus.getGroup()).setMode(ufsStatus.getMode())
.setUfsFingerprint(ufsFingerprint);
long opTimeMs = System.currentTimeMillis();
// use replayed, since updating UFS is not desired.
setAttributeInternal(inodePath, true, opTimeMs, options);
journalSetAttribute(inodePath, opTimeMs, options, journalContext);
}

// Iterate over ufs children and process children which do not exist in Alluxio.
for (Map.Entry<String, String> ufsEntry : ufsChildFingerprints.entrySet()) {
if (!inodeChildren.containsKey(ufsEntry.getKey()) && !PathUtils
.isTemporaryFileName(ufsEntry.getKey())) {
// Ufs child exists, but Alluxio child does not. Must load metadata.
loadMetadata = true;
break;
}
if (syncPlan.toDelete()) {
try {
deleteInternal(inodePath, false, System.currentTimeMillis(), syncDeleteOptions,
journalContext, blockDeletionContext);
deletedInode = true;
} catch (DirectoryNotEmptyException | IOException e) {
// Should not happen, since it is an unchecked delete.
LOG.error("Unexpected error for unchecked delete.", e);
}
}
if (syncPlan.toLoadMetadata()) {
loadMetadata = true;
}
if (syncPlan.toSyncChildren() && inode instanceof InodeDirectory
&& syncDescendantType != DescendantType.NONE) {
UfsStatus[] listStatus = ufs.listStatus(ufsUri.toString());
if (listStatus != null) {
InodeDirectory inodeDir = (InodeDirectory) inode;
// maps children name to inode
Map<String, Inode<?>> inodeChildren = new HashMap<>();
for (Inode<?> child : inodeDir.getChildren()) {
inodeChildren.put(child.getName(), child);
}
}

// Iterate over Alluxio children and process persisted children.
for (Map.Entry<String, Inode<?>> inodeEntry : inodeChildren.entrySet()) {
if (!inodeEntry.getValue().isPersisted()) {
// Ignore non-persisted inodes.
continue;
for (UfsStatus ufsChildStatus : listStatus) {
if (!inodeChildren.containsKey(ufsChildStatus.getName()) && !PathUtils
.isTemporaryFileName(ufsChildStatus.getName())) {
// Ufs child exists, but Alluxio child does not. Must load metadata.
loadMetadata = true;
break;
}
}

String ufsFingerprint = ufsChildFingerprints.get(inodeEntry.getKey());
boolean deleteChild =
!UfsSyncUtils.inodeUfsIsSynced(inodeEntry.getValue(), ufsFingerprint);

if (deleteChild) {
TempInodePathForDescendant tempInodePath =
new TempInodePathForDescendant(inodePath);
tempInodePath.setDescendant(inodeEntry.getValue(),
inodePath.getUri().join(inodeEntry.getKey()));

deleteInternal(tempInodePath, false, System.currentTimeMillis(), syncDeleteOptions,
journalContext, blockDeletionContext);
// Must load metadata afterwards.
loadMetadata = true;
} else if (inodeEntry.getValue().isDirectory()) {
// Recursively sync for this directory.
TempInodePathForDescendant tempInodePath =
new TempInodePathForDescendant(inodePath);
tempInodePath.setDescendant(inodeEntry.getValue(),
inodePath.getUri().join(inodeEntry.getKey()));

if (syncDescendantType == DescendantType.ALL) {
// Recursively sync children
loadMetadata |= syncChildrenMetadata(journalContext, tempInodePath, DescendantType.ALL,
blockDeletionContext);
// Iterate over Alluxio children and process persisted children.
for (Map.Entry<String, Inode<?>> inodeEntry : inodeChildren.entrySet()) {
if (!inodeEntry.getValue().isPersisted()) {
// Ignore non-persisted inodes.
continue;
}
TempInodePathForDescendant tempInodePath = new TempInodePathForDescendant(inodePath);
tempInodePath
.setDescendant(inodeEntry.getValue(), inodePath.getUri().join(inodeEntry.getKey()));

// Recursively sync children
if (syncDescendantType != DescendantType.ALL) {
syncDescendantType = DescendantType.NONE;
}
loadMetadata |= syncInodeMetadata(journalContext, tempInodePath,
syncDescendantType, blockDeletionContext).getLoadMetadata();
}
}
}
return loadMetadata;
return new SyncResult(loadMetadata, deletedInode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,24 @@ public Map<String, MountInfo> getMountTable() {
}
}

/**
 * Checks whether a mount point exists at, or anywhere below, the given Alluxio path.
 *
 * @param uri the Alluxio uri to check
 * @return true if the specified uri is a mount point, or has a descendant which is a mount
 *         point; false otherwise
 * @throws InvalidPathException declared for the prefix comparison; presumably raised by
 *         PathUtils.hasPrefix when a path is malformed (TODO confirm against PathUtils)
 */
public boolean containsMountPoint(AlluxioURI uri) throws InvalidPathException {
  final String targetPath = uri.getPath();
  boolean found = false;

  // The LockResource wraps mReadLock for the duration of the scan and is released on exit.
  try (LockResource readLock = new LockResource(mReadLock)) {
    // Only the keys (the mount paths) are inspected; the MountInfo values are not needed.
    for (String mountPath : mMountTable.keySet()) {
      // A mount path that has the target path as a prefix is the target itself or lies below it.
      if (PathUtils.hasPrefix(mountPath, targetPath)) {
        found = true;
        break;
      }
    }
  }
  return found;
}

/**
* @param uri an Alluxio path URI
* @return whether the given uri is a mount point
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ private UfsSyncUtils() {} // prevent instantiation
*
* @param inode the inode to sync
* @param ufsFingerprint the ufs fingerprint to check for the sync
* @param isMountPoint true if this inode is a mount point, false otherwise
* @param containsMountPoint true if this inode contains a mount point, false otherwise
* @return a {@link SyncPlan} describing how to sync the inode with the ufs
*/
public static SyncPlan computeSyncPlan(Inode inode, String ufsFingerprint, boolean isMountPoint) {
public static SyncPlan computeSyncPlan(Inode inode, String ufsFingerprint,
boolean containsMountPoint) {
boolean isSynced = inodeUfsIsSynced(inode, ufsFingerprint);
boolean ufsExists = !Constants.INVALID_UFS_FINGERPRINT.equals(ufsFingerprint);
boolean ufsIsDir = ufsFingerprint != null && Fingerprint.Type.DIRECTORY.name()
Expand All @@ -45,9 +46,9 @@ public static SyncPlan computeSyncPlan(Inode inode, String ufsFingerprint, boole
// Alluxio inode is not synced with UFS, so update the inode metadata
// Updating an inode is achieved by deleting the inode, and then loading metadata.

if (inode.isDirectory() && (isMountPoint || ufsIsDir)) {
if (inode.isDirectory() && (containsMountPoint || ufsIsDir)) {
// Instead of deleting and then loading metadata to update, try to update directly
// - mount points should not be deleted
// - mount points (or paths with mount point descendants) should not be deleted
// - directory permissions can be updated without removing the inode
if (inode.getParentId() != InodeTree.NO_PARENT) {
// Only update the inode if it is not the root directory. The root directory is a special
Expand Down
Loading

0 comments on commit d4985d1

Please sign in to comment.