From 822debcbb890bcda5fa56d3fd499daaf99777c76 Mon Sep 17 00:00:00 2001 From: Sanjay-Krishnan93 Date: Thu, 10 Jan 2019 16:43:07 +0530 Subject: [PATCH] Issue #SC-231 chore: Fix null value in first access --- .../analytics/model/DeviceSummaryModel.scala | 11 +++++++--- .../model/TestDeviceSummaryModel.scala | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala index 6466957dd..36b8775e7 100644 --- a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala +++ b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala @@ -68,13 +68,18 @@ object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, Devic (index, DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts, startTimestamp)) } val firstAccessFromCassandra = summary.map{ x => x._1} - .joinWithCassandraTable[Long](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access") + .joinWithCassandraTable[Option[Long]](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access") .on(SomeColumns("device_id", "channel")) summary.leftOuterJoin(firstAccessFromCassandra) - .map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.firstAccess))} + .map{ x => + val firstAccessValue = x._2._2.getOrElse(Option(0L)) + if(firstAccessValue.getOrElse(0L) != 0) + x._2._1.copy(firstAccess = firstAccessValue.getOrElse(x._2._1.firstAccess)) + else x._2._1 + } } - override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: 
SparkContext): RDD[MeasuredEvent] = { + override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = { data.map { x => val mid = CommonUtil.getMessageId("ME_DEVICE_SUMMARY", x.device_id, "DAY", x.dt_range, "NA", None, Option(x.channel)) val measures = Map( diff --git a/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala b/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala index fc24fae49..9478aeb34 100644 --- a/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala +++ b/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala @@ -93,4 +93,25 @@ class TestDeviceSummaryModel extends SparkSpec(null) { } } + it should "update the value of first_access with dt_range.from if null is returned from Cassandra" in { /* Scenario named in the title: Cassandra holds no first_access for a device, and the emitted summary is then expected to carry the dt_range.from value instead. */ + CassandraConnector(sc.getConf).withSessionDo { session => + session.execute("TRUNCATE " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE) /* empty the device profile table so that only the two rows inserted below are present */ + session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel)" + + "VALUES('49edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d')") /* this row sets only device_id and channel; no first_access value is supplied */ + session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." 
+ Constants.DEVICE_PROFILE_TABLE +"(device_id, channel, first_access)" + + "VALUES('88edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d', 1536909035000)") /* this row supplies an explicit first_access of 1536909035000 */ + } + val rdd = loadFile[String]("src/test/resources/device-summary/test_data1.log") + val measuredEvent = DeviceSummaryModel.execute(rdd, None) /* run the model over the test_data1.log fixture, passing None as the second argument */ + measuredEvent.collect().foreach{ x => /* one assertion per emitted event, chosen by device id; NOTE(review): did.get assumes the did dimension is always present */ + val summary = JSONUtils.deserialize[DeviceSummary](JSONUtils.serialize(x.edata.eks)) /* round-trip edata.eks through JSON to obtain a DeviceSummary */ + if(x.dimensions.did.get.equals("49edda82418a1e916e9906a2fd7942cb")) /* device inserted without first_access: expects 1537550355883, presumably dt_range.from of the fixture (TODO confirm against test_data1.log) */ + summary.firstAccess should be(1537550355883L) + else if(x.dimensions.did.get.equals("88edda82418a1e916e9906a2fd7942cb")) /* device inserted with first_access: expects the inserted value 1536909035000 */ + summary.firstAccess should be(1536909035000L) + else /* any other device id: same expected value as the first branch */ + summary.firstAccess should be(1537550355883L) + } + } + } \ No newline at end of file