Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Dec 6, 2024
1 parent 1bf777a commit cc550a0
Show file tree
Hide file tree
Showing 46 changed files with 13 additions and 5,484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ endif::env-docs[]
[[apoc_neo4j_plugin_quickstart]]
== APOC Kafka Plugin

Any configuration option that starts with `apoc.kafka.` controls how the plugin itself behaves. For a full
list of options available, see the documentation subsections on the xref:database-integration/kafka/producer.adoc[source] and xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[sink].
Any configuration option that starts with `apoc.kafka.` controls how the plugin itself behaves.

=== Install the Plugin

Expand All @@ -30,11 +29,7 @@ Configuration settings which are valid for those connectors will also work for A
For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as
`apoc.kafka.batch.size` in APOC Kafka.

The following are common configuration settings you may wish to use. _This is not a complete
list_. The full list of configuration options and reference material is available from Confluent's
site for link:{url-confluent-install}/configuration/consumer-configs.html[sink configurations] and
link:{url-confluent-install}/configuration/producer-configs.html[source configurations].

The following are common configuration settings you may wish to use.
.Most Common Needed Configuration Settings
|===
|Setting Name |Description |Default Value
Expand Down Expand Up @@ -75,94 +70,8 @@ apoc.kafka.bootstrap.servers=localhost:9092
If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in
the xref:database-integration/kafka/cloud.adoc#confluent_cloud[Confluent Cloud] section

=== Decide: Sink, Source, or Both

Configuring APOC Neo4j plugin comes in three different parts, depending on your need:

. *Required*: Configuring a connection to Kafka

.neo4j.conf
[source,ini]
----
apoc.kafka.bootstrap.servers=localhost:9092
----

. _Optional_: Configuring Neo4j to produce records to Kafka (xref:database-integration/kafka/producer.adoc[Source])
. _Optional_: Configuring Neo4j to ingest from Kafka (xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink])

Follow one or both subsections according to your use case and need:

==== Sink

Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:

.neo4j.conf
[source,ini]
----
apoc.kafka.sink.enabled=true
apoc.kafka.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
----

This will process every message that comes in on `my-ingest-topic` with the given cypher statement. When
that cypher statement executes, the `event` variable that is referenced will be set to the message received,
so this sample cypher will create a `(:Label)` node in the graph with the given ID, copying all of the
properties in the source message.

For full details on what you can do here, see the xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink] section of the documentation.

==== Source

Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:

.neo4j.conf
[source,ini]
----
apoc.kafka.source.topic.nodes.my-nodes-topic=Person{*}
apoc.kafka.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
apoc.kafka.source.enabled=true
apoc.kafka.source.schema.polling.interval=10000
----

This will produce all graph nodes labeled `(:Person)` on to the topic `my-nodes-topic` and all
relationships of type `-[:BELONGS-TO]->` to the topic named `my-rels-topic`. Further, schema changes will
be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes.
Please note that if not specified a value for `apoc.kafka.source.schema.polling.interval` property then Streams plugin will use
300,000 ms as default.

The expressions `Person{\*}` and `BELONGS-TO{*}` are _patterns_. You can find documentation on how to change
these in the xref:database-integration/kafka/producer.adoc#source-patterns[Patterns] section.

For full details on what you can do here, see the xref:database-integration/kafka/producer.adoc[Source] section of the documentation.

==== Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active.
If you have configured Neo4j to consume from kafka, it will begin immediately processing messages.

[NOTE]

====
When installing the latest version of the APOC Kafka plugin into Neo4j 4.x, watching to logs you could find something
similar to the following:
[source,logs]
----
2020-03-25 20:13:50.606+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.partition.fetch.bytes
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.include.messages
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.auto.offset.reset
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.bootstrap.servers
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.poll.records
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.enable
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.source.enabled
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.topic.cypher.boa.to.kafkaTest
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.tolerance
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.group.id
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.headers.enable
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.header.prefix
2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.topic.name
2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.enabled.to.kafkaTest
----
*These are not errors*. They comes from the new Neo4j 4 Configuration System, which warns that it doesn't recognize those
properties. Despite these warnings the plugin will work properly.
====
19 changes: 0 additions & 19 deletions extended/src/main/java/apoc/ExtendedApocConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,6 @@ public enum UuidFormatType { hex, base64 }

public static final String CONFIG_DIR = "config-dir=";

private static final String CONF_DIR_ARG = "config-dir=";
private static final String SOURCE_ENABLED = "apoc.kafka.source.enabled";
private static final boolean SOURCE_ENABLED_VALUE = true;
private static final String PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled";
private static final boolean PROCEDURES_ENABLED_VALUE = true;
private static final String SINK_ENABLED = "apoc.kafka.sink.enabled";
private static final boolean SINK_ENABLED_VALUE = false;
private static final String CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout";
private static final String CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval";
private static final String CLUSTER_ONLY = "apoc.kafka.cluster.only";
private static final String CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval";
private static final String SYSTEM_DB_WAIT_TIMEOUT = "apoc.kafka.systemdb.wait.timeout";
private static final long SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L;
private static final String POLL_INTERVAL = "apoc.kafka.sink.poll.interval";
private static final String INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout";
private static final long INSTANCE_WAIT_TIMEOUT_VALUE = 120000L;
private static final int DEFAULT_TRIGGER_PERIOD = 10000;
private static final String DEFAULT_PATH = ".";

public ExtendedApocConfig(LogService log, GlobalProcedures globalProceduresRegistry, String defaultConfigPath) {
this.log = log.getInternalLog(ApocConfig.class);
this.defaultConfigPath = defaultConfigPath;
Expand Down
9 changes: 0 additions & 9 deletions extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package apoc.kafka
import apoc.ApocConfig
import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import apoc.kafka.config.StreamsConfig
import apoc.kafka.consumer.StreamsSinkConfigurationListener
import apoc.kafka.producer.StreamsRouterConfigurationListener
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.kernel.lifecycle.LifecycleAdapter
Expand All @@ -28,21 +27,13 @@ class KafkaHandler(): LifecycleAdapter() {
} catch (e: Exception) {
log.error("Exception in StreamsRouterConfigurationListener {}", e.message)
}

try {
StreamsSinkConfigurationListener(db, log)
.start(StreamsConfig.getConfiguration())
} catch (e: Exception) {
log.error("Exception in StreamsSinkConfigurationListener {}", e.message)
}
}
}

override fun stop() {
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {

StreamsRouterConfigurationListener(db, log).shutdown()
StreamsSinkConfigurationListener(db, log).shutdown()
}
}
}
12 changes: 4 additions & 8 deletions extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package apoc.kafka

//import apoc.kafka.producer.StreamsEventRouter
//import apoc.kafka.producer.StreamsTransactionEventHandler
//import apoc.kafka.producer.StreamsTransactionEventHandler

import apoc.kafka.producer.events.StreamsEventBuilder
import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil
Expand All @@ -20,9 +18,8 @@ import java.util.stream.Stream

data class StreamPublishResult(@JvmField val value: Map<String, Any>)

data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter,
// val txHandler: StreamsTransactionEventHandler
)
data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter)

class PublishProcedures {

@JvmField @Context
Expand Down Expand Up @@ -101,9 +98,8 @@ class PublishProcedures {
fun register(
db: GraphDatabaseAPI,
evtRouter: KafkaEventRouter,
// txHandler: StreamsTransactionEventHandler
) {
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/)
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter)
}

fun unregister(db: GraphDatabaseAPI) {
Expand Down
15 changes: 0 additions & 15 deletions extended/src/main/kotlin/apoc/kafka/config/StreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ class StreamsConfig {
const val SOURCE_ENABLED_VALUE = true
const val PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled"
const val PROCEDURES_ENABLED_VALUE = true
const val SINK_ENABLED = "apoc.kafka.sink.enabled"
const val SINK_ENABLED_VALUE = false
const val CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout"
const val CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval"
const val CLUSTER_ONLY = "apoc.kafka.cluster.only"
const val CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval"
const val POLL_INTERVAL = "apoc.kafka.sink.poll.interval"
const val INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout"
const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L

fun isSourceGloballyEnabled(config: Map<String, Any?>) = config.getOrDefault(SOURCE_ENABLED, SOURCE_ENABLED_VALUE).toString().toBoolean()

Expand All @@ -39,12 +30,6 @@ class StreamsConfig {

fun hasProceduresEnabled(config: Map<String, Any?>, dbName: String) = config.getOrDefault("${PROCEDURES_ENABLED}.$dbName", hasProceduresGloballyEnabled(config)).toString().toBoolean()

fun isSinkGloballyEnabled(config: Map<String, Any?>) = config.getOrDefault(SINK_ENABLED, SINK_ENABLED_VALUE).toString().toBoolean()

fun isSinkEnabled(config: Map<String, Any?>, dbName: String) = config.getOrDefault("${SINK_ENABLED}.to.$dbName", isSinkGloballyEnabled(config)).toString().toBoolean()

fun getInstanceWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong()

fun convert(props: Map<String,String>, config: Map<String, String>): Map<String, String> {
val mutProps = props.toMutableMap()
val mappingKeys = mapOf(
Expand Down

This file was deleted.

22 changes: 0 additions & 22 deletions extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit cc550a0

Please sign in to comment.