Skip to content

Commit

Permalink
Issue #SC-231 chore: Fix null value in first access
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanjay-Krishnan93 committed Jan 10, 2019
1 parent bcf1344 commit 822debc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@ object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, Devic
(index, DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts, startTimestamp))
}
val firstAccessFromCassandra = summary.map{ x => x._1}
.joinWithCassandraTable[Long](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access")
.joinWithCassandraTable[Option[Long]](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access")
.on(SomeColumns("device_id", "channel"))
summary.leftOuterJoin(firstAccessFromCassandra)
.map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.firstAccess))}
.map{ x =>
val firstAccessValue = x._2._2.getOrElse(Option(0L))
if(firstAccessValue.getOrElse(0L) != 0)
x._2._1.copy(firstAccess = firstAccessValue.getOrElse(x._2._1.firstAccess))
else x._2._1
}
}

override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = {
override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = {
data.map { x =>
val mid = CommonUtil.getMessageId("ME_DEVICE_SUMMARY", x.device_id, "DAY", x.dt_range, "NA", None, Option(x.channel))
val measures = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,25 @@ class TestDeviceSummaryModel extends SparkSpec(null) {
}
}

it should "update the value of first_access with dt_range.from if null is returned from Cassandra" in {
CassandraConnector(sc.getConf).withSessionDo { session =>
session.execute("TRUNCATE " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE)
session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel)" +
"VALUES('49edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d')")
session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel, first_access)" +
"VALUES('88edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d', 1536909035000)")
}
val rdd = loadFile[String]("src/test/resources/device-summary/test_data1.log")
val measuredEvent = DeviceSummaryModel.execute(rdd, None)
measuredEvent.collect().foreach{ x =>
val summary = JSONUtils.deserialize[DeviceSummary](JSONUtils.serialize(x.edata.eks))
if(x.dimensions.did.get.equals("49edda82418a1e916e9906a2fd7942cb"))
summary.firstAccess should be(1537550355883L)
else if(x.dimensions.did.get.equals("88edda82418a1e916e9906a2fd7942cb"))
summary.firstAccess should be(1536909035000L)
else
summary.firstAccess should be(1537550355883L)
}
}

}

0 comments on commit 822debc

Please sign in to comment.