RabbitMQ source connector
Used to read data from RabbitMQ.
:::tip

The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQ’s approach to dispatching messages from a single queue to multiple consumers.

:::
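As a minimal sketch of the parallelism requirement above, the job-level `env` block could pin the parallelism to 1. The key names `parallelism` and `job.mode` are assumptions based on recent SeaTunnel releases and are not stated on this page; older releases may use a different key (for example `execution.parallelism`), so check the documentation for your version.

```hocon
env {
  # Assumption: key names follow recent SeaTunnel releases.
  # A single reader is needed for exactly-once with this source.
  parallelism = 1
  job.mode = "STREAMING"
}
```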
name | type | required | default value |
---|---|---|---|
host | string | yes | - |
port | int | yes | - |
virtual_host | string | yes | - |
username | string | yes | - |
password | string | yes | - |
queue_name | string | yes | - |
schema | config | yes | - |
url | string | no | - |
routing_key | string | no | - |
exchange | string | no | - |
network_recovery_interval | int | no | - |
topology_recovery_enabled | boolean | no | - |
automatic_recovery_enabled | boolean | no | - |
connection_timeout | int | no | - |
requested_channel_max | int | no | - |
requested_frame_max | int | no | - |
requested_heartbeat | int | no | - |
prefetch_count | int | no | - |
delivery_timeout | long | no | - |
common-options | | no | - |
- host [string]: The default host to use for connections.
- port [int]: The default port to use for connections.
- virtual_host [string]: The virtual host to use when connecting to the broker.
- username [string]: The AMQP user name to use when connecting to the broker.
- password [string]: The password to use when connecting to the broker.
- url [string]: A convenience option that sets the host, port, username, password and virtual host from a single AMQP URI.
- queue_name [string]: The queue to read messages from.
- routing_key [string]: The routing key to publish the message to.
- exchange [string]: The exchange to publish the message to.
- schema [config]: The schema fields of the upstream data.
- network_recovery_interval [int]: How long automatic recovery waits before attempting to reconnect, in milliseconds.
- topology_recovery_enabled [boolean]: If true, enables topology recovery.
- automatic_recovery_enabled [boolean]: If true, enables connection recovery.
- connection_timeout [int]: The TCP connection establishment timeout, in milliseconds; zero means infinite.
- requested_channel_max [int]: The initially requested maximum channel number; zero means unlimited. **Note:** the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
- requested_frame_max [int]: The requested maximum frame size.
- requested_heartbeat [int]: The requested heartbeat timeout. **Note:** the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
- prefetch_count [int]: The maximum number of messages to receive without acknowledgement.
- delivery_timeout [long]: The maximum wait time, in milliseconds, for the next message delivery.
- common options: Source plugin common parameters. Please refer to Source Common Options for details.
simple:

    source {
      RabbitMQ {
        host = "rabbitmq-e2e"
        port = 5672
        virtual_host = "/"
        username = "guest"
        password = "guest"
        queue_name = "test"
        schema = {
          fields {
            id = bigint
            c_map = "map<string, smallint>"
            c_array = "array<tinyint>"
          }
        }
      }
    }
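The optional recovery and tuning options could be added to the same source block, as in the sketch below. All numeric values are illustrative assumptions chosen for the example, not documented defaults or recommendations; only the option names come from the table above.

```hocon
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"

    # Connection recovery (values are illustrative assumptions)
    automatic_recovery_enabled = true
    topology_recovery_enabled = true
    network_recovery_interval = 5000   # ms to wait before reconnecting

    # Connection tuning (values are illustrative assumptions)
    connection_timeout = 10000         # ms; 0 would mean no timeout
    requested_heartbeat = 60           # must be between 0 and 65535
    prefetch_count = 100               # max unacknowledged messages
    delivery_timeout = 30000           # ms to wait for the next delivery

    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
      }
    }
  }
}
```

A bounded `prefetch_count` limits how many unacknowledged messages the broker hands to the reader at once, which keeps memory use predictable when downstream processing is slow.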
- Add RabbitMQ Source Connector