-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LIVY-239: Moving the logic to generate session IDs from Session Manager to SessionStore #220
base: master
Are you sure you want to change the base?
Conversation
Current coverage is 71.78% (diff: 54.54%)@@ master #220 diff @@
==========================================
Files 91 89 -2
Lines 4697 4516 -181
Methods 0 0
Messages 0 0
Branches 811 764 -47
==========================================
- Hits 3360 3242 -118
+ Misses 861 836 -25
+ Partials 476 438 -38
|
.map(_.nextSessionId).getOrElse(0) | ||
store.set(sessionManagerPath(sessionType), SessionManagerState(nextSessionId + 1)) | ||
nextSessionId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the nextSessionId
you get is the previous one persisted and you didn't increase the number when returned.
Also each time when you get the session id, you will fetch it from persisted storage, I think it is not necessary and will bring in extra overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerryshao The scala doc comments are a bit misleading. The value stored in session store is the next unused session ID.
Performance wise, this change calls the session store as many times as the previous code. The only difference is that previously SessionManager
was calling SessionStore.set
, but now the SessionStore
calls it.
Please take a look at SessionManager
(lines 81 to 84) in 5e8474e (
def nextId(): Int = synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about the poor doc. My bad. Would you mind fixing it?
I think fixing the doc and rename val nextSessionId = store.get
to lastUsedSessionId?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to correct myself. Prior to this commit, there was one read from SessionStore
on recovery or on server start. There was also one write per nextId()
call. Now we have one read and one write to SessionStore
per nextId()
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this's necessary for HA consistency.
Can we come up with some smart way to not pay this price for recovery mode, but just for HA mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is my initial thoughts on it.
We can refactor the logic to generate sessionIDs into a separate trait/class and have two implementation: one for single-node, and one for multi-node. The single node mode can simply use a local atomic value, but the multi-node mode should use a more sophisticated implementation to guarantee uniqueness and consistency across all nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are thinking similarly but have 1 difference. You are adding the session id generation to SessionStore while I'm thinking about adding it to StateStore.
I would prefer to keep HA logic out of SessionStore and move these logic into ZookeeperStateStore. Having a single code path in SessionStore makes isolating HA related changes easier.
Is this done for implementing HA? |
@tc0312 Yes. This change is needed to implement multimode HA. |
* If no SessionManagerState is stored, it returns 0. | ||
* It saves the new session ID to the session store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Extra space here.
.map(_.nextSessionId).getOrElse(0) | ||
store.set(sessionManagerPath(sessionType), SessionManagerState(nextSessionId + 1)) | ||
nextSessionId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about the poor doc. My bad. Would you mind fixing it?
I think fixing the doc and rename val nextSessionId = store.get
to lastUsedSessionId?
def getNextSessionId(sessionType: String): Int = { | ||
store.get[SessionManagerState](sessionManagerPath(sessionType)) | ||
def getNextSessionId(sessionType: String): Int = synchronized { | ||
val nextSessionId = store.get[SessionManagerState](sessionManagerPath(sessionType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewing this with HA in mind. I think it's possible that another livy-server instance updates SessionManagerState between the store.get()
and store.set()
call.
I think we should add locking/optimistic locking + retry here to avoid servers stepping on each other. Or better, we change the interface of StateStore to make use of Sequence Nodes in ZooKeeper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right! But for multi-node HA, I am overriding this method in ZooKeeperStore and I am using a distributed atomic long recipe (https://curator.apache.org/curator-recipes/distributed-atomic-long.html) to guarantee that generated session IDs are unique and are generated and stored atomically. But for single node HA synchronizing on the session store instance would suffice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With your ZooKeeperStore change, who's doing nextSessionId + 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SessionStore
calls into the distributed Atomic recipe from Apache curator.
This is the how I am doing it:
val distributedSessionId = new DistributedAtomicLong(curatorClient, zkPath, retryPolicy)
def nextBatchSessionId: Int = {
recursiveTry(distributedSessionId, MAX_RETRY) match {
case Some(sessionId) =>
sessionId.toInt
case None =>
val msg: String = "Failed to get the next session id from Zookeeper"
logger.warn(msg)
throw new IOException(msg)
}
}
@tailrec
private def recursiveTry(distributedLong: DistributedLong, retryCount: Int): Option[Long] = {
val updatedValue = distributedLong.increment
updatedValue.succeeded match {
case _ if retryCount <= 0 =>
None
case true if retryCount > 0 =>
Option(updatedValue.preValue())
case _ =>
recursiveTry(distributedLong, retryCount - 1)
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these codes in SessionStore or a subclass of SessionStore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean ZooKeeperStateStore
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. it is in ZooKeeperStateStore
. Sorry!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you add a new method in StateStore to generate unique id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I did. But as you said in your comment, moving the logic out of SessionStore
makes a lot more sense.
I would prefer to keep HA logic out of SessionStore and move these logic into ZookeeperStateStore. Having a single code path in SessionStore makes isolating HA related changes easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to update the pull request based on the feedback.
sessionStore.saveNextSessionId(sessionType, idCounter.get()) | ||
id | ||
} | ||
// sessionStore.getNextSessionId is guaranteed to return atomic and returns unique IDs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Extra space between return & atomic.
Can we have a whole plan of Livy HA before doing this small refactoring work? My concern is that currently we don't have a decided plan of Livy HA, if finally the plan is changed or updated, current changes may be obsolete. |
@jerryshao There is a JIRA ticket open for it: https://issues.cloudera.org/browse/LIVY-231 |
This commit also - updates the test cases around SessionStore - Removes the SessionManagerState class (Not neede anymore). We are storing a Long value that can be incremented. Task-url: https://issues.cloudera.org/browse/LIVY-239
Summary of changes:
AtomicCounter
that is used inSessionManager
to generate session IDs.SessionStore
instead of SessionManagernextSessionId
is called.Task-url: https://issues.cloudera.org/browse/LIVY-239