Skip to content

Commit

Permalink
Addressed comments; Moved updateTransformStartTime to IndexManagement…
Browse files Browse the repository at this point in the history
…RestTestCase

Signed-off-by: Tanqiu Liu <[email protected]>
  • Loading branch information
tanqiuliu committed Oct 10, 2023
1 parent 2f8c4d6 commit 399afba
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.indexmanagement.indexstatemanagement.step.transform

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand Down Expand Up @@ -69,7 +70,7 @@ class WaitForTransformCompletionStep : Step(name) {
logger.info("Received the status for jobs [${response.getIdsToExplain().keys}]")
return response
} catch (e: RemoteTransportException) {
processFailure(transformJobId, indexName, e)
processFailure(transformJobId, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: Exception) {
processFailure(transformJobId, indexName, e)

Check warning on line 75 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt#L72-L75

Added lines #L72 - L75 were not covered by tests
}
Expand Down Expand Up @@ -108,17 +109,8 @@ class WaitForTransformCompletionStep : Step(name) {
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetadata = currentMetadata.actionMetaData
val currentActionProperties = currentActionMetadata?.actionProperties
val currentTransformActionProperties = currentActionProperties?.transformActionProperties
return currentMetadata.copy(
actionMetaData = currentActionMetadata?.copy(
actionProperties = currentActionProperties?.copy(
transformActionProperties = currentTransformActionProperties?.copy(
transformId = currentTransformActionProperties.transformId
)
)
),
actionMetaData = currentMetadata.actionMetaData,
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.MediaType
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import java.io.IOException
import java.nio.file.Files
Expand Down Expand Up @@ -229,6 +230,35 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis)
val waitForActiveShards = if (isMultiNode) "all" else "1"
val response = client().makeRequest(
"POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards",
StringEntity(
"{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
ContentType.APPLICATION_JSON
)
)

assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {

private object TransformRestTestCaseExt : TransformRestTestCase() {

fun updateTransformStartTimeExt(update: Transform, desiredStartTimeMillis: Long? = null) =
super.updateTransformStartTime(update, desiredStartTimeMillis)

fun createTransformExt(
transform: Transform,
transformId: String = randomAlphaOfLength(10),
Expand Down Expand Up @@ -310,9 +307,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {
executeRequest(request, expectedStatus, userClient)
}

protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) =
TransformRestTestCaseExt.updateTransformStartTimeExt(update, desiredStartTimeMillis)

protected fun createTransform(
transform: Transform,
transformId: String = randomAlphaOfLength(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,18 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
.addAggregator(avgAggregation())
.addAggregator(valueCountAggregation())
)
val transform = ismTransform.toTransform(indexName)
val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId, retry = 1)
createPolicy(policy, policyId)
createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING)

assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform)

// verify the wait for transform completion step will be retried and failed again.
Thread.sleep(60000)
updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(indexName))
waitFor {
assertEquals(
AttemptCreateTransformJobStep.getFailedMessage(ismTransform.toTransform(indexName).id, indexName),
AttemptCreateTransformJobStep.getFailedMessage(transform.id, indexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
Expand Down Expand Up @@ -255,7 +255,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
}

private fun assertIndexTransformSucceeded(indexName: String, policyId: String, ismTransform: ISMTransform) {
val transformId = ismTransform.toTransform(indexName).id
val transform = ismTransform.toTransform(indexName)
val transformId = transform.id
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so that the policy will be initialized.
Expand All @@ -271,7 +272,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
)
}

Thread.sleep(60000)
updateTransformStartTime(transform)

// Change the start time so that the transform action will be attempted.
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand All @@ -291,7 +292,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
}

private fun assertIndexTransformSucceededTwice(indexName: String, policyId: String, ismTransform: ISMTransform) {
val transformId = ismTransform.toTransform(indexName).id
val transform = ismTransform.toTransform(indexName)
val transformId = transform.id
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so that the policy will be initialized.
Expand All @@ -306,7 +308,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
Thread.sleep(60000)
updateTransformStartTime(transform)

// Change the start time so that the transform action will be attempted.
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down Expand Up @@ -341,7 +343,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() {
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
Thread.sleep(60000)
updateTransformStartTime(transform)

// Change the start time so that the second transform action will be attempted.
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.hc.core5.http.message.BasicHeader
import org.junit.AfterClass
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.client.RestClient
import org.opensearch.common.settings.Settings
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand All @@ -30,12 +29,8 @@ import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import org.opensearch.search.SearchModule
import java.time.Duration
import java.time.Instant

abstract class TransformRestTestCase : IndexManagementRestTestCase() {

Expand Down Expand Up @@ -221,35 +216,6 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() {
return continuousStats["documents_behind"] as Map<String, Long>
}

protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
val waitForActiveShards = if (isMultiNode) "all" else "1"
val response = client().makeRequest(
"POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards",
StringEntity(
"{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
ContentType.APPLICATION_JSON
)
)

assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

protected fun Transform.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON)

override fun xContentRegistry(): NamedXContentRegistry {
Expand Down

0 comments on commit 399afba

Please sign in to comment.