Commit: code cleanup 2

vga91 committed Nov 8, 2024
1 parent 3f835fe commit e517560
Showing 10 changed files with 11 additions and 153 deletions.
@@ -54,9 +54,6 @@ class StreamsSinkProcedures {
Stream.empty<StreamResult>()
} else {
val properties = config?.mapValues { it.value.toString() } ?: emptyMap()
// val configuration = getStreamsEventSink(db!!)!!
// .getEventSinkConfigMapper()
// .convert(config = properties)

val configuration = StreamsConfig.getConfiguration(properties)
readData(topic, config ?: emptyMap(), configuration)
@@ -101,29 +98,16 @@ class StreamsSinkProcedures {
}

private fun createConsumer(consumerConfig: Map<String, String>, topic: String): StreamsEventConsumer = runBlocking {
// todo - check that
val copy = StreamsConfig.getConfiguration()
// val copy = StreamsConfig.getInstance(db!! as GraphDatabaseAPI).getConfiguration()
.filter { it.value is String }
.mapValues { it.value.toString() }
.mapValues { it.value }
.toMutableMap()
copy.putAll(consumerConfig)
getStreamsEventSink(db!!)!!.getEventConsumerFactory()
.createStreamsEventConsumer(copy, log!!, setOf(topic))
}

companion object {
// todo - move in another class, similar to CypherProceduresHandler extends LifecycleAdapter implements AvailabilityListener {
// fun initListeners(db: GraphDatabaseAPI?, log: Log?) {
// // todo - move in another class, similar to CypherProcedureHandler
// // todo - check if there is a better way, maybe put if(apoc.kafka.enabled=true)
// StreamsRouterConfigurationListener(db!!, log!!
// ).start(StreamsConfig.getConfiguration())
//
// StreamsSinkConfigurationListener(db!!, log!!
// ).start(StreamsConfig.getConfiguration())
// }
//
private val streamsEventSinkStore = ConcurrentHashMap<String, KafkaEventSink>()

private fun getStreamsEventSink(db: GraphDatabaseService) = streamsEventSinkStore[KafkaUtil.getName(db)]
@@ -10,84 +10,29 @@ import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil.getConsumerProperties

class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
private val log: Log) /*: ConfigurationLifecycleListener*/ {
private val mutex = Mutex()
private val log: Log) {

// private var txHandler: StreamsTransactionEventHandler? = null
// private var streamsConstraintsService: StreamsConstraintsService? = null
private var streamsEventRouter: KafkaEventRouter? = null
private var streamsEventRouterConfiguration: StreamsEventRouterConfiguration? = null

private var lastConfig: KafkaConfiguration? = null

private val consumerConfig = getConsumerProperties()

private fun KafkaConfiguration.excludeSinkProps() = this.asProperties()
?.filterNot { consumerConfig.contains(it.key)
|| it.key.toString().startsWith("apoc.kafka.sink")
// these are not yet used by the streams Source module
|| it.key == "apoc.kafka.cluster.only"
|| it.key == "apoc.kafka.check.apoc.timeout"
|| it.key == "apoc.kafka.check.apoc.interval" }

// override fun onShutdown() {
// runBlocking {
// mutex.withLock {
// shutdown()
// }
// }
// }

// visible for testing
fun isConfigurationChanged(configMap: Map<String, String>) = when (configMap
.getOrDefault("apoc.kafka.router", "apoc.kafka.kafka.KafkaEventRouter")) {
"apoc.kafka.kafka.KafkaEventRouter" -> {
// we validate all properties except for the ones related to the Consumer
// we use this strategy because there are some properties related to the Confluent Platform
// that we're not able to track from the Apache Packages
// i.e. the Schema Registry
val config = KafkaConfiguration.create(configMap).excludeSinkProps()
val lastConfig = lastConfig?.excludeSinkProps()
val streamsConfig = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log)
config != lastConfig || streamsConfig != streamsEventRouterConfiguration
}
else -> true
}


fun shutdown() {
// val isShuttingDown = txHandler?.status() == StreamsPluginStatus.RUNNING
// if (isShuttingDown) {
// log.info("[Sink] Shutting down the Streams Source Module")
// }
if (streamsEventRouterConfiguration?.enabled == true) {
// streamsConstraintsService?.close()
streamsEventRouter?.stop()
streamsEventRouter = null
PublishProcedures.unregister(db)
// txHandler?.stop()
// txHandler = null
}
// if (isShuttingDown) {
// log.info("[Source] Shutdown of the Streams Source Module completed")
// }
}

fun start(configMap: Map<String, String>) {
lastConfig = KafkaConfiguration.create(configMap)
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log)
// todo -- KafkaEventRouter
streamsEventRouter = KafkaEventRouter(configMap, db, log)// StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log)
// streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval)
streamsEventRouter = KafkaEventRouter(configMap, db, log)
if (streamsEventRouterConfiguration?.enabled == true || streamsEventRouterConfiguration?.proceduresEnabled == true) {
// streamsConstraintsService!!.start()
streamsEventRouter!!.start()
}
// txHandler = StreamsTransactionEventHandler(streamsEventRouter!!, db, streamsConstraintsService!!)
if (streamsEventRouterConfiguration?.enabled == true) {
// streamsEventRouter!!.printInvalidTopics()
// txHandler!!.start()
}
PublishProcedures.register(db, streamsEventRouter!!/*, txHandler!!*/)
PublishProcedures.register(db, streamsEventRouter!!)
log.info("[Source] Streams Source module initialised")
}
}
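As background for the excludeSinkProps / isConfigurationChanged logic above: the old and new configurations are compared only after dropping sink- and consumer-related keys, so a sink-only change is not treated as a source configuration change. A minimal standalone sketch of that filter-then-compare pattern, using plain maps and hypothetical property names rather than the project's KafkaConfiguration API:

// Sketch only: a "source" config change is one that survives after dropping
// sink/consumer-related keys (the apoc.kafka.sink prefix comes from the code above;
// the other key names are hypothetical).
fun isSourceConfigChanged(
    oldConfig: Map<String, String>,
    newConfig: Map<String, String>,
    excludedPrefixes: List<String> = listOf("apoc.kafka.sink")
): Boolean {
    fun Map<String, String>.relevant() =
        filterKeys { key -> excludedPrefixes.none { key.startsWith(it) } }
    return oldConfig.relevant() != newConfig.relevant()
}

fun main() {
    val before = mapOf("apoc.kafka.bootstrap.servers" to "localhost:9092", "apoc.kafka.sink.enabled" to "false")
    val after = mapOf("apoc.kafka.bootstrap.servers" to "localhost:9092", "apoc.kafka.sink.enabled" to "true")
    println(isSourceConfigChanged(before, after)) // false: only a sink-side key changed
}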
@@ -28,7 +28,7 @@ import java.util.*

class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log)/*: StreamsEventRouter(config, db, log)*/ {
private val log: Log) {

/*override*/ val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration
.from(config, db.databaseName(), db.isDefaultDb(), log)
@@ -38,14 +38,7 @@ class KafkaEventRouter(private val config: Map<String, String>,

private var producer: Neo4jKafkaProducer<ByteArray, ByteArray>? = null
private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) }
private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig/*, eventRouterConfiguration.allTopics()*/, log) }

// /*override*/ fun printInvalidTopics() {
// val invalidTopics = kafkaAdminService.getInvalidTopics()
// if (invalidTopics.isNotEmpty()) {
// log.warn(getInvalidTopicsError(invalidTopics))
// }
// }
private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, log) }

private fun status(producer: Neo4jKafkaProducer<*, *>?): StreamsPluginStatus = when (producer != null) {
true -> StreamsPluginStatus.RUNNING
20 changes: 0 additions & 20 deletions extended/src/main/kotlin/apoc/kafka/service/errors/ErrorService.kt
@@ -59,26 +59,6 @@ abstract class ErrorService(private val config: Map<String, Any> = emptyMap()) {
data class ErrorConfig(val fail:Boolean=false, val log:Boolean=false, val logMessages:Boolean=false,
val dlqTopic:String? = null, val dlqHeaderPrefix:String = "", val dlqHeaders:Boolean = false, val dlqReplication: Int? = 3) {

/*
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all", "none" == fail-fast, abort sink task
fail-fast for configuration errors (e.g. validate cypher statements on start)
errors.tolerance = all -> silently ignore all bad messages
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java
"errors.log.enable": true,
"errors.deadletterqueue.context.headers.enable"=true/false
"errors.deadletterqueue.topic.name": "test-error-topic",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.log.include.messages": true,
*/

companion object {
const val TOLERANCE = "errors.tolerance"
const val LOG = "errors.log.enable"
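The comment block removed above referenced the Kafka Connect error-handling / dead-letter-queue settings that ErrorConfig maps onto. Purely as an illustration (keys taken from that removed comment, values are placeholders, not part of this commit), the corresponding errors.* properties could be expressed as:

// Illustrative map of the Kafka Connect error-handling settings mentioned above;
// values are placeholders, not recommendations.
val errorHandlingProps = mapOf(
    "errors.tolerance" to "all", // "none" == fail fast, abort the sink task
    "errors.log.enable" to "true",
    "errors.log.include.messages" to "true",
    "errors.deadletterqueue.topic.name" to "test-error-topic",
    "errors.deadletterqueue.topic.replication.factor" to "1",
    "errors.deadletterqueue.context.headers.enable" to "true"
)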
@@ -102,7 +102,7 @@ data class RelationshipPatternConfiguration(val start: NodePatternConfiguration,
// (:Source{!id})-[:REL_TYPE{foo, -bar}]->(:Target{!targetId})
private val cypherRelationshipPatternConfigured = """^\(:(.*?)\)(<)?-\[(?::)([\w\_]+)(\{\s*(-?[\w\*\.]+\s*(?:,\s*-?[\w\*\.]+\s*)*)\})?\]-(>)?\(:(.*?)\)$""".toRegex()
// LabelA{!id} REL_TYPE{foo, -bar} LabelB{!targetId}
private val simpleRelationshipPatternConfigured = """^(.*?) ([\w\_]+)(\{\s*(-?[\w\*\.]+\s*(?:,\s*-?[\w\*\.]+\s*)*)\})? (.*?)$""".toRegex() // """^\((.*?)\)-\[(?::)([\w\_]+)(\{\s*(-?[\w\*\.]+\s*(?:,\s*-?[\w\*\.]+\s*)*)\})?\]->\((.*?)\)$""".toRegex()
private val simpleRelationshipPatternConfigured = """^(.*?) ([\w\_]+)(\{\s*(-?[\w\*\.]+\s*(?:,\s*-?[\w\*\.]+\s*)*)\})? (.*?)$""".toRegex()

data class RelationshipPatternMetaData(val startPattern: String, val endPattern: String, val relType: String, val properties: List<String>) {
companion object {
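The two regexes above are documented only by their inline example patterns. A quick standalone check of the cypher-style pattern against the example string from the comment (regex copied verbatim from the source; the printed values follow from the capture groups):

fun main() {
    // Example input from the source comment: (:Source{!id})-[:REL_TYPE{foo, -bar}]->(:Target{!targetId})
    val cypherRelationshipPattern =
        """^\(:(.*?)\)(<)?-\[(?::)([\w\_]+)(\{\s*(-?[\w\*\.]+\s*(?:,\s*-?[\w\*\.]+\s*)*)\})?\]-(>)?\(:(.*?)\)$""".toRegex()
    val match = requireNotNull(
        cypherRelationshipPattern.matchEntire("(:Source{!id})-[:REL_TYPE{foo, -bar}]->(:Target{!targetId})")
    )
    println(match.groupValues[1]) // Source{!id}
    println(match.groupValues[3]) // REL_TYPE
    println(match.groupValues[5]) // foo, -bar
    println(match.groupValues[7]) // Target{!targetId}
}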
4 changes: 3 additions & 1 deletion extended/src/main/kotlin/apoc/kafka/utils/JSONUtils.kt
@@ -80,7 +80,9 @@ class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
}
}

// NOTE: it works differently from apoc.JSONUtil
/**
* NOTE: it works differently from apoc.JSONUtil
*/
object JSONUtils {

private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
@@ -3,7 +3,6 @@ package apoc.kafka.consumer.kafka
import apoc.kafka.PublishProcedures
import apoc.kafka.consumer.procedures.StreamsSinkProcedures
import apoc.kafka.producer.integrations.KafkaEventSinkSuiteIT
//import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.jupiter.api.AfterAll
@@ -7,7 +7,6 @@ import apoc.kafka.common.support.KafkaTestUtils
import apoc.kafka.common.support.Neo4jContainerExtension
import apoc.kafka.utils.KafkaUtil
import apoc.util.JsonUtil
//import io.confluent.kafka.serializers.KafkaAvroSerializer
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.apache.avro.generic.GenericRecord
@@ -220,40 +220,4 @@ class KafkaStreamsSinkProceduresTSE : KafkaEventSinkBaseTSE() {
val offsetAndMetadata = kafkaConsumer.committed(TopicPartition(topic, partition))
assertNull(offsetAndMetadata)
}

// @Test
// fun `should consume AVRO messages`() {
// val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "avroajeje")
// val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace")
// .record("Place").fields()
// .name("name").type().stringType().noDefault()
// .name("coordinates").type().array().items().doubleType().noDefault()
// .name("citizens").type().longType().noDefault()
// .endRecord()
// val coordinates = listOf(42.30000, -11.22222)
// val citizens = 1_000_000L
// val struct = GenericRecordBuilder(PLACE_SCHEMA)
// .set("name", "Foo")
// .set("coordinates", coordinates)
// .set("citizens", citizens)
// .build()
// val topic = "avro-procedure"
// val keyDeserializer = KafkaAvroDeserializer::class.java.name
// val valueDeserializer = KafkaAvroDeserializer::class.java.name
// kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get()
// val schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl()
// db.executeTransactionally("""
// CALL apoc.kafka.consume('$topic', {timeout: 5000, keyDeserializer: '$keyDeserializer', valueDeserializer: '$valueDeserializer', schemaRegistryUrl: '$schemaRegistryUrl'}) YIELD event
// RETURN event
// """.trimIndent(), emptyMap()
// ) { result ->
// assertTrue { result.hasNext() }
// val resultMap = result.next()
// assertTrue { resultMap.containsKey("event") }
// assertNotNull(resultMap["event"], "should contain event")
// val event = resultMap["event"] as Map<String, Any?>
// val resultData = event["data"] as Map<String, Any?>
// assertEquals(struct.toMap(), resultData)
// }
// }
}
@@ -119,9 +119,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
val db = createDbWithKafkaConfigs()
setUpProcedureTests()
db.execute("CREATE (n:Baz {age: 23, name: 'Foo', surname: 'Bar'})")
//
// val recordsCreation = kafkaConsumer.poll(5000)
// assertEquals(1, recordsCreation.count())

db.execute("MATCH (n:Baz) \n" +
"CALL apoc.kafka.publish.sync('neo4j', n) \n" +
@@ -159,8 +156,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
val db = createDbWithKafkaConfigs()
setUpProcedureTests()
db.execute("CREATE (:Foo {one: 'two'})-[:KNOWS {alpha: 'beta'}]->(:Bar {three: 'four'})")
// val recordsCreation = kafkaConsumer.poll(5000)
// assertEquals(3, recordsCreation.count())

db.execute("""
MATCH (:Foo)-[r:KNOWS]->(:Bar)
@@ -192,9 +187,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
setUpProcedureTests()
db.execute("CREATE (n:Foo {id: 1, name: 'Bar'})")

// val recordsCreation = kafkaConsumer.poll(5000)
// assertEquals(1, recordsCreation.count())

val message = "Hello World"
db.execute("MATCH (n:Foo {id: 1}) CALL apoc.kafka.publish.sync('neo4j', '$message', {key: n.foo}) YIELD value RETURN value") {
assertSyncResult(it)
