From 550a042ccdc279acc3231e5a97e2f3a912260011 Mon Sep 17 00:00:00 2001 From: kangli Date: Mon, 20 Apr 2020 16:26:49 -0700 Subject: [PATCH] Add key iterator for HaloDB --- src/main/java/com/oath/halodb/HaloDB.java | 4 + .../com/oath/halodb/HaloDBKeyIterator.java | 116 ++++++++++++++++++ src/main/java/com/oath/halodb/RecordKey.java | 31 +++++ .../oath/halodb/HaloDBKeyIteratorTest.java | 97 +++++++++++++++ 4 files changed, 248 insertions(+) create mode 100644 src/main/java/com/oath/halodb/HaloDBKeyIterator.java create mode 100644 src/main/java/com/oath/halodb/RecordKey.java create mode 100644 src/test/java/com/oath/halodb/HaloDBKeyIteratorTest.java diff --git a/src/main/java/com/oath/halodb/HaloDB.java b/src/main/java/com/oath/halodb/HaloDB.java index c16d304..b0ea540 100644 --- a/src/main/java/com/oath/halodb/HaloDB.java +++ b/src/main/java/com/oath/halodb/HaloDB.java @@ -80,6 +80,10 @@ public HaloDBIterator newIterator() throws HaloDBException { return new HaloDBIterator(dbInternal); } + public HaloDBKeyIterator newKeyIterator() { + return new HaloDBKeyIterator(dbInternal); + } + public void pauseCompaction() throws HaloDBException { try { dbInternal.pauseCompaction(); diff --git a/src/main/java/com/oath/halodb/HaloDBKeyIterator.java b/src/main/java/com/oath/halodb/HaloDBKeyIterator.java new file mode 100644 index 0000000..ed854ca --- /dev/null +++ b/src/main/java/com/oath/halodb/HaloDBKeyIterator.java @@ -0,0 +1,116 @@ +package com.oath.halodb; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HaloDBKeyIterator implements Iterator{ + private static final Logger logger = LoggerFactory.getLogger(HaloDBIterator.class); + + private Iterator outer; + private Iterator inner; + private HaloDBFile currentFile; + + private RecordKey next; + + private final HaloDBInternal dbInternal; + + HaloDBKeyIterator(HaloDBInternal dbInternal) { + this.dbInternal = dbInternal; + outer = dbInternal.listDataFileIds().iterator(); + } + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + + try { + // inner == null means this is the first time hasNext() is called. + // use moveToNextFile() to move to the first file. + if (inner == null && !moveToNextFile()) { + return false; + } + + do { + if (readNextRecord()) { + return true; + } + } while (moveToNextFile()); + + return false; + + } catch (IOException e) { + logger.error("Error in Iterator", e); + return false; + } + } + + @Override + public RecordKey next() { + if (hasNext()) { + RecordKey key = next; + next = null; + return key; + } + throw new NoSuchElementException(); + } + + private boolean moveToNextFile() throws IOException { + while (outer.hasNext()) { + int fileId = outer.next(); + currentFile = dbInternal.getHaloDBFile(fileId); + if (currentFile != null) { + try { + inner = currentFile.getIndexFile().newIterator(); + return true; + } catch (ClosedChannelException e) { + if (dbInternal.isClosing()) { + //TODO: define custom Exception classes for HaloDB. + throw new RuntimeException("DB is closing"); + } + logger.debug("Index file {} closed, probably by compaction thread. Skipping to next one", fileId); + } + } + logger.debug("Data file {} deleted, probably by compaction thread. Skipping to next one", fileId); + } + + return false; + } + + private boolean readNextRecord() { + while (inner.hasNext()) { + IndexFileEntry entry = inner.next(); + try { + try { + next = readValidRecordKey(entry); + if (next != null) { + return true; + } + } catch (ClosedChannelException e) { + if (dbInternal.isClosing()) { + throw new RuntimeException("DB is closing"); + } + logger.debug("Data file {} closed, probably by compaction thread. Skipping to next one", currentFile.getFileId()); + break; + } + } catch (IOException e) { + logger.info("Error in iterator", e); + break; + } + } + return false; + } + + private RecordKey readValidRecordKey(IndexFileEntry entry) throws IOException { + InMemoryIndexMetaData meta = Utils.getMetaData(entry, currentFile.getFileId()); + RecordKey key = null; + if (dbInternal.isRecordFresh(entry.getKey(), meta)) { + key = new RecordKey(entry.getKey()); + } + return key; + } +} diff --git a/src/main/java/com/oath/halodb/RecordKey.java b/src/main/java/com/oath/halodb/RecordKey.java new file mode 100644 index 0000000..03c81e3 --- /dev/null +++ b/src/main/java/com/oath/halodb/RecordKey.java @@ -0,0 +1,31 @@ +package com.oath.halodb; + +import java.util.*; + +public class RecordKey { + final byte[] key; + public RecordKey(byte[] key) { + this.key = key; + } + + public byte[] getBytes() { + return key; + } + + @Override + public boolean equals(Object obj) { + // to be used in tests as we don't check if the headers are the same. + + if (this == obj) { + return true; + } + if (!(obj instanceof RecordKey)) { + return false; + } + + RecordKey recordKey = (RecordKey)obj; + return Arrays.equals(this.key, recordKey.getBytes()); + } + + +} diff --git a/src/test/java/com/oath/halodb/HaloDBKeyIteratorTest.java b/src/test/java/com/oath/halodb/HaloDBKeyIteratorTest.java new file mode 100644 index 0000000..9fa1359 --- /dev/null +++ b/src/test/java/com/oath/halodb/HaloDBKeyIteratorTest.java @@ -0,0 +1,97 @@ +package com.oath.halodb; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.*; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class HaloDBKeyIteratorTest extends TestBase { + + @Test(expectedExceptions = NoSuchElementException.class, dataProvider = "Options") + public void testWithEmptyDB(HaloDBOptions options) throws HaloDBException { + String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB"); + + HaloDB db = getTestDB(directory, options); + HaloDBKeyIterator iterator = db.newKeyIterator(); + Assert.assertFalse(iterator.hasNext()); + iterator.next(); + } + + @Test(dataProvider = "Options") + public void testWithDelete(HaloDBOptions options) throws HaloDBException { + String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB"); + + options.setCompactionDisabled(true); + + HaloDB db = getTestDB(directory, options); + int noOfRecords = 10_000; + List records = TestUtils.insertRandomRecords(db, noOfRecords); + + // delete all records. + for (Record r : records) { + db.delete(r.getKey()); + } + + HaloDBKeyIterator iterator = db.newKeyIterator(); + Assert.assertFalse(iterator.hasNext()); + + // close and open the db again. + db.close(); + db = getTestDBWithoutDeletingFiles(directory, options); + iterator = db.newKeyIterator(); + Assert.assertFalse(iterator.hasNext()); + } + + @Test(dataProvider = "Options") + public void testPutAndGetDB(HaloDBOptions options) throws HaloDBException { + String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutAndGetDB"); + + options.setCompactionDisabled(true); + options.setMaxFileSize(10 * 1024); + + HaloDB db = getTestDB(directory, options); + + int noOfRecords = 10_000; + List records = TestUtils.insertRandomRecords(db, noOfRecords); + + List keys = new LinkedList<>(); + for (Record record : records) { + keys.add(new RecordKey(record.getKey())); + } + + List actual = new ArrayList<>(); + db.newKeyIterator().forEachRemaining(actual::add); + + MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray())); + } + + @Test(dataProvider = "Options") + public void testPutUpdateAndGetDB(HaloDBOptions options) throws HaloDBException { + String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutUpdateAndGetDB"); + + options.setCompactionDisabled(true); + options.setMaxFileSize(10 * 1024); + + HaloDB db = getTestDB(directory, options); + + int noOfRecords = 10_000; + List records = TestUtils.insertRandomRecords(db, noOfRecords); + + List updated = TestUtils.updateRecords(db, records); + + List keys = new LinkedList<>(); + for (Record record : updated) { + keys.add(new RecordKey(record.getKey())); + } + + List actual = new ArrayList<>(); + db.newKeyIterator().forEachRemaining(actual::add); + MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray())); + } +}