You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
/**
 * 'message' handler for the fill consumer group.
 *
 * Parses the message value as JSON, passes it to
 * `ctx.service.consumerTest.start_fill`, and only after that call resolves
 * asks the consumer group to commit. Any failure while parsing or processing
 * is logged together with the offending message and is not rethrown, so the
 * commit is skipped for that message.
 *
 * @param {object} message - Delivered message; `message.value` must be a JSON string.
 */
fillConsumerGroup.on('message', async function(message) {
  try {
    await ctx.service.consumerTest.start_fill(JSON.parse(message.value));
    // First argument is passed as `true` (presumably kafka-node's `force`
    // flag - confirm against the library version in use).
    fillConsumerGroup.commit(true, (err, data) => {
      if (err) {
        logger.error('commit:', err, data);
        return;
      }
      // A callback without `err` is not a failure: `data` may just be an
      // informational result, so it must not be reported at error level.
      logger.info('commit:', data);
    });
  } catch (error) {
    logger.error('ERROR: [GetMessage] ', message, error);
  }
});
// Consumer-group settings for the "fill" consumer
// (presumably passed to the ConsumerGroup constructor - confirm at the call site).
const fillOptions = {
  // Broker address; connecting directly to the kafka broker instantiates a KafkaClient.
  kafkaHost: config.kafkaHost,
  // Placeholder for client batch settings, should they be needed.
  batch: undefined,
  // Optional: either false (the default) or a tls options hash.
  ssl: false,
  id: `consumer-fill-${uuid}`,
  groupId: 'ExampleFillGroup',
  sessionTimeout: 15000,
  autoCommit: false,
  // Partition assignment protocols, most preferred first. The built-ins are
  // selected with the strings 'roundrobin' or 'range'; a custom assignment
  // protocol can be supplied instead.
  protocol: ['roundrobin'],
  // 'utf8' is the default; switch to 'buffer' for binary data.
  encoding: 'utf8',
  // Starting offset for new groups (default 'latest'). Other options are
  // 'earliest' or 'none'; 'none' emits an error if no offsets were saved.
  // Equivalent to the Java client's auto.offset.reset.
  fromOffset: 'latest',
  // The very first time this consumer group subscribes to a topic, record
  // the offset returned in fromOffset (latest/earliest).
  commitOffsetsOnFirstJoin: true,
  // Recovery policy for an OutOfRangeOffset error (saved offset is past the
  // server retention); takes the same values as fromOffset. Default 'earliest'.
  outOfRangeOffset: 'earliest',
  // 16 MiB
  fetchMaxBytes: 16 * 1024 * 1024,
  // Hook (may also be null) giving consumers with autoCommit false a chance
  // to commit before a rebalance finishes. The first argument is false on the
  // first connection and true on rebalances triggered after that. This
  // implementation does nothing except signal completion.
  onRebalance: (alreadyMember, done) => {
    done();
  },
};
Why is there nothing to be committed?
The text was updated successfully, but these errors were encountered:
Why is there nothing to be committed?
The text was updated successfully, but these errors were encountered: