Commit
added one test back
Signed-off-by: Ronnak Saxena <[email protected]>
ronnaksaxena committed Sep 8, 2023
1 parent e496c57 commit 8aa3f81
Showing 1 changed file with 178 additions and 178 deletions.
@@ -5,188 +5,188 @@

package org.opensearch.indexmanagement.rollup.interceptor

// import org.apache.http.entity.ContentType
// import org.apache.http.entity.StringEntity
// import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
// import org.opensearch.indexmanagement.common.model.dimension.Terms
// import org.opensearch.indexmanagement.makeRequest
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.rollup.RollupRestTestCase
// import org.opensearch.indexmanagement.rollup.model.Rollup
// import org.opensearch.indexmanagement.rollup.model.RollupMetadata
// import org.opensearch.indexmanagement.rollup.model.RollupMetrics
// import org.opensearch.indexmanagement.rollup.model.metric.Average
// import org.opensearch.indexmanagement.rollup.model.metric.Max
// import org.opensearch.indexmanagement.rollup.model.metric.Min
// import org.opensearch.indexmanagement.rollup.model.metric.Sum
// import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
// import org.opensearch.indexmanagement.waitFor
// import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
// import org.opensearch.rest.RestStatus
// import java.time.Instant
// import java.time.temporal.ChronoUnit
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.RollupMetrics
import org.opensearch.indexmanagement.rollup.model.metric.Average
import org.opensearch.indexmanagement.rollup.model.metric.Max
import org.opensearch.indexmanagement.rollup.model.metric.Min
import org.opensearch.indexmanagement.rollup.model.metric.Sum
import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit

@Suppress("UNCHECKED_CAST")
class ResponseInterceptorIT : RollupRestTestCase() {
// fun `test search a live index and rollup index with no overlap`() {
// generateNYCTaxiData("source_rollup_search")
// val rollup = Rollup(
// id = "base_case1_rollup_search",
// enabled = true,
// schemaVersion = 1L,
// jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
// jobLastUpdatedTime = Instant.now(),
// jobEnabledTime = Instant.now(),
// description = "basic search test",
// sourceIndex = "source_rollup_search",
// targetIndex = "target_rollup_search",
// metadataID = null,
// roles = emptyList(),
// pageSize = 10,
// delay = 0,
// continuous = false,
// dimensions = listOf(
// DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
// Terms("passenger_count", "passenger_count")
// ),
// metrics = listOf(
// RollupMetrics(
// sourceField = "passenger_count", targetField = "passenger_count",
// metrics = listOf(
// Sum(), Min(), Max(),
// ValueCount(), Average()
// )
// )
// )
// ).let { createRollup(it, it.id) }
//
// updateRollupStartTime(rollup)
//
// waitFor {
// val rollupJob = getRollup(rollupId = rollup.id)
// assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
// val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
// assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
// }
// // Get expected aggregation values by searching live data before deletion
// var aggReq = """
// {
// "size": 0,
// "query": {
// "match_all": {}
// },
// "aggs": {
// "sum_passenger_count": {
// "sum": {
// "field": "passenger_count"
// }
// },
// "max_passenger_count": {
// "max": {
// "field": "passenger_count"
// }
// },
// "min_passenger_count": {
// "min": {
// "field": "passenger_count"
// }
// },
// "avg_passenger_count": {
// "avg": {
// "field": "passenger_count"
// }
// },
// "count_passenger_count": {
// "value_count": {
// "field": "passenger_count"
// }
// }
// }
// }
// """.trimIndent()
// var searchResponse = client().makeRequest("POST", "/source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
// assertTrue("Could not search inital data for expected values", searchResponse.restStatus() == RestStatus.OK)
// var expectedAggs = searchResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
// val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"]
// val expectedMax = expectedAggs.getValue("max_passenger_count")["value"]
// val expectedMin = expectedAggs.getValue("min_passenger_count")["value"]
// val expectedCount = expectedAggs.getValue("count_passenger_count")["value"]
// val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"]
// refreshAllIndices()
// // Split data at 1546304400000 or Jan 01 2019 01:00:00
// // Delete half the values from live data, simulating an ISM job deleting old data
// var r = """
// {
// "query": {
// "range": {
// "tpep_pickup_datetime": {
// "lt": 1546304400000,
// "format": "epoch_millis",
// "time_zone": "+00:00"
// }
// }
// }
// }
// """.trimIndent()
// var deleteLiveResponse = client().makeRequest(
// "POST",
// "source_rollup_search/_delete_by_query",
// mapOf("refresh" to "true"),
// StringEntity(r, ContentType.APPLICATION_JSON)
// )
//
// assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK)
//
// // Delete half the values from rollup data
// r = """
// {
// "query": {
// "range": {
// "tpep_pickup_datetime": {
// "gte": 1546304400000,
// "format": "epoch_millis",
// "time_zone": "+00:00"
// }
// }
// }
// }
// """.trimIndent()
// var deleteRollupResponse = client().makeRequest(
// "POST",
// "target_rollup_search/_delete_by_query",
// mapOf("refresh" to "true"),
// StringEntity(r, ContentType.APPLICATION_JSON)
// )
//
// assertTrue("Could not delete rollup data", deleteRollupResponse.restStatus() == RestStatus.OK)
// var searchBothResponse = client().makeRequest("POST", "/target_rollup_search,source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
// assertTrue(searchBothResponse.restStatus() == RestStatus.OK)
// var responseAggs = searchBothResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
// assertEquals(
// "sum agg is wrong",
// expectedSum,
// responseAggs.getValue("sum_passenger_count")["value"]
// )
// assertEquals(
// "max agg is wrong",
// expectedMax,
// responseAggs.getValue("max_passenger_count")["value"]
// )
// assertEquals(
// "min agg is wrong",
// expectedMin,
// responseAggs.getValue("min_passenger_count")["value"]
// )
// assertEquals(
// "value_count is wrong",
// expectedCount,
// responseAggs.getValue("count_passenger_count")["value"]
// )
// assertEquals(
// "avg is wrong",
// expectedAvg,
// responseAggs.getValue("avg_passenger_count")["value"]
// )
// }
fun `test search a live index and rollup index with no overlap`() {
generateNYCTaxiData("source_rollup_search")
val rollup = Rollup(
id = "base_case1_rollup_search",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = "source_rollup_search",
targetIndex = "target_rollup_search",
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("passenger_count", "passenger_count")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
)
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}
// Get expected aggregation values by searching live data before deletion
var aggReq = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"sum_passenger_count": {
"sum": {
"field": "passenger_count"
}
},
"max_passenger_count": {
"max": {
"field": "passenger_count"
}
},
"min_passenger_count": {
"min": {
"field": "passenger_count"
}
},
"avg_passenger_count": {
"avg": {
"field": "passenger_count"
}
},
"count_passenger_count": {
"value_count": {
"field": "passenger_count"
}
}
}
}
""".trimIndent()
var searchResponse = client().makeRequest("POST", "/source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
assertTrue("Could not search inital data for expected values", searchResponse.restStatus() == RestStatus.OK)
var expectedAggs = searchResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"]
val expectedMax = expectedAggs.getValue("max_passenger_count")["value"]
val expectedMin = expectedAggs.getValue("min_passenger_count")["value"]
val expectedCount = expectedAggs.getValue("count_passenger_count")["value"]
val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"]
refreshAllIndices()
// Split data at 1546304400000 or Jan 01 2019 01:00:00
// Delete half the values from live data, simulating an ISM job deleting old data
var r = """
{
"query": {
"range": {
"tpep_pickup_datetime": {
"lt": 1546304400000,
"format": "epoch_millis",
"time_zone": "+00:00"
}
}
}
}
""".trimIndent()
var deleteLiveResponse = client().makeRequest(
"POST",
"source_rollup_search/_delete_by_query",
mapOf("refresh" to "true"),
StringEntity(r, ContentType.APPLICATION_JSON)
)

assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK)

// Delete half the values from rollup data
r = """
{
"query": {
"range": {
"tpep_pickup_datetime": {
"gte": 1546304400000,
"format": "epoch_millis",
"time_zone": "+00:00"
}
}
}
}
""".trimIndent()
var deleteRollupResponse = client().makeRequest(
"POST",
"target_rollup_search/_delete_by_query",
mapOf("refresh" to "true"),
StringEntity(r, ContentType.APPLICATION_JSON)
)

assertTrue("Could not delete rollup data", deleteRollupResponse.restStatus() == RestStatus.OK)
var searchBothResponse = client().makeRequest("POST", "/target_rollup_search,source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
assertTrue(searchBothResponse.restStatus() == RestStatus.OK)
var responseAggs = searchBothResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"sum agg is wrong",
expectedSum,
responseAggs.getValue("sum_passenger_count")["value"]
)
assertEquals(
"max agg is wrong",
expectedMax,
responseAggs.getValue("max_passenger_count")["value"]
)
assertEquals(
"min agg is wrong",
expectedMin,
responseAggs.getValue("min_passenger_count")["value"]
)
assertEquals(
"value_count is wrong",
expectedCount,
responseAggs.getValue("count_passenger_count")["value"]
)
assertEquals(
"avg is wrong",
expectedAvg,
responseAggs.getValue("avg_passenger_count")["value"]
)
}
// // Edge Case
// fun `test search a live index with no data and rollup index with data`() {
// generateNYCTaxiData("source_rollup_search_no_data_case")