Skip to content

Commit

Permalink
Add support for copy file with subdirectories for ADLSGen2PinotFS (#1…
Browse files Browse the repository at this point in the history
…4860)

* fixCopyFile

* add tests

* modify to be in line with other PinotFs implementations.
  • Loading branch information
aishikbh authored Jan 22, 2025
1 parent f845a97 commit 1e21ffe
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ private static FileMetadata getFileMetadata(PathItem file) {
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
LOGGER.debug("copyToLocalFile is called with srcUri='{}', dstFile='{}'", srcUri, dstFile);

// Create parent directories if they don't exist.
FileUtils.forceMkdir(dstFile.getParentFile());

if (dstFile.exists()) {
if (dstFile.isDirectory()) {
FileUtils.deleteDirectory(dstFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.models.PathProperties;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
import org.apache.pinot.plugin.filesystem.AzurePinotFSUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand All @@ -51,6 +55,9 @@
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;


/**
Expand Down Expand Up @@ -445,4 +452,116 @@ public void open()
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();
}

@Test
public void testCopyToLocalFileWithSubdirectories() throws Exception {
// Create a temporary file for the test
File tempDir = new File(System.getProperty("java.io.tmpdir"), "pinot_test");
tempDir.mkdirs();
File mockDstFile = new File(tempDir, "test_file.txt");

// Create parent directory
File parentFile = mockDstFile.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}

// Mock file stream
byte[] testData = "test data".getBytes();
InputStream mockInputStream = new ByteArrayInputStream(testData);
when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(mockInputStream);

try {
// Execute
_adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, mockDstFile);

// Verify file operations in order
verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();

// Verify file was created
assertTrue(mockDstFile.exists());

// Verify content was written correctly
byte[] writtenContent = Files.readAllBytes(mockDstFile.toPath());
assertArrayEquals(testData, writtenContent);
} finally {
// Cleanup
FileUtils.deleteQuietly(mockDstFile);
FileUtils.deleteQuietly(tempDir);
}
}

@Test
public void testCopyToLocalFileWithoutSubdirectories() throws Exception {
// Create a temporary file for the test
File tempFile = new File(System.getProperty("java.io.tmpdir"), "test_file.txt");

// Mock file stream
byte[] testData = "test data".getBytes();
InputStream mockInputStream = new ByteArrayInputStream(testData);
when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(mockInputStream);

try {
// Execute
_adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, tempFile);

// Verify file operations in order
verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();

// Verify file was created
assertTrue(tempFile.exists());

// Verify content was written correctly
byte[] writtenContent = Files.readAllBytes(tempFile.toPath());
assertArrayEquals(testData, writtenContent);
} finally {
// Cleanup
FileUtils.deleteQuietly(tempFile);
}
}

@Test
public void testCopyToLocalFileExistingDirectory() throws Exception {
// Create a temporary directory for the test
File tempDir = new File(System.getProperty("java.io.tmpdir"), "existing_dir");
tempDir.mkdirs();

// Mock file stream
byte[] testData = "test data".getBytes();
InputStream mockInputStream = new ByteArrayInputStream(testData);
when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(mockInputStream);

try {
// Execute
_adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, tempDir);

// Verify file operations in order
verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();

// Verify directory was overwritten with file
assertTrue(tempDir.exists());
assertFalse(tempDir.isDirectory());

// Verify content was written correctly
byte[] writtenContent = Files.readAllBytes(tempDir.toPath());
assertArrayEquals(testData, writtenContent);
} finally {
// Cleanup
if (tempDir.exists()) {
FileUtils.deleteQuietly(tempDir);
}
}
}
}

0 comments on commit 1e21ffe

Please sign in to comment.