-
Notifications
You must be signed in to change notification settings - Fork 99
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #50 from bellofreedom/key_iterator
Add key iterator for HaloDB
- Loading branch information
Showing
4 changed files
with
248 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package com.oath.halodb; | ||
|
||
import java.io.IOException; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.*; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class HaloDBKeyIterator implements Iterator<RecordKey>{ | ||
private static final Logger logger = LoggerFactory.getLogger(HaloDBIterator.class); | ||
|
||
private Iterator<Integer> outer; | ||
private Iterator<IndexFileEntry> inner; | ||
private HaloDBFile currentFile; | ||
|
||
private RecordKey next; | ||
|
||
private final HaloDBInternal dbInternal; | ||
|
||
HaloDBKeyIterator(HaloDBInternal dbInternal) { | ||
this.dbInternal = dbInternal; | ||
outer = dbInternal.listDataFileIds().iterator(); | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
if (next != null) { | ||
return true; | ||
} | ||
|
||
try { | ||
// inner == null means this is the first time hasNext() is called. | ||
// use moveToNextFile() to move to the first file. | ||
if (inner == null && !moveToNextFile()) { | ||
return false; | ||
} | ||
|
||
do { | ||
if (readNextRecord()) { | ||
return true; | ||
} | ||
} while (moveToNextFile()); | ||
|
||
return false; | ||
|
||
} catch (IOException e) { | ||
logger.error("Error in Iterator", e); | ||
return false; | ||
} | ||
} | ||
|
||
@Override | ||
public RecordKey next() { | ||
if (hasNext()) { | ||
RecordKey key = next; | ||
next = null; | ||
return key; | ||
} | ||
throw new NoSuchElementException(); | ||
} | ||
|
||
private boolean moveToNextFile() throws IOException { | ||
while (outer.hasNext()) { | ||
int fileId = outer.next(); | ||
currentFile = dbInternal.getHaloDBFile(fileId); | ||
if (currentFile != null) { | ||
try { | ||
inner = currentFile.getIndexFile().newIterator(); | ||
return true; | ||
} catch (ClosedChannelException e) { | ||
if (dbInternal.isClosing()) { | ||
//TODO: define custom Exception classes for HaloDB. | ||
throw new RuntimeException("DB is closing"); | ||
} | ||
logger.debug("Index file {} closed, probably by compaction thread. Skipping to next one", fileId); | ||
} | ||
} | ||
logger.debug("Data file {} deleted, probably by compaction thread. Skipping to next one", fileId); | ||
} | ||
|
||
return false; | ||
} | ||
|
||
private boolean readNextRecord() { | ||
while (inner.hasNext()) { | ||
IndexFileEntry entry = inner.next(); | ||
try { | ||
try { | ||
next = readValidRecordKey(entry); | ||
if (next != null) { | ||
return true; | ||
} | ||
} catch (ClosedChannelException e) { | ||
if (dbInternal.isClosing()) { | ||
throw new RuntimeException("DB is closing"); | ||
} | ||
logger.debug("Data file {} closed, probably by compaction thread. Skipping to next one", currentFile.getFileId()); | ||
break; | ||
} | ||
} catch (IOException e) { | ||
logger.info("Error in iterator", e); | ||
break; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
private RecordKey readValidRecordKey(IndexFileEntry entry) throws IOException { | ||
InMemoryIndexMetaData meta = Utils.getMetaData(entry, currentFile.getFileId()); | ||
RecordKey key = null; | ||
if (dbInternal.isRecordFresh(entry.getKey(), meta)) { | ||
key = new RecordKey(entry.getKey()); | ||
} | ||
return key; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package com.oath.halodb; | ||
|
||
import java.util.*; | ||
|
||
public class RecordKey { | ||
final byte[] key; | ||
public RecordKey(byte[] key) { | ||
this.key = key; | ||
} | ||
|
||
public byte[] getBytes() { | ||
return key; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
// to be used in tests as we don't check if the headers are the same. | ||
|
||
if (this == obj) { | ||
return true; | ||
} | ||
if (!(obj instanceof RecordKey)) { | ||
return false; | ||
} | ||
|
||
RecordKey recordKey = (RecordKey)obj; | ||
return Arrays.equals(this.key, recordKey.getBytes()); | ||
} | ||
|
||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package com.oath.halodb; | ||
|
||
import java.io.IOException; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.*; | ||
import mockit.Invocation; | ||
import mockit.Mock; | ||
import mockit.MockUp; | ||
import org.hamcrest.MatcherAssert; | ||
import org.hamcrest.Matchers; | ||
import org.testng.Assert; | ||
import org.testng.annotations.Test; | ||
|
||
public class HaloDBKeyIteratorTest extends TestBase { | ||
|
||
@Test(expectedExceptions = NoSuchElementException.class, dataProvider = "Options") | ||
public void testWithEmptyDB(HaloDBOptions options) throws HaloDBException { | ||
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB"); | ||
|
||
HaloDB db = getTestDB(directory, options); | ||
HaloDBKeyIterator iterator = db.newKeyIterator(); | ||
Assert.assertFalse(iterator.hasNext()); | ||
iterator.next(); | ||
} | ||
|
||
@Test(dataProvider = "Options") | ||
public void testWithDelete(HaloDBOptions options) throws HaloDBException { | ||
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB"); | ||
|
||
options.setCompactionDisabled(true); | ||
|
||
HaloDB db = getTestDB(directory, options); | ||
int noOfRecords = 10_000; | ||
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords); | ||
|
||
// delete all records. | ||
for (Record r : records) { | ||
db.delete(r.getKey()); | ||
} | ||
|
||
HaloDBKeyIterator iterator = db.newKeyIterator(); | ||
Assert.assertFalse(iterator.hasNext()); | ||
|
||
// close and open the db again. | ||
db.close(); | ||
db = getTestDBWithoutDeletingFiles(directory, options); | ||
iterator = db.newKeyIterator(); | ||
Assert.assertFalse(iterator.hasNext()); | ||
} | ||
|
||
@Test(dataProvider = "Options") | ||
public void testPutAndGetDB(HaloDBOptions options) throws HaloDBException { | ||
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutAndGetDB"); | ||
|
||
options.setCompactionDisabled(true); | ||
options.setMaxFileSize(10 * 1024); | ||
|
||
HaloDB db = getTestDB(directory, options); | ||
|
||
int noOfRecords = 10_000; | ||
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords); | ||
|
||
List<RecordKey> keys = new LinkedList<>(); | ||
for (Record record : records) { | ||
keys.add(new RecordKey(record.getKey())); | ||
} | ||
|
||
List<RecordKey> actual = new ArrayList<>(); | ||
db.newKeyIterator().forEachRemaining(actual::add); | ||
|
||
MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray())); | ||
} | ||
|
||
@Test(dataProvider = "Options") | ||
public void testPutUpdateAndGetDB(HaloDBOptions options) throws HaloDBException { | ||
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutUpdateAndGetDB"); | ||
|
||
options.setCompactionDisabled(true); | ||
options.setMaxFileSize(10 * 1024); | ||
|
||
HaloDB db = getTestDB(directory, options); | ||
|
||
int noOfRecords = 10_000; | ||
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords); | ||
|
||
List<Record> updated = TestUtils.updateRecords(db, records); | ||
|
||
List<RecordKey> keys = new LinkedList<>(); | ||
for (Record record : updated) { | ||
keys.add(new RecordKey(record.getKey())); | ||
} | ||
|
||
List<RecordKey> actual = new ArrayList<>(); | ||
db.newKeyIterator().forEachRemaining(actual::add); | ||
MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray())); | ||
} | ||
} |