Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexRuiz7 committed Sep 25, 2024
1 parent c0c9e06 commit 5a73ef1
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 123 deletions.
3 changes: 3 additions & 0 deletions .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 0 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,6 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty(
testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
// plugin(provider(new Callable<RegularFile>(){
// @Override
// RegularFile call() throws Exception {
// return new RegularFile() {
// @Override
// File getAsFile() {
// return configurations.zipArchive.asFileTree.getSingleFile()
// }
// }
// }
// }))


plugin(provider({
new RegularFile() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package org.opensearch.reportsscheduler

import org.opensearch.action.ActionRequest
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -104,8 +103,8 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch
indexNameExpressionResolver: IndexNameExpressionResolver,
repositoriesServiceSupplier: Supplier<RepositoriesService>
): Collection<Any> {
ReportDefinitionJobRunner.nodeClient = client as NodeClient
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionJobRunner.initialize(client, clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
return emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,12 @@ package org.opensearch.reportsscheduler.scheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
Expand All @@ -32,54 +22,33 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo
import org.opensearch.reportsscheduler.util.SecureIndexClient
import org.opensearch.reportsscheduler.util.buildReportLink
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.reportsscheduler.util.sendNotificationWithHTML
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
private val log by logger(ReportDefinitionJobRunner::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
lateinit var nodeClient: NodeClient
const val MAX_SIZE: Int = 10

private lateinit var client: Client
private lateinit var clusterService: ClusterService

/**
* Wazuh - Gets a NotificationConfigInfo object by ID if it exists.
* Initialize the class
* @param client The ES client
* @param clusterService The ES cluster service
*/
private suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
return try {
val res: GetNotificationConfigResponse = getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
res.searchResult.objectList.firstOrNull()
} catch (e: OpenSearchSecurityException) {
throw e
} catch (e: OpenSearchStatusException) {
if (e.status() == RestStatus.NOT_FOUND) {
log.debug("Notification config [$id] was not found")
}
null
}
}

private suspend fun getNotificationConfig(
client: NodeClient,
getNotificationConfigRequest: GetNotificationConfigRequest
): GetNotificationConfigResponse {
val getNotificationConfigResponse: GetNotificationConfigResponse = NotificationsPluginInterface.suspendUntil {
this.getNotificationConfig(
client,
getNotificationConfigRequest,
it
)
}
return getNotificationConfigResponse
fun initialize(client: Client, clusterService: ClusterService) {
this.client = SecureIndexClient(client)
this.clusterService = clusterService
}

private suspend fun createNotification(
client: NodeClient,
configInfo: NotificationConfigInfo?,
configInfo: NotificationConfigInfo,
reportDefinitionDetails: ReportDefinitionDetails,
id: String,
hits: Long?
Expand All @@ -88,14 +57,17 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription
val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription

val urlDefinition: String = buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)
val urlDefinition: String =
buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)

val textWithURL: String = textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
val htmlWithURL: String? = htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())
val textWithURL: String =
textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
val htmlWithURL: String? =
htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())

log.info("esto es el mensaje html $htmlMessage")
configInfo?.sendNotifications(
client,
configInfo.sendNotificationWithHTML(
this.client,
title,
textWithURL,
htmlWithURL
Expand Down Expand Up @@ -128,7 +100,7 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")

// Wazuh - Make queries
val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder()
.query(
Expand All @@ -142,64 +114,28 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
QueryBuilders.matchQuery("agent.id", "001")
)
)
val jobSearchRequest: SearchRequest = SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
val response: SearchResponse = nodeClient.search(jobSearchRequest).actionGet()

val configInfo = getNotificationConfigInfo(
nodeClient,
id = reportDefinitionDetails.reportDefinition.delivery!!.configIds.get(0)
val jobSearchRequest: SearchRequest =
SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
val response: SearchResponse = client.search(jobSearchRequest).actionGet()

val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0]
val configInfo: NotificationConfigInfo? = getNotificationConfigInfo(
client as NodeClient,
reportDefinitionId
)
createNotification(nodeClient, configInfo, reportDefinitionDetails, id, response.getHits().getTotalHits()?.value)

if (configInfo != null) {
createNotification(
configInfo,
reportDefinitionDetails,
id,
response.hits.totalHits?.value
)
log.info("Notification with id $id was sent.")
} else {
log.error("NotificationConfigInfo with id $reportDefinitionId was not found.")
}
}
}
}
}


/**
* Wazuh - Send notification
*/
suspend fun NotificationConfigInfo.sendNotifications(client: Client, title: String, compiledMessage: String, compiledMessageHTML: String?): String {
val config = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
EventSource(title, config.configId, SeverityType.INFO),
ChannelMessage(compiledMessage, compiledMessageHTML, null),
listOf(config.configId),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
return res.notificationEvent.toString()
}

suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* All valid response statuses.
*/
private val VALID_RESPONSE_STATUS = setOf(
RestStatus.OK.status,
RestStatus.CREATED.status,
RestStatus.ACCEPTED.status,
RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
RestStatus.NO_CONTENT.status,
RestStatus.RESET_CONTENT.status,
RestStatus.PARTIAL_CONTENT.status,
RestStatus.MULTI_STATUS.status
)

@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.reportsscheduler.util

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

object NotificationApiUtils {

private val logger = LogManager.getLogger(NotificationApiUtils::class)

/**
* Gets a NotificationConfigInfo object by ID if it exists.
*/
suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
return try {
val res: GetNotificationConfigResponse =
getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
res.searchResult.objectList.firstOrNull()
} catch (e: OpenSearchSecurityException) {
throw e
} catch (e: OpenSearchStatusException) {
if (e.status() == RestStatus.NOT_FOUND) {
logger.debug("Notification config [$id] was not found")
}
null
}
}

private suspend fun getNotificationConfig(
client: NodeClient,
getNotificationConfigRequest: GetNotificationConfigRequest
): GetNotificationConfigResponse {
val getNotificationConfigResponse: GetNotificationConfigResponse =
NotificationsPluginInterface.suspendUntil {
this.getNotificationConfig(
client,
getNotificationConfigRequest,
it
)
}
return getNotificationConfigResponse
}
}

/**
* Extension function for publishing a notification to a channel in the Notification plugin.
*/
suspend fun NotificationConfigInfo.sendNotificationWithHTML(
client: Client,
title: String,
compiledMessage: String,
compiledMessageHTML: String?
): String {
val config = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
EventSource(title, config.configId, SeverityType.INFO),
ChannelMessage(compiledMessage, compiledMessageHTML, null),
listOf(config.configId),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
return res.notificationEvent.toString()
}

/**
* Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
*/
suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* All valid response statuses.
*/
private val VALID_RESPONSE_STATUS = setOf(
RestStatus.OK.status,
RestStatus.CREATED.status,
RestStatus.ACCEPTED.status,
RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
RestStatus.NO_CONTENT.status,
RestStatus.RESET_CONTENT.status,
RestStatus.PARTIAL_CONTENT.status,
RestStatus.MULTI_STATUS.status
)

@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
}

0 comments on commit 5a73ef1

Please sign in to comment.