Error using the same application name in two different streams #35

Closed
abhijeetDunzo opened this issue Jul 29, 2019 · 2 comments

@abhijeetDunzo

I created consumers that read from two different streams, and both use the same application name.

When the application starts, I intermittently get this error:

InvalidArgumentException: StartingSequenceNumber 49598037874099682602785444260063985364880176723926188034 used in GetShardIterator on shard shardId-000000000000 in stream dummy under account 716421886079 is invalid because it did not come from this stream

main calls StartConsuming, which calls a consumeStream function for each stream, passing the same application name but a different stream name.

func StartConsuming(conf *config.Config, waitForKinesis *sync.WaitGroup) {

	waitForKinesis.Add(1)
	streamNameA := "logistics-executor"
	go consumeStreamA(streamNameA, conf.Application.Name, waitForKinesis)
	//
	//waitForKinesis.Add(1)
	//streamNameB := "dummy2"
	//go consumeStreamB(streamNameB, conf.Application.Name, waitForKinesis)
}

func consumeStreamB(streamName, applicationName string, waitForKinesis *sync.WaitGroup) {
	recordsB := make(chan []byte)
	k, wg := kinesis.Init(recordsB, streamName, applicationName, false)
	readFromStreamB(recordsB)
	logger.Log.Infof("Stopping K from stream B")
	k.Stop()

	logger.Log.Infof("waiting on wg from stream B")
	wg.Wait()
	logger.Log.Infof("done on wg from stream B")
	waitForKinesis.Done()

}

func readFromStreamB(records chan []byte) {
	logger.Log.Infof("Reading inside ReadFromStream B")
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT)

	for {
		//time.Sleep(2 * time.Second)
		select {
		case <-sigc:
			logger.Log.Infof("sigc readFromStreamB")
			return

		case record := <-records:
			logger.Log.Infof("ReadFromStream B %v\n", string(record))
			//logger.Log.Infof("perform business logic here B !! ")
		}
	}
}
@garethlewin
Contributor

Kinsumer isn't designed to handle two streams with the same application name. Issue #34, once implemented, might allow this, but for now the DynamoDB table names are derived from the application name alone, so two different streams would collide.

I recommend using different application names for now.
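
One way to do that is to derive a per-stream application name from the shared base name. The following is only a sketch: `appNameForStream` is a hypothetical helper, not part of kinsumer, and the `base-stream` naming scheme and the `"my-service"` value are assumptions chosen for illustration. The resulting name would be passed wherever the application name is passed today.

```go
package main

import "fmt"

// appNameForStream is a hypothetical helper (not part of kinsumer).
// It appends the stream name to a shared base name so that every
// stream gets its own application name, and therefore its own set of
// tables derived from that name.
func appNameForStream(baseName, streamName string) string {
	return baseName + "-" + streamName
}

func main() {
	// "my-service" is an assumed placeholder for conf.Application.Name.
	base := "my-service"

	// Stream names taken from the snippet in the report above.
	for _, stream := range []string{"logistics-executor", "dummy2"} {
		fmt.Printf("stream %q -> application name %q\n",
			stream, appNameForStream(base, stream))
	}
}
```

In the code from the report, this would mean calling something like `consumeStreamB(streamNameB, appNameForStream(conf.Application.Name, streamNameB), waitForKinesis)` instead of passing `conf.Application.Name` directly. Any scheme works as long as the names differ per stream and stay stable across restarts, since a changed name would start from fresh checkpoint state.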

@kunalkapadia

Thanks for the quick reply, @garethlewin!
