Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature 105 hashstore #117

Merged
merged 23 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
023e7a2
Added the hashstore dependency.
taojing2002 Jun 21, 2024
ab07a66
Added the storage interface and implementation.
taojing2002 Jun 22, 2024
3248f61
Removed the commented out code. Shortened the long statements.
taojing2002 Jun 24, 2024
ab0a721
Add a method to get the input stream of system metadata for an identi…
taojing2002 Jun 25, 2024
847b986
Change the call to get the system metadata.
taojing2002 Jun 25, 2024
e1e354f
Used the new method to get the system metadata.
taojing2002 Jun 25, 2024
186e739
Changed the code to get the system metadata.
taojing2002 Jun 25, 2024
ad2dd13
Use the hashstore method to get system metadata and object.
taojing2002 Jun 25, 2024
b193762
Removed a unused method.
taojing2002 Jun 26, 2024
c2b27cb
Deleted two unused methods.
taojing2002 Jun 26, 2024
f1441b6
Added a save method for the test classes.
taojing2002 Jun 26, 2024
e643ae4
Added the hastore properties.
taojing2002 Jun 26, 2024
03ed1ba
Added the code to store objects into hash store.
taojing2002 Jun 26, 2024
32bec87
Added the method to store system metadata. This method is only for th…
taojing2002 Jun 26, 2024
d4b7089
Added the code to put the objects and system metadata into hashstore.
taojing2002 Jun 26, 2024
f810a6d
Modified the ObjectManagerTest class to use the new methods.
taojing2002 Jun 27, 2024
8d0f359
Removed the object path in the messages of rabitmq.
taojing2002 Jun 28, 2024
1a0355a
Modified the code based on the reviewer's suggestion.
taojing2002 Aug 12, 2024
4d68259
Deleted the StorageFactory class and the Storage interface.
taojing2002 Aug 12, 2024
48c63a4
Modified the code based on the change on the storage class.
taojing2002 Aug 12, 2024
ea064aa
Merge branch 'develop' into feature-105-hashstore
taojing2002 Aug 12, 2024
3af3f88
Merge branch 'feature-105-hashstore' of https://github.com/dataoneorg…
taojing2002 Aug 12, 2024
9bc801c
Made changes based on the viewer's suggestion.
taojing2002 Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions helm/config/dataone-indexer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ index.resourcemap.waitingComponent.time={{ default 800 .Values.idxworker.resourc
index.resourcemap.waitingComponent.max.attempts={{ default 25 .Values.idxworker.resourcemapMaxTries }}
index.solr.versionConflict.waiting.time={{ default 1000 .Values.idxworker.solrVerConflictWaitMs }}
index.solr.versionConflict.max.attempts={{ default 50 .Values.idxworker.solrVerConflictMaxTries }}

# Storage properties
storage.className={{ default "org.dataone.hashstore.filehashstore.FileHashStore" .Values.idxworker.storage.hashStoreClassName }}
storage.hashstore.rootDirectory={{ default "./target/hashstore" .Values.idxworker.storage.hashStoreRootDir }}
storage.hashstore.defaultNamespace={{ default "https://ns.dataone.org/service/types/v2.0#SystemMetadata" .Values.idxworker.storage.hashStoreDefaultNamespace }}
# The following three properties must NOT be modified after the hash store is initialized
storage.hashstore.fileNameAlgorithm={{ default "SHA-256" .Values.idxworker.storage.hashStoreAlgorithm }}
storage.hashstore.directory.width={{ default 2 .Values.idxworker.storage.hashStoreDirWidth }}
storage.hashstore.directory.depth={{ default 3 .Values.idxworker.storage.hashStoreDirDepth }}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@
<artifactId>jaxb-runtime</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.dataone</groupId>
<artifactId>hashstore</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/org/dataone/cn/indexer/IndexWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,23 +427,21 @@ private void indexObject(IndexQueueMessageParser parser, boolean multipleThread)
Identifier pid = parser.getIdentifier();
String indexType = parser.getIndexType();
int priority = parser.getPriority();
String finalFilePath = parser.getObjectPath();
try {
long threadId = Thread.currentThread().getId();
logger.info("IndexWorker.consumer.indexObject by multiple thread? " + multipleThread
+ ", with the thread id " + threadId
+ " - Received the index task from the index queue with the identifier: "
+ pid.getValue() + " , the index type: " + indexType
+ ", the file path (null means not to have): " + finalFilePath
+ ", the priority: " + priority);
switch (indexType) {
case CREATE_INDEXT_TYPE -> {
boolean sysmetaOnly = false;
solrIndex.update(pid, finalFilePath, sysmetaOnly);
solrIndex.update(pid, sysmetaOnly);
}
case SYSMETA_CHANGE_TYPE -> {
boolean sysmetaOnly = true;
solrIndex.update(pid, finalFilePath, sysmetaOnly);
solrIndex.update(pid, sysmetaOnly);
}
case DELETE_INDEX_TYPE -> solrIndex.remove(pid);
default -> throw new InvalidRequest(
Expand All @@ -455,7 +453,6 @@ private void indexObject(IndexQueueMessageParser parser, boolean multipleThread)
logger.info("IndexWorker.indexOjbect with the thread id " + threadId
+ " - Completed the index task from the index queue with the identifier: "
+ pid.getValue() + " , the index type: " + indexType
+ ", the file path (null means not to have): " + finalFilePath
+ ", the priority: " + priority + " and the time taking is "
+ (end - start) + " milliseconds");

Expand Down
401 changes: 158 additions & 243 deletions src/main/java/org/dataone/cn/indexer/SolrIndex.java

Large diffs are not rendered by default.

309 changes: 109 additions & 200 deletions src/main/java/org/dataone/cn/indexer/object/ObjectManager.java
taojing2002 marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,6 @@ public static Identifier getPid(Identifier identifier)
return pid;
}

/**
* Check if the given identifier is a PID or a SID
*
* @param identifier
* @return true if the identifier is a SID, false if a PID
* @throws NotFound
* @throws ServiceFailure
* @throws NotImplemented
* @throws NotAuthorized
* @throws InvalidToken
* @throws MarshallingException
* @throws IOException
* @throws IllegalAccessException
* @throws InstantiationException
*/
public static boolean isSeriesId(Identifier identifier)
throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure, NotFound,
InstantiationException, IllegalAccessException, IOException, MarshallingException {

// if we have system metadata available via HZ map, then it's a PID
String relativeObjPath = null;//we don't know the path
SystemMetadata systemMetadata =
ObjectManager.getInstance().getSystemMetadata(identifier.getValue(), relativeObjPath);
if (systemMetadata != null) {
return false;
}

//TODO: check that it's not just bogus value by looking up the pid?
// Identifier pid = getPid(identifier);
// if (pid.equals(identifier)) {
// return false;
// }

// okay, it's a SID
return true;

}

}
Original file line number Diff line number Diff line change
@@ -1,25 +1,3 @@
/**
* This work was created by participants in the DataONE project, and is
* jointly copyrighted by participating institutions in DataONE. For
* more information on DataONE, see our web site at http://dataone.org.
*
* Copyright ${year}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* $Id$
*/

package org.dataone.cn.indexer.resourcemap;

import java.io.ByteArrayOutputStream;
Expand All @@ -29,6 +7,7 @@
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -218,60 +197,65 @@ private void _init(InputStream is) throws OREException, URISyntaxException,
public static boolean representsResourceMap(String formatId) {
return RESOURCE_MAP_FORMAT.equals(formatId);
}

private boolean isHeadVersion(Identifier pid, Identifier sid) {
boolean isHead = true;
if(pid != null && sid != null) {
/*Identifier newId = new Identifier();
newId.setValue("peggym.130.5");
if(pid.getValue().equals("peggym.130.4") && HazelcastClientFactory.getSystemMetadataMap().get(newId) != null) {
isHead =false;
} else if (pid.getValue().equals("peggym.130.4") && HazelcastClientFactory.getSystemMetadataMap().get(newId) == null) {
isHead = true;
}*/
Identifier head = null;
try {
head = SeriesIdResolver.getPid(sid);//if the passed sid actually is a pid, the method will return the pid.
//if the passed sid actually is a pid, the method will return the pid.
head = SeriesIdResolver.getPid(sid);
} catch (Exception e) {
System.out.println(""+e.getStackTrace());
isHead = true;
}
if(head != null ) {
//System.out.println("||||||||||||||||||| the head version is "+ head.getValue()+" for sid "+sid.getValue());
logger.info("||||||||||||||||||| the head version is "+ head.getValue()+" for sid "+sid.getValue());

logger.info("||||||||||||||||||| the head version is " + head.getValue()
+ " for sid " + sid.getValue());
if(head.equals(pid)) {
logger.info("||||||||||||||||||| the pid "+ pid.getValue()+" is the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| the pid " + pid.getValue()
+ " is the head version for sid " + sid.getValue());
isHead=true;
} else {
logger.info("||||||||||||||||||| the pid "+ pid.getValue()+" is NOT the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| the pid " + pid.getValue()
+ " is NOT the head version for sid " + sid.getValue());
isHead=false;
}
} else {
//System.out.println("||||||||||||||||||| can't find the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| can't find the head version for sid "+sid.getValue() + " and we think the given pid "+pid.getValue()+" is the head version.");
logger.info("||||||||||||||||||| can't find the head version for sid "
+ sid.getValue() + " and we think the given pid " + pid.getValue()
+ " is the head version.");
}
}
return isHead;
}

private SolrDoc _mergeMappedReference(ResourceEntry resourceEntry, SolrDoc mergeDocument) throws InvalidToken, NotAuthorized, NotImplemented,
ServiceFailure, NotFound, InstantiationException, IllegalAccessException, IOException, MarshallingException {

Identifier identifier = new Identifier();
identifier.setValue(mergeDocument.getIdentifier());
//SystemMetadata sysMeta = HazelcastClientFactory.getSystemMetadataMap().get(identifier);
String relativeObjPath = null; //we don't know the path
SystemMetadata sysMeta = ObjectManager.getInstance().getSystemMetadata(identifier.getValue(), relativeObjPath);
if (sysMeta.getSeriesId() != null && sysMeta.getSeriesId().getValue() != null && !sysMeta.getSeriesId().getValue().trim().equals("")) {
// skip this one
if(!isHeadVersion(identifier, sysMeta.getSeriesId())) {
//System.out.println("The id "+identifier+" is not the head of the serial id "+sysMeta.getSeriesId().getValue()+" So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"+mergeDocument.getIdentifier());
logger.info("The id "+identifier+" is not the head of the serial id "+sysMeta.getSeriesId().getValue()+" So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"+mergeDocument.getIdentifier());
return mergeDocument;
}

}

private SolrDoc _mergeMappedReference(ResourceEntry resourceEntry, SolrDoc mergeDocument)
throws InvalidToken, NotAuthorized, NotImplemented,
NoSuchAlgorithmException, ServiceFailure, NotFound, InstantiationException,
IllegalAccessException, IOException, MarshallingException {

Identifier identifier = new Identifier();
identifier.setValue(mergeDocument.getIdentifier());
try {
SystemMetadata sysMeta = (SystemMetadata) ObjectManager.getInstance()
.getSystemMetadata(identifier.getValue());
if (sysMeta.getSeriesId() != null && sysMeta.getSeriesId().getValue() != null
&& !sysMeta.getSeriesId().getValue().trim().equals("")) {
// skip this one
if(!isHeadVersion(identifier, sysMeta.getSeriesId())) {
logger.info("The id " + identifier + " is not the head of the serial id "
+ sysMeta.getSeriesId().getValue()
+ " So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"
+ mergeDocument.getIdentifier());
return mergeDocument;
}
}
} catch (ClassCastException e) {
logger.warn("The systemmetadata is a v1 object and we need to do nothing");
}


if (mergeDocument.hasField(SolrElementField.FIELD_ID) == false) {
mergeDocument.addField(new SolrElementField(SolrElementField.FIELD_ID, resourceEntry
.getIdentifier()));
Expand Down Expand Up @@ -362,19 +346,22 @@ public List<SolrDoc> mergeIndexedDocuments(List<SolrDoc> docs) {
List<SolrDoc> mergedDocuments = new ArrayList<SolrDoc>();
for (ResourceEntry resourceEntry : this.resourceMap.values()) {
for (SolrDoc doc : docs) {
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "+doc.getIdentifier() +" in the thread "+Thread.currentThread().getId());
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "+doc.getSeriesId()+" in the thread "+Thread.currentThread().getId());
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "+resourceEntry.getIdentifier()+" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "+doc.getIdentifier() +" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "+doc.getSeriesId()+" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "+resourceEntry.getIdentifier()+" in the thread "+Thread.currentThread().getId());

logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "
+ doc.getIdentifier() + " in the thread "+Thread.currentThread().getId());
logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "
+ doc.getSeriesId() + " in the thread "+Thread.currentThread().getId());
logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "
+ resourceEntry.getIdentifier() + " in the thread "
+ Thread.currentThread().getId());

if (doc.getIdentifier().equals(resourceEntry.getIdentifier())
|| resourceEntry.getIdentifier().equals(doc.getSeriesId())) {
try {
mergedDocuments.add(_mergeMappedReference(resourceEntry, doc));
} catch (Exception e) {
logger.error("ForestieResourceMap.mergeIndexedDocuments - cannot merge the document since " + e.getMessage());
logger.error("ForestieResourceMap.mergeIndexedDocuments - cannot merge the document since "
+ e.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.dataone.cn.indexer.resourcemap;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

import org.apache.log4j.Logger;
import org.dataone.cn.indexer.object.ObjectManager;
Expand All @@ -12,7 +13,7 @@
import org.dataone.service.exceptions.NotImplemented;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.types.v1.Identifier;
import org.dataone.service.types.v2.SystemMetadata;
import org.dataone.service.types.v1.SystemMetadata;

public class IndexVisibilityDelegateImpl implements IndexVisibilityDelegate {

Expand All @@ -25,10 +26,8 @@ public IndexVisibilityDelegateImpl() {
public boolean isDocumentVisible(Identifier pid) {
boolean visible = false;
try {

//SystemMetadata systemMetadata = HazelcastClientFactory.getSystemMetadataMap().get(pid);
String relativeObjPath = null; //we don't know the path
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue(), relativeObjPath);
SystemMetadata systemMetadata = ObjectManager.getInstance()
.getSystemMetadata(pid.getValue());
// TODO: Is pid Identifier a SID?
if (systemMetadata == null) {
return true;
Expand Down Expand Up @@ -56,16 +55,16 @@ public boolean isDocumentVisible(Identifier pid) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (MarshallingException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (NoSuchAlgorithmException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
}
return visible;
}

public boolean documentExists(Identifier pid) {
boolean exists = false;
try {
//SystemMetadata systemMetadata = HazelcastClientFactory.getSystemMetadataMap().get(pid);
String relativeObjPath = null; //we don't know the path
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue(), relativeObjPath);
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue());
if (systemMetadata != null) {
exists = true;
} else {
Expand All @@ -92,6 +91,8 @@ public boolean documentExists(Identifier pid) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (MarshallingException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (NoSuchAlgorithmException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
}
return exists;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.dataone.service.types.v2.SystemMetadata;
import org.dataone.service.types.v1.SystemMetadata;
artntek marked this conversation as resolved.
Show resolved Hide resolved
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
Expand Down
Loading
Loading