Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ object BatchSession extends Logging {
val conf = SparkApp.prepareSparkConf(
appTag,
livyConf,
prepareConf(
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
prepareConf(owner, request.conf,
request.jars, request.files, request.archives, request.pyFiles, livyConf))
require(request.file != null, "File is required.")

val builder = new SparkProcessBuilder(livyConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ object InteractiveSession extends Logging {
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)

val client = mockClient.orElse {
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(owner, request.conf,
request.jars, request.files, request.archives, request.pyFiles, livyConf))

val builderProperties = prepareBuilderProp(conf, request.kind, livyConf)

Expand Down
5 changes: 4 additions & 1 deletion server/src/main/scala/org/apache/livy/sessions/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ object Session {
* - Verify that file URIs don't reference non-whitelisted local resources
*/
def prepareConf(
owner: String,
conf: Map[String, String],
jars: Seq[String],
files: Seq[String],
Expand Down Expand Up @@ -90,7 +91,9 @@ object Session {
val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++
livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap

conf ++ masterConfList ++ merged
val ownerConf = Map("spark.livy.owner" -> (if (owner == null) "" else owner))

conf ++ masterConfList ++ merged ++ ownerConf
}

/**
Expand Down
26 changes: 18 additions & 8 deletions server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,30 +59,40 @@ class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed")

// Test baseline.
assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local"))
assert(Session.prepareConf("", Map(), Nil, Nil, Nil, Nil, conf)
=== Map("spark.master" -> "local", "spark.livy.owner" -> ""))

// Test validations.
intercept[IllegalArgumentException] {
Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf)
Session.prepareConf("", Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf)
}
conf.sparkFileLists.foreach { key =>
intercept[IllegalArgumentException] {
Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf)
Session.prepareConf("", Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf)
}
}
intercept[IllegalArgumentException] {
Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf)
Session.prepareConf("", Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf)
}
intercept[IllegalArgumentException] {
Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf)
Session.prepareConf("", Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf)
}
intercept[IllegalArgumentException] {
Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf)
Session.prepareConf("", Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf)
}
intercept[IllegalArgumentException] {
Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf)
Session.prepareConf("", Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf)
}

// Test owner
assert(Session.prepareConf("MyOwner", Map(), Nil, Nil, Nil, Nil, conf)
.get("spark.livy.owner") === Some("MyOwner"))
assert(Session.prepareConf("", Map(), Nil, Nil, Nil, Nil, conf)
.get("spark.livy.owner") === Some(""))
assert(Session.prepareConf(null, Map(), Nil, Nil, Nil, Nil, conf)
.get("spark.livy.owner") === Some(""))


// Test that file lists are merged and resolved.
val base = "/file1.txt"
val other = Seq("/file2.txt")
Expand All @@ -91,7 +101,7 @@ class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES,
LivyConf.SPARK_PY_FILES)
val baseConf = userLists.map { key => (key -> base) }.toMap
val result = Session.prepareConf(baseConf, other, other, other, other, conf)
val result = Session.prepareConf("", baseConf, other, other, other, other, conf)
userLists.foreach { key => assert(result.get(key) === expected) }
}

Expand Down