Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MysSqlBinlogEventProcessor swallows errors #199

Open
barryoneill opened this issue Mar 11, 2021 · 2 comments
Open

MysSqlBinlogEventProcessor swallows errors #199

barryoneill opened this issue Mar 11, 2021 · 2 comments

Comments

@barryoneill
Copy link
Member

barryoneill commented Mar 11, 2021

We saw an incident earlier when, when an eventmessage contained a payload > 1MB which caused our binlog->kinesis processor to fall over silently. Only after increasing the logs to DEBUG level did we see:

2021-03-10 23:16:42,268 |-DEBUG application [ioapp-compute-0] received binlog event Event{header=EventHeaderV4{timestamp=1615411407000, eventType=XID, serverId=2052150414, headerLength=19, dataLength=12, nextPosition=102655766, flags=0}, data=XidEventData{xid=7195623}}
Mar 10, 2021 11:16:42 PM com.github.shyiko.mysql.binlog.BinaryLogClient notifyEventListeners
WARNING: io.laserdisc.mysql.binlog.stream.MysSqlBinlogEventProcessor$$Lambda$943/0x00000008408f8440@6e8b71c1 choked on Event{header=EventHeaderV4{timestamp=1615411543000, eventType=EXT_UPDATE_ROWS, serverId=2052150414, headerLength=19, dataLength=72, nextPosition=109147432, flags=0}, data=UpdateRowsEventData{tableId=91, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6}, includedColumns={0, 1, 2, 3, 4, 5, 6}, rows=[
    {before=[162959917, 494038, 1, 1, 0, null, 0], after=[162959917, 494038, 1, 1, 1, 1615411543000000, 1]}
]}}
java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
	at cats.effect.internals.IOPlatform$.$anonfun$unsafeResync$2(IOPlatform.scala:52)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at cats.effect.internals.TrampolineEC$JVMTrampoline$$anon$3.blockOn(TrampolineEC.scala:84)
	at scala.concurrent.package$.blocking(package.scala:124)
	at cats.effect.internals.IOPlatform$.unsafeResync(IOPlatform.scala:52)
	at cats.effect.IO.unsafeRunTimed(IO.scala:342)
	at cats.effect.IO.unsafeRunSync(IO.scala:256)
	at io.laserdisc.mysql.binlog.stream.MysSqlBinlogEventProcessor.$anonfun$run$1(MysqlBinlogStream.scala:16)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1158)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1005)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connectWithTimeout(BinaryLogClient.java:517)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:490)
	at io.laserdisc.mysql.binlog.stream.MysSqlBinlogEventProcessor.run(MysqlBinlogStream.scala:47)
	at io.laserdisc.mysql.binlog.stream.MysqlBinlogStream$.$anonfun$rawEvents$5(MysqlBinlogStream.scala:59)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:104)
	at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
	at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:183)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:463)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:484)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

It appears to be a result of this function;

binlogClient.registerEventListener { event =>
      ConcurrentEffect[F].toIO(queue.enqueue1(Some(event))).unsafeRunSync()
    }

This results in the event listener failing to continue to process any events, and the consuming application sits idle.

Thankfully at least, the log message contained nextPosition which provided us the offset by which we could manually move the consumer on to the next message.

@semenodm
Copy link
Member

Magically I discovered today that Concurrent effect.toIO swallows errors. I need some time to wrap my head around it

@barryoneill
Copy link
Member Author

Magic is one way to describe it :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants