From 3721fa618cbc1ac79d2093ab59b8f4f5098fecbc Mon Sep 17 00:00:00 2001 From: Sowmya Date: Thu, 20 Dec 2018 18:09:45 +0530 Subject: [PATCH] Issue #0000 fix: Cassandra connector fix for ETBCoverageSummarizer. --- .../org/ekstep/analytics/framework/util/CommonUtil.scala | 7 ++++++- .../ekstep/analytics/framework/driver/BatchJobDriver.scala | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/platform-framework/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/platform-framework/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 3e7fbe101..ca957d35a 100644 --- a/platform-framework/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/platform-framework/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -43,7 +43,7 @@ object CommonUtil { config.parallelization.getOrElse(defParallelization); } - def getSparkContext(parallelization: Int, appName: String): SparkContext = { + def getSparkContext(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None): SparkContext = { JobLogger.log("Initializing Spark Context") val conf = new SparkConf().setAppName(appName); val master = conf.getOption("spark.master"); @@ -60,6 +60,11 @@ object CommonUtil { if (!conf.contains("reactiveinflux.url")) { conf.set("reactiveinflux.url", AppConf.getConfig("reactiveinflux.url")); } + if (sparkCassandraConnectionHost.nonEmpty) { + conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String]) + println("spark.cassandra.connection.host", conf.get("spark.cassandra.connection.host")) + } + // $COVERAGE-ON$ val sc = new SparkContext(conf); setS3Conf(sc); diff --git a/platform-framework/analytics-job-driver/src/main/scala/org/ekstep/analytics/framework/driver/BatchJobDriver.scala b/platform-framework/analytics-job-driver/src/main/scala/org/ekstep/analytics/framework/driver/BatchJobDriver.scala index a2b9e53b6..1684ddd6f 100644 --- a/platform-framework/analytics-job-driver/src/main/scala/org/ekstep/analytics/framework/driver/BatchJobDriver.scala +++ b/platform-framework/analytics-job-driver/src/main/scala/org/ekstep/analytics/framework/driver/BatchJobDriver.scala @@ -19,7 +19,8 @@ object BatchJobDriver { def process[T, R](config: JobConfig, models: List[IBatchModel[T, R]])(implicit mf: Manifest[T], mfr: Manifest[R], sc: SparkContext) { JobContext.parallelization = CommonUtil.getParallelization(config); if (null == sc) { - implicit val sc = CommonUtil.getSparkContext(JobContext.parallelization, config.appName.getOrElse(config.model)); + val sparkCassandraConnectionHost = config.modelParams.getOrElse(Map()).get("sparkCassandraConnectionHost") + implicit val sc = CommonUtil.getSparkContext(JobContext.parallelization, config.appName.getOrElse(config.model), sparkCassandraConnectionHost); try { _process(config, models); } finally {