Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply POC email notification #6

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ $RECYCLE.BIN/
.idea/modules.xml
.idea/*.iml
.idea/modules
.idea/*.xml
*.iml
*.ipr

Expand Down
3 changes: 3 additions & 0 deletions .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .project
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>opensearch-reports-scheduler</name>
<name>wazuh-indexer-reports-scheduler</name>
<comment>Project reports-scheduler created by Buildship.</comment>
<projects>
</projects>
Expand Down
40 changes: 32 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ repositories {
}

dependencies {
// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down Expand Up @@ -297,14 +300,35 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty(
testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-job-scheduler*'
}.singleFile
}
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-notifications-core*'
}.singleFile
}
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/notifications*'
}.singleFile
}
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch
repositoriesServiceSupplier: Supplier<RepositoriesService>
): Collection<Any> {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionJobRunner.initialize(client, clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
return emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,72 @@ package org.opensearch.reportsscheduler.scheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.index.query.QueryBuilders
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo
import org.opensearch.reportsscheduler.util.SecureIndexClient
import org.opensearch.reportsscheduler.util.buildReportLink
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.reportsscheduler.util.sendNotificationWithHTML
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
private val log by logger(ReportDefinitionJobRunner::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

private lateinit var client: Client
private lateinit var clusterService: ClusterService

/**
* Initialize the class
* @param client The ES client
* @param clusterService The ES cluster service
*/
fun initialize(client: Client, clusterService: ClusterService) {
this.client = SecureIndexClient(client)
this.clusterService = clusterService
}

private suspend fun createNotification(
configInfo: NotificationConfigInfo,
reportDefinitionDetails: ReportDefinitionDetails,
id: String,
hits: Long?
) {
val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title
val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription
val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription

val urlDefinition: String =
buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)

val textWithURL: String =
textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
val htmlWithURL: String? =
htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())

log.info("esto es el mensaje html $htmlMessage")
configInfo.sendNotificationWithHTML(
this.client,
title,
textWithURL,
htmlWithURL
)
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ReportDefinitionDetails) {
log.warn("$LOG_PREFIX:job is not of type ReportDefinitionDetails:${job.javaClass.name}")
Expand All @@ -48,6 +100,41 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")

// Wazuh - Make queries
val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder()
.query(
QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("timestamp")
.gt(beginTime)
.lte(currentTime)
)
.must(
QueryBuilders.matchQuery("agent.id", "001")
)
)
val jobSearchRequest: SearchRequest =
SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
val response: SearchResponse = client.search(jobSearchRequest).actionGet()

val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0]
val configInfo: NotificationConfigInfo? = getNotificationConfigInfo(
client as NodeClient,
reportDefinitionId
)

if (configInfo != null) {
createNotification(
configInfo,
reportDefinitionDetails,
id,
response.hits.totalHits?.value
)
log.info("Notification with id $id was sent.")
} else {
log.error("NotificationConfigInfo with id $reportDefinitionId was not found.")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.reportsscheduler.util

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

object NotificationApiUtils {

private val logger = LogManager.getLogger(NotificationApiUtils::class)

/**
* Gets a NotificationConfigInfo object by ID if it exists.
*/
suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
return try {
val res: GetNotificationConfigResponse =
getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
res.searchResult.objectList.firstOrNull()
} catch (e: OpenSearchSecurityException) {
throw e
} catch (e: OpenSearchStatusException) {
if (e.status() == RestStatus.NOT_FOUND) {
logger.debug("Notification config [$id] was not found")
}
null
}
}

private suspend fun getNotificationConfig(
client: NodeClient,
getNotificationConfigRequest: GetNotificationConfigRequest
): GetNotificationConfigResponse {
val getNotificationConfigResponse: GetNotificationConfigResponse =
NotificationsPluginInterface.suspendUntil {
this.getNotificationConfig(
client,
getNotificationConfigRequest,
it
)
}
return getNotificationConfigResponse
}
}

/**
* Extension function for publishing a notification to a channel in the Notification plugin.
*/
suspend fun NotificationConfigInfo.sendNotificationWithHTML(
client: Client,
title: String,
compiledMessage: String,
compiledMessageHTML: String?
): String {
val config = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
EventSource(title, config.configId, SeverityType.INFO),
ChannelMessage(compiledMessage, compiledMessageHTML, null),
listOf(config.configId),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
return res.notificationEvent.toString()
}

/**
* Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
*/
suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* All valid response statuses.
*/
private val VALID_RESPONSE_STATUS = setOf(
RestStatus.OK.status,
RestStatus.CREATED.status,
RestStatus.ACCEPTED.status,
RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
RestStatus.NO_CONTENT.status,
RestStatus.RESET_CONTENT.status,
RestStatus.PARTIAL_CONTENT.status,
RestStatus.MULTI_STATUS.status
)

@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
}
Loading