-
Notifications
You must be signed in to change notification settings - Fork 315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LIVY-359. Cache livy logs as config driven #332
base: master
Are you sure you want to change the base?
LIVY-359. Cache livy logs as config driven #332
Conversation
.map(new LineBufferedProcess(_)) | ||
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration. | ||
// (e.g. Reflect YARN application state to session state). | ||
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerryshao Earlier we were returning None if the master is local (not yarn) so the driver logs didn't go to livy logs.
I think this change should solve the problem.
Let me know if we are ok with this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's what I want.
_lock.lock() | ||
try { | ||
_lines = _lines :+ line | ||
info(s"stdout: $line") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerryshao In client mode we were seeing spark driver logs in livy log after great delay as the log statement was after the loop. I agree log inside the lock is heavy.
Let me know if there is a better way to handle the scenario.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can move this line out of try finally
block.
7d76aae
to
e94f507
Compare
Codecov Report
@@ Coverage Diff @@
## master #332 +/- ##
============================================
+ Coverage 70.42% 70.49% +0.06%
- Complexity 726 727 +1
============================================
Files 96 96
Lines 5123 5124 +1
Branches 774 774
============================================
+ Hits 3608 3612 +4
+ Misses 997 994 -3
Partials 518 518
Continue to review full report at Codecov.
|
@praveenkanamarlapudi thanks for bring this out, could you please file a JIRA and update the PR title first. |
Also please fix the UTs. |
.map(new LineBufferedProcess(_)) | ||
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration. | ||
// (e.g. Reflect YARN application state to session state). | ||
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's what I want.
@@ -36,17 +38,17 @@ class LineBufferedStream(inputStream: InputStream) extends Logging { | |||
private val thread = new Thread { | |||
override def run() = { | |||
val lines = Source.fromInputStream(inputStream).getLines() | |||
for (line <- lines) { | |||
for (line <- lines) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove unnecessary white space.
_lock.lock() | ||
try { | ||
_lines = _lines :+ line | ||
info(s"stdout: $line") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can move this line out of try finally
block.
_lock.lock() | ||
try { | ||
_lines = _lines :+ line | ||
info(s"stdout: $line") | ||
if(logSize > 0) _lines add line |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add if
check, I assume if logSize is zero then we cannot add line
to this queue.
@@ -59,7 +61,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging { | |||
thread.setDaemon(true) | |||
thread.start() | |||
|
|||
def lines: IndexedSeq[String] = _lines | |||
def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will introduce threading issue when copying to new array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we add lock here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so.
@@ -100,7 +100,7 @@ object LivySparkUtils extends Logging { | |||
pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path")) | |||
} | |||
|
|||
val process = new LineBufferedProcess(pb.start()) | |||
val process = new LineBufferedProcess(pb.start(), 200) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using configuration instead of hard-coded number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it runs only when livy server is started. Do we need to make it config driven?
If the number is set too low by mistake, the livy server mayn't start.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see.
@@ -92,7 +92,8 @@ object SparkApp { | |||
if (livyConf.isRunningOnYarn()) { | |||
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) | |||
} else { | |||
require(process.isDefined, "process must not be None when Livy master is not YARN.") | |||
// process is None in recovery mode | |||
// require(process.isDefined, "process must not be None when Livy master is not YARN.") | |||
new SparkProcApp(process.get, listener) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If process
can be None
, will this process.get
work?
override def log(): IndexedSeq[String] = process.inputLines | ||
|
||
override def log(): IndexedSeq[String] = | ||
("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add empty line after it.
6c9dba2
to
75d8511
Compare
conf/livy.conf.template
Outdated
@@ -31,6 +31,9 @@ | |||
# If livy should impersonate the requesting users when creating a new session. | |||
# livy.impersonation.enabled = true | |||
|
|||
# Logs size livy can cache for each session/batch. 0 means don't cache the logs. | |||
# livy.spark.logs.size = 200 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to rename this to "livy.session.cache-log.size".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Let me do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we name it like livy.cache-log.size
or something as it applies to both interactive and batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I'm fine with it.
} | ||
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } | ||
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) | ||
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is a bit strange and not so scala-ish. Why not keep the original style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
driverProcess can be None for session recovery. We need to return None for that case and create SparkApp otherwise. Let me know if there is a better way to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Thanks.
Also can we change the style to:
driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
try { | ||
_lines = _lines :+ line | ||
try { | ||
_lines add line |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the prefix whitespace before try
. Also be better to use _lines.add(line)
for clearance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad. let me update it.
75d8511
to
2d092ae
Compare
@jerryshao updated PR as per the comments. Please have a look at it. |
@@ -90,7 +97,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging { | |||
} | |||
|
|||
override def next(): String = { | |||
val line = _lines(index) | |||
val line = _lines.poll() | |||
index += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This index
seems not used any more.
Also I was thinking this poll
may has threading issue with add
. I'm not sure if EvictingQueue
is a thread safe queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. It's not thread safe. Added lock to it.
2d092ae
to
f84bf47
Compare
LGTM, thanks for the fix. |
@jerryshao Are you waiting for any changes from me? |
No, I don't have further comments, because I don't have merging permission, so it is pending. CC @zjffdu can you please take a review, thanks! |
@alex-the-man @zjffdu can you please review the PR. |
Hi @praveenkanamarlapudi sorry for delay, can you please submit this PR against incubator-livy (https://github.com/apache/incubator-livy/pulls)? |
Hi @praveenkanamarlapudi would you please submit this PR again in incubator-livy? If you're not working on this, I can help to cherry-pick the changes. |
Hi @jerryshao I will make the PR tomorrow. |
Made the PR-14 |
Changes Made: