From 5a73ef1671c10bb35e80e4b647232d2ce9526950 Mon Sep 17 00:00:00 2001 From: Alex Ruiz Date: Wed, 25 Sep 2024 20:27:02 +0200 Subject: [PATCH] Refactor --- .idea/kotlinc.xml | 3 + .idea/misc.xml | 2 +- build.gradle | 13 -- .../ReportsSchedulerPlugin.kt | 3 +- .../scheduler/ReportDefinitionJobRunner.kt | 150 +++++------------- .../util/NotificationApiUtils.kt | 121 ++++++++++++++ 6 files changed, 169 insertions(+), 123 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 0dd4b354..951989e5 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -3,4 +3,7 @@ + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 08bcbb84..6d5ded36 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/build.gradle b/build.gradle index 0a62a8a3..78358cae 100644 --- a/build.gradle +++ b/build.gradle @@ -300,19 +300,6 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty( testClusters.integTest { testDistribution = "INTEG_TEST" // need to install job-scheduler first, need to assemble job-scheduler first -// plugin(provider(new Callable(){ -// @Override -// RegularFile call() throws Exception { -// return new RegularFile() { -// @Override -// File getAsFile() { -// return configurations.zipArchive.asFileTree.getSingleFile() -// } -// } -// } -// })) - - plugin(provider({ new RegularFile() { @Override diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt index 45158303..7a1fd80d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -7,7 +7,6 @@ package org.opensearch.reportsscheduler import org.opensearch.action.ActionRequest import org.opensearch.client.Client -import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.service.ClusterService @@ -104,8 +103,8 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier ): Collection { - ReportDefinitionJobRunner.nodeClient = client as NodeClient PluginSettings.addSettingsUpdateConsumer(clusterService) + ReportDefinitionJobRunner.initialize(client, clusterService) ReportDefinitionsIndex.initialize(client, clusterService) ReportInstancesIndex.initialize(client, clusterService) return emptyList() diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt index a66c4629..b1f6624d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt @@ -8,22 +8,12 @@ package org.opensearch.reportsscheduler.scheduler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch -import org.opensearch.OpenSearchSecurityException -import org.opensearch.OpenSearchStatusException import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.client.Client import org.opensearch.client.node.NodeClient -import org.opensearch.commons.notifications.NotificationsPluginInterface -import org.opensearch.commons.notifications.action.GetNotificationConfigRequest -import org.opensearch.commons.notifications.action.GetNotificationConfigResponse -import org.opensearch.commons.notifications.action.SendNotificationResponse -import org.opensearch.commons.notifications.model.ChannelMessage -import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.cluster.service.ClusterService import org.opensearch.commons.notifications.model.NotificationConfigInfo -import org.opensearch.commons.notifications.model.SeverityType -import org.opensearch.core.action.ActionListener -import org.opensearch.core.rest.RestStatus import org.opensearch.index.query.QueryBuilders import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.ScheduledJobParameter @@ -32,54 +22,33 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF import org.opensearch.reportsscheduler.index.ReportInstancesIndex import org.opensearch.reportsscheduler.model.ReportDefinitionDetails import org.opensearch.reportsscheduler.model.ReportInstance +import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo +import org.opensearch.reportsscheduler.util.SecureIndexClient import org.opensearch.reportsscheduler.util.buildReportLink import org.opensearch.reportsscheduler.util.logger +import org.opensearch.reportsscheduler.util.sendNotificationWithHTML import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.coroutines.suspendCoroutine internal object ReportDefinitionJobRunner : ScheduledJobRunner { private val log by logger(ReportDefinitionJobRunner::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) - lateinit var nodeClient: NodeClient - const val MAX_SIZE: Int = 10 + + private lateinit var client: Client + private lateinit var clusterService: ClusterService /** - * Wazuh - Gets a NotificationConfigInfo object by ID if it exists. + * Initialize the class + * @param client The ES client + * @param clusterService The ES cluster service */ - private suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { - return try { - val res: GetNotificationConfigResponse = getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) - res.searchResult.objectList.firstOrNull() - } catch (e: OpenSearchSecurityException) { - throw e - } catch (e: OpenSearchStatusException) { - if (e.status() == RestStatus.NOT_FOUND) { - log.debug("Notification config [$id] was not found") - } - null - } - } - - private suspend fun getNotificationConfig( - client: NodeClient, - getNotificationConfigRequest: GetNotificationConfigRequest - ): GetNotificationConfigResponse { - val getNotificationConfigResponse: GetNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { - this.getNotificationConfig( - client, - getNotificationConfigRequest, - it - ) - } - return getNotificationConfigResponse + fun initialize(client: Client, clusterService: ClusterService) { + this.client = SecureIndexClient(client) + this.clusterService = clusterService } private suspend fun createNotification( - client: NodeClient, - configInfo: NotificationConfigInfo?, + configInfo: NotificationConfigInfo, reportDefinitionDetails: ReportDefinitionDetails, id: String, hits: Long? @@ -88,14 +57,17 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription - val urlDefinition: String = buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) + val urlDefinition: String = + buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id) - val textWithURL: String = textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) - val htmlWithURL: String? = htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) + val textWithURL: String = + textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString()) + val htmlWithURL: String? = + htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString()) log.info("esto es el mensaje html $htmlMessage") - configInfo?.sendNotifications( - client, + configInfo.sendNotificationWithHTML( + this.client, title, textWithURL, htmlWithURL @@ -128,7 +100,7 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance") } else { log.info("$LOG_PREFIX:runJob-created job:$id") - + // Wazuh - Make queries val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder() .query( @@ -142,64 +114,28 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner { QueryBuilders.matchQuery("agent.id", "001") ) ) - val jobSearchRequest: SearchRequest = SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) - val response: SearchResponse = nodeClient.search(jobSearchRequest).actionGet() - - val configInfo = getNotificationConfigInfo( - nodeClient, - id = reportDefinitionDetails.reportDefinition.delivery!!.configIds.get(0) + val jobSearchRequest: SearchRequest = + SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse) + val response: SearchResponse = client.search(jobSearchRequest).actionGet() + + val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0] + val configInfo: NotificationConfigInfo? = getNotificationConfigInfo( + client as NodeClient, + reportDefinitionId ) - createNotification(nodeClient, configInfo, reportDefinitionDetails, id, response.getHits().getTotalHits()?.value) + + if (configInfo != null) { + createNotification( + configInfo, + reportDefinitionDetails, + id, + response.hits.totalHits?.value + ) + log.info("Notification with id $id was sent.") + } else { + log.error("NotificationConfigInfo with id $reportDefinitionId was not found.") + } } } } } - - -/** - * Wazuh - Send notification - */ -suspend fun NotificationConfigInfo.sendNotifications(client: Client, title: String, compiledMessage: String, compiledMessageHTML: String?): String { - val config = this - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - EventSource(title, config.configId, SeverityType.INFO), - ChannelMessage(compiledMessage, compiledMessageHTML, null), - listOf(config.configId), - it - ) - } - validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) - return res.notificationEvent.toString() -} - -suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = - suspendCoroutine { cont -> - block(object : ActionListener { - override fun onResponse(response: T) = cont.resume(response) - - override fun onFailure(e: Exception) = cont.resumeWithException(e) - }) - } - -/** - * All valid response statuses. - */ -private val VALID_RESPONSE_STATUS = setOf( - RestStatus.OK.status, - RestStatus.CREATED.status, - RestStatus.ACCEPTED.status, - RestStatus.NON_AUTHORITATIVE_INFORMATION.status, - RestStatus.NO_CONTENT.status, - RestStatus.RESET_CONTENT.status, - RestStatus.PARTIAL_CONTENT.status, - RestStatus.MULTI_STATUS.status -) - -@Throws(OpenSearchStatusException::class) -fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { - if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { - throw OpenSearchStatusException("Failed: $responseContent", restStatus) - } -} diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt new file mode 100644 index 00000000..414db187 --- /dev/null +++ b/src/main/kotlin/org/opensearch/reportsscheduler/util/NotificationApiUtils.kt @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.reportsscheduler.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +object NotificationApiUtils { + + private val logger = LogManager.getLogger(NotificationApiUtils::class) + + /** + * Gets a NotificationConfigInfo object by ID if it exists. + */ + suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? { + return try { + val res: GetNotificationConfigResponse = + getNotificationConfig(client, GetNotificationConfigRequest(setOf(id))) + res.searchResult.objectList.firstOrNull() + } catch (e: OpenSearchSecurityException) { + throw e + } catch (e: OpenSearchStatusException) { + if (e.status() == RestStatus.NOT_FOUND) { + logger.debug("Notification config [$id] was not found") + } + null + } + } + + private suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest + ): GetNotificationConfigResponse { + val getNotificationConfigResponse: GetNotificationConfigResponse = + NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + return getNotificationConfigResponse + } +} + +/** + * Extension function for publishing a notification to a channel in the Notification plugin. + */ +suspend fun NotificationConfigInfo.sendNotificationWithHTML( + client: Client, + title: String, + compiledMessage: String, + compiledMessageHTML: String? +): String { + val config = this + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + EventSource(title, config.configId, SeverityType.INFO), + ChannelMessage(compiledMessage, compiledMessageHTML, null), + listOf(config.configId), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.toString()) + return res.notificationEvent.toString() +} + +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + +/** + * All valid response statuses. + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, + RestStatus.CREATED.status, + RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, + RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, + RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +}