From cc0fe392ac848cc6aac88454dd8b34fac2f1a494 Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Thu, 11 Jul 2024 11:01:34 -0700 Subject: [PATCH 1/7] Make .Values.rabbitmq.auth.existingPasswordSecret required --- helm/templates/deployment.yaml | 2 +- helm/values.yaml | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index 8f909208..08bf600a 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -90,7 +90,7 @@ spec: - name: RABBITMQ_PASSWORD valueFrom: secretKeyRef: - name: {{ .Values.rabbitmq.auth.existingPasswordSecret }} + name: {{ required "rabbitmq.auth.existingPasswordSecret IS REQUIRED" .Values.rabbitmq.auth.existingPasswordSecret }} key: rabbitmq-password optional: false {{- if .Values.solr.auth.enabled }} diff --git a/helm/values.yaml b/helm/values.yaml index 182f1fd0..fe5f099b 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -248,6 +248,9 @@ solr: extraVolumes: - name: solr-config configMap: + ## @param solr.extraVolumes.configMap.name must be edited to include your release name! + ## format: releasename-indexer-configfiles + ## name: d1index-indexer-configfiles defaultMode: 0777 extraVolumeMounts: From 1687402874c5c6c2991cf663f88c5d912a0aa800 Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:46:20 -0700 Subject: [PATCH 2/7] re-open connections after AlreadyClosedException --- pom.xml | 2 +- .../org/dataone/cn/indexer/IndexWorker.java | 168 +++++++++--------- 2 files changed, 81 insertions(+), 89 deletions(-) diff --git a/pom.xml b/pom.xml index ac8e6aa6..157c3c0c 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.dataone dataone-index-worker - 3.0.1 + 3.0.2-SNAPSHOT jar dataone-index-worker http://maven.apache.org diff --git a/src/main/java/org/dataone/cn/indexer/IndexWorker.java b/src/main/java/org/dataone/cn/indexer/IndexWorker.java index 1cbcab65..889fca70 100644 --- a/src/main/java/org/dataone/cn/indexer/IndexWorker.java +++ b/src/main/java/org/dataone/cn/indexer/IndexWorker.java @@ -7,7 +7,6 @@ import java.nio.file.Paths; import java.nio.file.attribute.FileTime; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -18,6 +17,8 @@ import javax.xml.parsers.ParserConfigurationException; import javax.xml.xpath.XPathExpressionException; +import com.rabbitmq.client.AlreadyClosedException; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.codec.EncoderException; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; @@ -46,9 +47,6 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.LongString; - - /** @@ -260,7 +258,7 @@ private void initIndexQueue() throws IOException, TimeoutException { // this routine key be used. The routine key INDEX_ROUTING_KEY sends messages to the index worker, boolean durable = true; rabbitMQconnection = factory.newConnection(); - rabbitMQchannel = rabbitMQconnection .createChannel(); + rabbitMQchannel = rabbitMQconnection.createChannel(); rabbitMQchannel.exchangeDeclare(EXCHANGE_NAME, "direct", durable); boolean exclusive = false; @@ -321,7 +319,6 @@ protected void initExecutorService() { ". Final computed thread pool size for index executor: " + nThreads + ". 
Since its value is 1, we do NOT need the executor service and use a single thread way."); multipleThread = false; } - } @@ -336,61 +333,111 @@ public void start() throws IOException { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + logger.debug("Received message with delivery tag: " + envelope.getDeliveryTag()); + + doAck(envelope, this); + String identifier = null; try { final IndexQueueMessageParser parser = new IndexQueueMessageParser(); parser.parse(properties, body); - final Envelope finalEnvelop = envelope; if (multipleThread) { logger.debug("IndexWorker.start.handleDelivery - using multiple threads to index identifier " + parser.getIdentifier().getValue()); Runnable runner = new Runnable() { @Override public void run() { - indexOjbect(parser, finalEnvelop.getDeliveryTag(), multipleThread); + indexOjbect(parser, multipleThread); } }; // submit the task, and that's it executor.submit(runner); } else { logger.debug("IndexWorker.start.handleDelivery - using single thread to index identifier " + parser.getIdentifier().getValue()); - indexOjbect(parser, finalEnvelop.getDeliveryTag(), multipleThread); + indexOjbect(parser, multipleThread); } } catch (InvalidRequest e) { - logger.error("IndexWorker.start.handleDelivery - cannot index the task for identifier " + + logger.error("IndexWorker.start.handleDelivery - cannot index the task for identifier " + identifier + " since " + e.getMessage()); boolean requeue = false; rabbitMQchannel.basicReject(envelope.getDeliveryTag(), requeue); } } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + + logger.debug("handleShutdownSignal called by amqp client code. Consumer tag: " + + consumerTag + "; ShutdownSignalException" + sig.getMessage()); + try { + recreateConnection(this); + logger.debug("handleShutdownSignal successfully recreated connection."); + } catch (IOException e) { + logger.debug( + "handleShutdownSignal unable to recreate connection. Consumer tag: " + + consumerTag + "; ShutdownSignalException" + sig.getMessage()); + } + } }; - boolean autoAck = false; - rabbitMQchannel.basicConsume(INDEX_QUEUE_NAME, autoAck, consumer); + + try { + // Set autoAck = false + rabbitMQchannel.basicConsume(INDEX_QUEUE_NAME, false, consumer); + + } catch (AlreadyClosedException arce) { + logger.debug( + "rabbitMQchannel.basicConsume failed: Channel was already closed: " + + arce.getMessage() + ". Re-creating connection..."); + recreateConnection(consumer); + } logger.info("IndexWorker.start - Calling basicConsume and waiting for the coming messages"); } - + + private void doAck(Envelope envelope, Consumer consumer) throws IOException { + try { + // Send acknowledgment back to RabbitMQ before processing index. + // This is a temporary solution for the RabbitMQ timeout issue. + // Set multiple false + rabbitMQchannel.basicAck(envelope.getDeliveryTag(), false); + logger.debug("Sent acknowledgement to RabbitMQ for delivery tag: " + + envelope.getDeliveryTag()); + + } catch (AlreadyClosedException arce) { + logger.debug("rabbitMQchannel.basicAck failed: Channel was already closed: " + + arce.getMessage() + + ". 
Message will remain on queue to be consumed again"); + recreateConnection(consumer); + } + } + + private void recreateConnection(Consumer consumer) throws IOException { + + rabbitMQconnection.close(); + try { + if (rabbitMQchannel.isOpen()) { + rabbitMQchannel.close(); + } + initIndexQueue(); + + } catch (TimeoutException e) { + throw new IOException("TimeoutException trying to re-initialize Queue: " + + e.getMessage(), e); + } + // Tell RabbitMQ this worker is ready for tasks + rabbitMQchannel.basicConsume(INDEX_QUEUE_NAME, false, consumer); + logger.debug("rabbitMQ connection successfully re-created"); + } + /** * Process the index task. This method is called by a single or multiple thread(s) determined by the configuration. * @param parser the parser parsed the index queue message and holds the index information - * @param deliveryTag the tag of the rabbitmq message * @param multipleThread the task was handled by multiple thread or not (for the log information only) */ - private void indexOjbect(IndexQueueMessageParser parser, long deliveryTag, boolean multipleThread) { + private void indexOjbect(IndexQueueMessageParser parser, boolean multipleThread) { long start = System.currentTimeMillis(); Identifier pid = parser.getIdentifier(); String indexType = parser.getIndexType(); int priority = parser.getPriority(); String finalFilePath = parser.getObjectPath(); - try { - // Send the acknowledge back to RabbitMQ before processing index. - // This is a temporary solution for the RabbitMQ timeout issue. - // Set multiple false - rabbitMQchannel.basicAck(deliveryTag, false); - } catch (Exception e) { - logger.error("IndexWorker.indexOjbect - identifier: " + pid.getValue() - + " , the index type: " + indexType - + ", sending acknowledgement back to rabbitmq failed since " - + e.getMessage() + ". So rabbitmq may resend the message again"); - } try { long threadId = Thread.currentThread().getId(); logger.info("IndexWorker.consumer.indexOjbect by multiple thread? 
" + multipleThread @@ -420,70 +467,15 @@ private void indexOjbect(IndexQueueMessageParser parser, long deliveryTag, boole + ", the priotity: " + priority + " and the time taking is " + (end-start) + " milliseconds"); - } catch (InvalidToken e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage()); - } catch (NotAuthorized e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage()); - } catch (NotImplemented e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage()); - } catch (ServiceFailure e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (NotFound e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage()); - } catch (XPathExpressionException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (UnsupportedType e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (SAXException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (ParserConfigurationException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (SolrServerException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (MarshallingException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (EncoderException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (InterruptedException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (IOException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (InvalidRequest e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage()); - } catch (InstantiationException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); - } catch (IllegalAccessException e) { - logger.error("IndexWorker.indexOjbect - cannot index the task for identifier " - + pid.getValue() + " since " + e.getMessage(), e); + } catch (InvalidToken | NotAuthorized | NotImplemented | NotFound | InvalidRequest | + ServiceFailure | XPathExpressionException | UnsupportedType | SAXException | + ParserConfigurationException | SolrServerException | MarshallingException | + EncoderException | InterruptedException | IOException | InstantiationException | + IllegalAccessException e) { + logger.error("Cannot index the task for identifier " + pid.getValue() + + " since " + e.getMessage(), e); } } - - /** - * Stop the RabbitMQ 
connection - * @throws TimeoutException - * @throws IOException - */ - public void stop() throws IOException, TimeoutException { - rabbitMQchannel.close(); - rabbitMQconnection.close(); - logger.info("IndexWorker.stop - stop the index queue connection."); - } private static void startLivenessProbe() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); From af46856f08354693b414fc099c72fd3e5cd7aaa1 Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Tue, 23 Jul 2024 17:00:19 -0700 Subject: [PATCH 3/7] fix build warnings --- docker/Dockerfile | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 4795b98d..354a7937 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -38,4 +38,4 @@ USER 1000 LABEL org.opencontainers.image.source="https://github.com/dataoneorg/dataone-indexer" # Run the Worker process -CMD ./entrypoint.sh +CMD ["./entrypoint.sh"] diff --git a/pom.xml b/pom.xml index 157c3c0c..c3dc0290 100644 --- a/pom.xml +++ b/pom.xml @@ -197,7 +197,7 @@ org.slf4j - slf4j-log4j12 + slf4j-reload4j 1.7.36 From d7ce8c42ef655da57e119d0c6067e613d668d341 Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Tue, 23 Jul 2024 17:14:36 -0700 Subject: [PATCH 4/7] tagged chart 1.0.2-develop --- helm/Chart.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helm/Chart.yaml b/helm/Chart.yaml index f54df6eb..f8c16503 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -21,13 +21,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.1 +version: 1.0.2-develop # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "3.0.1" +appVersion: "3.0.2-SNAPSHOT" # Chart dependencies dependencies: From dcbdc30a0860c8deb6bc245614166386efaed0ad Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Wed, 24 Jul 2024 16:55:30 -0700 Subject: [PATCH 5/7] remove AlreadyClosedExceptions and clean up --- .../org/dataone/cn/indexer/IndexWorker.java | 256 +++++++++--------- 1 file changed, 124 insertions(+), 132 deletions(-) diff --git a/src/main/java/org/dataone/cn/indexer/IndexWorker.java b/src/main/java/org/dataone/cn/indexer/IndexWorker.java index 889fca70..754040b6 100644 --- a/src/main/java/org/dataone/cn/indexer/IndexWorker.java +++ b/src/main/java/org/dataone/cn/indexer/IndexWorker.java @@ -17,7 +17,6 @@ import javax.xml.parsers.ParserConfigurationException; import javax.xml.xpath.XPathExpressionException; -import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.codec.EncoderException; import org.apache.commons.configuration.ConfigurationException; @@ -53,53 +52,47 @@ * Worker class to process index tasks and submit results to store. */ public class IndexWorker { - + //Those strings are the types of the index tasks. //The create is the index task type for the action when a new object was created. So the solr index will be generated. 
//delete is the index task type for the action when an object was deleted. So the solr index will be deleted - //sysmeta is the index task type for the action when the system metadata of an existing object was updated. + //sysmeta is the index task type for the action when the system metadata of an existing object was updated. public final static String CREATE_INDEXT_TYPE = "create"; public final static String DELETE_INDEX_TYPE = "delete"; public final static String SYSMETA_CHANGE_TYPE = "sysmeta"; //this handle for resource map only - + public final static int HIGHEST_PRIORITY = 4; // some special cases public final static int HIGH_PRIORITY = 3; //use for the operations such as create, update public final static int MEDIUM_PRIORITY = 2; //use for the operations such as updateSystem, delete, archive - public final static int LOW_PRIORITY = 1; //use for the bulk operations such as reindexing the whole corpus - + public final static int LOW_PRIORITY = 1; //use for the bulk operations such as reindexing the whole corpus + private final static String HEADER_ID = "id"; //The header name in the message to store the identifier - private final static String HEADER_PATH = "path"; //The header name in the message to store the path of the object + private final static String HEADER_PATH = "path"; //The header name in the message to store the path of the object private final static String HEADER_INDEX_TYPE = "index_type"; //The header name in the message to store the index type - + private final static String EXCHANGE_NAME = "dataone-index"; private final static String INDEX_QUEUE_NAME = "index"; private final static String INDEX_ROUTING_KEY = "index"; - + private static final String springConfigFileURL = "/index-parser-context.xml"; private static final String ENV_NAME_OF_PROPERTIES_FILE = "DATAONE_INDEXER_CONFIG"; - + private static Logger logger = Logger.getLogger(IndexWorker.class); private static String defaultExternalPropertiesFile = "/etc/dataone/dataone-indexer.properties"; - + protected static String propertyFilePath = null; protected boolean multipleThread = true; protected int nThreads = 1; - - private String rabbitMQhost = null; - private int rabbitMQport = 0; - private String rabbitMQusername = null; - private String rabbitMQpassword = null; - private int rabbitMQMaxPriority = 10; + private Connection rabbitMQconnection = null; private Channel rabbitMQchannel = null; private ApplicationContext context = null; protected SolrIndex solrIndex = null; - private String specifiedThreadNumberStr = null; - private int specifiedThreadNumber = 0; private ExecutorService executor = null; /** * Commandline main for the IndexWorker to be started. + * * @param args (not used -- command line args) */ public static void main(String[] args) { @@ -115,38 +108,38 @@ public static void main(String[] args) { } startLivenessProbe(); } - + /** - * Load properties from an external file. + * Load properties from an external file. * DataONE-indexer will try to load the property file by this order * 1. try to read it from the user specified * 2. try to read it from an env variable - DATAONE_INDEXER_CONFIG. 
* 3 try to use the default path - /etc/dataone/dataone-indexer.properties * If all attempts fail, it will give up and use the one embedded in the jar file - * @param propertyFile the property file user specified + * @param propertyFile the property file user specified */ public static void loadExternalPropertiesFile(String propertyFile) { // try the users specified path - if (propertyFile != null && !propertyFile.trim().equals("")) { + if (propertyFile != null && !propertyFile.trim().isEmpty()) { propertyFilePath = propertyFile; logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path specified by users is " + propertyFilePath); - File defaultFile = new File (propertyFilePath); + File defaultFile = new File(propertyFilePath); if (defaultFile.exists() && defaultFile.canRead()) { - logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is " + + logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is " + propertyFilePath + ". The file exists and is readable. So it will be used."); } else { - logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is " + + logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is " + propertyFilePath + ". But the file does NOT exist or is NOT readable. So it will NOT be used."); propertyFilePath = null; } - } - + } + //try the path from the env variable - if (propertyFilePath == null || propertyFilePath.trim().equals("")) { + if (propertyFilePath == null || propertyFilePath.trim().isEmpty()) { propertyFilePath = System.getenv(ENV_NAME_OF_PROPERTIES_FILE); logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path from the env variable is " + propertyFilePath); if (propertyFilePath != null && !propertyFilePath.trim().equals("")) { - File defaultFile = new File (propertyFilePath); + File defaultFile = new File(propertyFilePath); if (defaultFile.exists() && defaultFile.canRead()) { logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path can be read from the env variable " + ENV_NAME_OF_PROPERTIES_FILE + " and its value is " + propertyFilePath + ". The file exists and it will be used."); @@ -157,23 +150,24 @@ public static void loadExternalPropertiesFile(String propertyFile) { } } } - - //The attempts to read the configuration file specified by users and from the env variable failed. We will try the default external path - if (propertyFilePath == null || propertyFilePath.trim().equals("")) { - File defaultFile = new File (defaultExternalPropertiesFile); + + //The attempts to read the configuration file specified by users and from the env + // variable failed. We will try the default external path + if (propertyFilePath == null || propertyFilePath.trim().isEmpty()) { + File defaultFile = new File(defaultExternalPropertiesFile); if (defaultFile.exists() && defaultFile.canRead()) { logger.info("IndexWorker.loadExternalPropertiesFile - the configure path can't be read either by users specified or from the env variable " + ENV_NAME_OF_PROPERTIES_FILE + ". 
However, the default external file " + defaultExternalPropertiesFile + " exists and it will be used."); propertyFilePath = defaultExternalPropertiesFile; } } - if (propertyFilePath != null && !propertyFilePath.trim().equals("")) { + if (propertyFilePath != null && !propertyFilePath.trim().isEmpty()) { try { //Settings.getConfiguration(); Settings.augmentConfiguration(propertyFilePath); logger.info("IndexWorker.loadExternalPropertiesFile - loaded the properties from the file " + propertyFilePath); } catch (ConfigurationException e) { - logger.error("IndexWorker.loadExternalPropertiesFile - can't load any properties from the file " + propertyFilePath + + logger.error("IndexWorker.loadExternalPropertiesFile - can't load any properties from the file " + propertyFilePath + " since " + e.getMessage() + ". It will use the default properties in the jar file."); } } else { @@ -181,7 +175,7 @@ public static void loadExternalPropertiesFile(String propertyFile) { ENV_NAME_OF_PROPERTIES_FILE + " or from the default path " + defaultExternalPropertiesFile + ". Dataone-indexer will use the properties file embedded in the jar file"); } } - + /** * Load an additional property file to the worker when it is necessary. * The main reason to have this method is that metacat has two property files @@ -189,34 +183,36 @@ public static void loadExternalPropertiesFile(String propertyFile) { * method to load the site property file after loading the metacat property file * @param propertyFile */ - public static void loadAdditionalPropertyFile (String propertyFile) { - if (propertyFile != null && !propertyFile.trim().equals("")) { + public static void loadAdditionalPropertyFile(String propertyFile) { + if (propertyFile != null && !propertyFile.trim().isEmpty()) { try { //Settings.getConfiguration(); Settings.augmentConfiguration(propertyFile); logger.info("IndexWorker.loadAdditionalPropertyFile - loaded the properties from the file " + propertyFile); } catch (ConfigurationException e) { - logger.error("IndexWorker.loadAdditionalPropertyFile - can't load any properties from the file " + propertyFile + + logger.error("IndexWorker.loadAdditionalPropertyFile - can't load any properties from the file " + propertyFile + " since " + e.getMessage() + "."); } } else { logger.info("IndexWorker.loadAdditionalPropertyFile - can't load an additional property file since its path is null or blank."); } } - + /** * Default constructor to initialize the RabbitMQ service + * * @throws IOException * @throws TimeoutException - * @throws ServiceFailure + * @throws ServiceFailure */ public IndexWorker() throws IOException, TimeoutException, ServiceFailure { this(true); } - + /** * Constructor with/without initialization - * @param initialize if we need to initialize RabittMQ and et al + * + * @param initialize if we need to initialize RabbitMQ et al * @throws IOException * @throws TimeoutException * @throws ServiceFailure @@ -233,15 +229,19 @@ public IndexWorker(Boolean initialize) throws IOException, TimeoutException, Ser /** * Initialize the RabbitMQ service - * @throws IOException - * @throws TimeoutException + * + * @throws IOException + * @throws TimeoutException */ private void initIndexQueue() throws IOException, TimeoutException { - rabbitMQhost = Settings.getConfiguration().getString("index.rabbitmq.hostname", "localhost"); - rabbitMQport = Settings.getConfiguration().getInt("index.rabbitmq.hostport", 5672); - rabbitMQusername = Settings.getConfiguration().getString("index.rabbitmq.username", "guest"); - rabbitMQpassword = 
Settings.getConfiguration().getString("index.rabbitmq.password", "guest"); - rabbitMQMaxPriority = Settings.getConfiguration().getInt("index.rabbitmq.max.priority"); + String rabbitMQhost = + Settings.getConfiguration().getString("index.rabbitmq.hostname", "localhost"); + int rabbitMQport = Settings.getConfiguration().getInt("index.rabbitmq.hostport", 5672); + String rabbitMQusername = + Settings.getConfiguration().getString("index.rabbitmq.username", "guest"); + String rabbitMQpassword = + Settings.getConfiguration().getString("index.rabbitmq.password", "guest"); + int rabbitMQMaxPriority = Settings.getConfiguration().getInt("index.rabbitmq.max.priority"); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(rabbitMQhost); factory.setPort(rabbitMQport); @@ -263,42 +263,45 @@ private void initIndexQueue() throws IOException, TimeoutException { boolean exclusive = false; boolean autoDelete = false; - Map argus = new HashMap(); - argus.put("x-max-priority", rabbitMQMaxPriority); - logger.debug("IndexWorker.initIndexQueue - Set RabbitMQ max priority to: " + rabbitMQMaxPriority); - rabbitMQchannel.queueDeclare(INDEX_QUEUE_NAME, durable, exclusive, autoDelete, argus); + Map args = new HashMap<>(); + args.put("x-max-priority", rabbitMQMaxPriority); + logger.debug( + "IndexWorker.initIndexQueue - Set RabbitMQ max priority to: " + rabbitMQMaxPriority); + rabbitMQchannel.queueDeclare(INDEX_QUEUE_NAME, durable, exclusive, autoDelete, args); rabbitMQchannel.queueBind(INDEX_QUEUE_NAME, EXCHANGE_NAME, INDEX_ROUTING_KEY); - + logger.info("IndexWorker.initIndexQueue - the allowed unacknowledged message(s) number is " + nThreads); rabbitMQchannel.basicQos(nThreads); logger.debug("IndexWorker.initIndexQueue - Connected to the RabbitMQ queue with the name of " + INDEX_QUEUE_NAME); } - + /** * Initialize the solrIndex object which contains the index parsers. */ protected void initIndexParsers() { if (context == null) { - synchronized(IndexWorker.class) { + synchronized (IndexWorker.class) { if (context == null) { context = new ClassPathXmlApplicationContext(springConfigFileURL); } } } - solrIndex = (SolrIndex)context.getBean("solrIndex"); + solrIndex = (SolrIndex) context.getBean("solrIndex"); } - + /** * Determine the size of the thread pool and initialize the executor service */ protected void initExecutorService() { - specifiedThreadNumberStr = Settings.getConfiguration().getString("index.thread.number", "0"); + String specifiedThreadNumberStr + = Settings.getConfiguration().getString("index.thread.number", "0"); + int specifiedThreadNumber; try { specifiedThreadNumber = Integer.parseInt(specifiedThreadNumberStr); } catch (NumberFormatException e) { specifiedThreadNumber = 0; logger.warn("IndexWorker.initExecutorService - IndexWorker cannot parse the string " + specifiedThreadNumberStr + - " specified by property index.thread.number into a number since " + e.getLocalizedMessage() + + " specified by property index.thread.number into a number since " + e.getLocalizedMessage() + ". The default value 0 will be used as the specified value"); } int availableProcessors = Runtime.getRuntime().availableProcessors(); @@ -309,55 +312,63 @@ protected void initExecutorService() { } if (nThreads != 1) { logger.info("IndexWorker.initExecutorService - the size of index thread pool specified in the propery file is " + specifiedThreadNumber + - ". The size computed from the available processors is " + availableProcessors + + ". The size computed from the available processors is " + availableProcessors + ". 
Final computed thread pool size for index executor: " + nThreads); executor = Executors.newFixedThreadPool(nThreads); multipleThread = true; } else { logger.info("IndexWorker.initExecutorService - the size of index thread pool specified in the propery file is " + specifiedThreadNumber + - ". The size computed from the available processors is " + availableProcessors + + ". The size computed from the available processors is " + availableProcessors + ". Final computed thread pool size for index executor: " + nThreads + ". Since its value is 1, we do NOT need the executor service and use a single thread way."); multipleThread = false; } } - - + /** - * Worker starts to consume messages from the index queue - calling SolrIndex to + * Worker starts to consume messages from the index queue - calling SolrIndex to * process index tasks and submit results to store. * @throws IOException */ public void start() throws IOException { final Consumer consumer = new DefaultConsumer(rabbitMQchannel) { @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) - throws IOException { + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + throws IOException { + logger.debug("Received message with delivery tag: " + envelope.getDeliveryTag()); - doAck(envelope, this); + // Send acknowledgment back to RabbitMQ before processing index. + // This is a temporary solution for the RabbitMQ timeout issue. + // Set multiple false + rabbitMQchannel.basicAck(envelope.getDeliveryTag(), false); - String identifier = null; + final IndexQueueMessageParser parser = new IndexQueueMessageParser(); try { - final IndexQueueMessageParser parser = new IndexQueueMessageParser(); parser.parse(properties, body); if (multipleThread) { - logger.debug("IndexWorker.start.handleDelivery - using multiple threads to index identifier " + parser.getIdentifier().getValue()); + logger.debug( + "using multiple threads to index identifier " + parser.getIdentifier() + .getValue()); Runnable runner = new Runnable() { @Override public void run() { - indexOjbect(parser, multipleThread); + indexObject(parser, multipleThread); } }; // submit the task, and that's it executor.submit(runner); } else { - logger.debug("IndexWorker.start.handleDelivery - using single thread to index identifier " + parser.getIdentifier().getValue()); - indexOjbect(parser, multipleThread); + logger.debug( + "using single thread to index identifier " + parser.getIdentifier() + .getValue()); + indexObject(parser, multipleThread); } } catch (InvalidRequest e) { - logger.error("IndexWorker.start.handleDelivery - cannot index the task for identifier " + - identifier + " since " + e.getMessage()); + logger.error( + "cannot index the task for identifier " + parser.getIdentifier().getValue() + + " since " + e.getMessage()); boolean requeue = false; rabbitMQchannel.basicReject(envelope.getDeliveryTag(), requeue); } @@ -366,47 +377,26 @@ public void run() { @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { - logger.debug("handleShutdownSignal called by amqp client code. Consumer tag: " - + consumerTag + "; ShutdownSignalException" + sig.getMessage()); + logger.debug( + "handleShutdownSignal called by amqp client code. 
Consumer tag: " + consumerTag + + "; ShutdownSignalException: " + sig.getMessage()); try { recreateConnection(this); - logger.debug("handleShutdownSignal successfully recreated connection."); + logger.debug( + "handleShutdownSignal successfully recreated connection. Consumer tag: " + + consumerTag); } catch (IOException e) { logger.debug( "handleShutdownSignal unable to recreate connection. Consumer tag: " + consumerTag + "; ShutdownSignalException" + sig.getMessage()); } } - }; - - try { - // Set autoAck = false - rabbitMQchannel.basicConsume(INDEX_QUEUE_NAME, false, consumer); - - } catch (AlreadyClosedException arce) { - logger.debug( - "rabbitMQchannel.basicConsume failed: Channel was already closed: " - + arce.getMessage() + ". Re-creating connection..."); - recreateConnection(consumer); - } - logger.info("IndexWorker.start - Calling basicConsume and waiting for the coming messages"); - } + }; - private void doAck(Envelope envelope, Consumer consumer) throws IOException { - try { - // Send acknowledgment back to RabbitMQ before processing index. - // This is a temporary solution for the RabbitMQ timeout issue. - // Set multiple false - rabbitMQchannel.basicAck(envelope.getDeliveryTag(), false); - logger.debug("Sent acknowledgement to RabbitMQ for delivery tag: " - + envelope.getDeliveryTag()); - - } catch (AlreadyClosedException arce) { - logger.debug("rabbitMQchannel.basicAck failed: Channel was already closed: " - + arce.getMessage() - + ". Message will remain on queue to be consumed again"); - recreateConnection(consumer); - } + // Set autoAck = false + rabbitMQchannel.basicConsume(INDEX_QUEUE_NAME, false, consumer); + + logger.info("IndexWorker.start - Calling basicConsume and waiting for the coming messages"); } private void recreateConnection(Consumer consumer) throws IOException { @@ -432,7 +422,7 @@ private void recreateConnection(Consumer consumer) throws IOException { * @param parser the parser parsed the index queue message and holds the index information * @param multipleThread the task was handled by multiple thread or not (for the log information only) */ - private void indexOjbect(IndexQueueMessageParser parser, boolean multipleThread) { + private void indexObject(IndexQueueMessageParser parser, boolean multipleThread) { long start = System.currentTimeMillis(); Identifier pid = parser.getIdentifier(); String indexType = parser.getIndexType(); @@ -440,33 +430,35 @@ private void indexOjbect(IndexQueueMessageParser parser, boolean multipleThread) String finalFilePath = parser.getObjectPath(); try { long threadId = Thread.currentThread().getId(); - logger.info("IndexWorker.consumer.indexOjbect by multiple thread? " + multipleThread - + ", with the thread id " + threadId + logger.info("IndexWorker.consumer.indexObject by multiple thread? 
" + multipleThread + + ", with the thread id " + threadId + " - Received the index task from the index queue with the identifier: " - + pid.getValue() + " , the index type: " + indexType - + ", the file path (null means not to have): " + finalFilePath - + ", the priotity: " + priority); - if (indexType.equals(CREATE_INDEXT_TYPE)) { - boolean sysmetaOnly = false; - solrIndex.update(pid, finalFilePath, sysmetaOnly); - } else if (indexType.equals(SYSMETA_CHANGE_TYPE)) { - boolean sysmetaOnly = true; - solrIndex.update(pid, finalFilePath, sysmetaOnly); - } else if (indexType.equals(DELETE_INDEX_TYPE)) { - solrIndex.remove(pid); - } else { - throw new InvalidRequest("0000", "DataONE indexer does not know the index type: " - + indexType + " in the index task"); + + pid.getValue() + " , the index type: " + indexType + + ", the file path (null means not to have): " + finalFilePath + + ", the priority: " + priority); + switch (indexType) { + case CREATE_INDEXT_TYPE -> { + boolean sysmetaOnly = false; + solrIndex.update(pid, finalFilePath, sysmetaOnly); + } + case SYSMETA_CHANGE_TYPE -> { + boolean sysmetaOnly = true; + solrIndex.update(pid, finalFilePath, sysmetaOnly); + } + case DELETE_INDEX_TYPE -> solrIndex.remove(pid); + default -> throw new InvalidRequest( + "0000", "DataONE indexer does not know the index type: " + indexType + + " in the index task"); } long end = System.currentTimeMillis(); logger.info("IndexWorker.indexOjbect with the thread id " + threadId + " - Completed the index task from the index queue with the identifier: " - + pid.getValue() + " , the index type: " + indexType - + ", the file path (null means not to have): " + finalFilePath - + ", the priotity: " + priority + " and the time taking is " - + (end-start) + " milliseconds"); - + + pid.getValue() + " , the index type: " + indexType + + ", the file path (null means not to have): " + finalFilePath + + ", the priority: " + priority + " and the time taking is " + + (end - start) + " milliseconds"); + } catch (InvalidToken | NotAuthorized | NotImplemented | NotFound | InvalidRequest | ServiceFailure | XPathExpressionException | UnsupportedType | SAXException | ParserConfigurationException | SolrServerException | MarshallingException | From f85c25c30d60f371bc64b6bf449933a857074a34 Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:35:20 -0700 Subject: [PATCH 6/7] update versions and release notes --- RELEASE-NOTES.md | 20 +++++++++++++++----- helm/Chart.yaml | 4 ++-- pom.xml | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 309d3a5c..eaab3b73 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,19 +1,29 @@ # dataone-indexer Release Notes +## dataone-indexer version 3.0.2 & helm chart version 1.0.2 + +* Release date: 2024-07-29 +* dataone-indexer version 3.0.2 + * Bug fix - RabbitMQ Channel timeouts (PR #119) +* helm chart version 1.0.2 + * Bump Application version to 3.0.2 + * Make .Values.rabbitmq.auth.existingPasswordSecret a required value + ## dataone-indexer version 3.0.1 & helm chart version 1.0.1 * Release date: 2024-07-08 -* helm chart version 1.0.1 - * Change `.Values.idxworker.cn_url` to `.Values.global.d1ClientCnUrl` - * Get `fullname` from metacat chart or provide in values.yaml - * Add simple 'exec' liveness probe. 
Remove readiness probe + * dataone-indexer version 3.0.1 * Bump rmq amqp client to 5.21.0 * Add healthcheck code * Exit app if unrecoverable exception occurs when started from `main()` method +* helm chart version 1.0.1 + * Change `.Values.idxworker.cn_url` to `.Values.global.d1ClientCnUrl` + * Get `fullname` from metacat chart or provide in values.yaml + * Add simple 'exec' liveness probe. Remove readiness probe ## dataone-indexer version 3.0.0 & helm chart version 1.0.0 * Release date: 2024-04-25 -* helm chart version 1.0.0 -- first release of helm chart * dataone-indexer version 3.0.0 -- first release of dataone-indexer +* helm chart version 1.0.0 -- first release of helm chart diff --git a/helm/Chart.yaml b/helm/Chart.yaml index f8c16503..2f92eb41 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -21,13 +21,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.2-develop +version: 1.0.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "3.0.2-SNAPSHOT" +appVersion: "3.0.2" # Chart dependencies dependencies: diff --git a/pom.xml b/pom.xml index c3dc0290..b78f79c4 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.dataone dataone-index-worker - 3.0.2-SNAPSHOT + 3.0.2 jar dataone-index-worker http://maven.apache.org From 10afe2035e2c41947cf17d4e07a34914f45d81ff Mon Sep 17 00:00:00 2001 From: Matthew B <106352182+artntek@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:44:22 -0700 Subject: [PATCH 7/7] remove blank line --- RELEASE-NOTES.md | 1 - 1 file changed, 1 deletion(-) diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index eaab3b73..d6fdf6e4 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -12,7 +12,6 @@ ## dataone-indexer version 3.0.1 & helm chart version 1.0.1 * Release date: 2024-07-08 - * dataone-indexer version 3.0.1 * Bump rmq amqp client to 5.21.0 * Add healthcheck code
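Usage note on [PATCH 1/7]: with .Values.rabbitmq.auth.existingPasswordSecret now wrapped in Helm's `required` function, rendering fails fast at `helm install`/`helm upgrade` time rather than producing a Deployment whose secretKeyRef has an empty name. A minimal values-override sketch follows; the secret name `rabbitmq-secret` is only an illustrative assumption -- any pre-existing Secret works, provided it carries the `rabbitmq-password` key referenced by the deployment template.

    # values override (sketch, not part of the patch)
    # The Secret must already exist in the release namespace and contain the
    # key "rabbitmq-password", matching the secretKeyRef in deployment.yaml.
    rabbitmq:
      auth:
        existingPasswordSecret: rabbitmq-secret   # illustrative name

If the value is left unset, Helm aborts with the message "rabbitmq.auth.existingPasswordSecret IS REQUIRED" emitted by the `required` call in helm/templates/deployment.yaml.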