Skip to content

Commit

Permalink
Merge pull request #27 from project-sunbird/portal-metrics-changes
Browse files Browse the repository at this point in the history
Issue SB-9190 fix: Portal Metrics changes
  • Loading branch information
anandp504 authored Dec 20, 2018
2 parents 59e9d46 + 9d8401f commit 3449139
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ import org.ekstep.analytics.adapter.ContentAdapter
import org.ekstep.analytics.framework._
import org.ekstep.analytics.framework.dispatcher.AzureDispatcher
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils}
import org.ekstep.analytics.util.{Constants, WorkFlowUsageSummaryFact}

import org.ekstep.analytics.util.Constants
import scala.util.Try


case class WorkflowSummaryEvents(deviceId: String, mode: String, dType: String, totalSession: Long, totalTs: Double) extends AlgoInput with Input
case class WorkFlowUsageMetrics(noOfUniqueDevices: Long, totalContentPlaySessions: Long, totalTimeSpent: Double, totalContentPublished: Long) extends AlgoOutput with Output with AlgoInput

case class Metrics(noOfUniqueDevices: Long, totalContentPlaySessions: Long, totalTimeSpent: Double, totalContentPublished: Long) extends AlgoOutput with Output
case class PortalMetrics(eid: String, ets: Long, syncts: Long, metrics_summary: Option[WorkFlowUsageMetrics]) extends AlgoOutput with Output

case class PortalMetrics(eid: String, ets: Long, syncts: Long, metrics_summary: Option[Metrics]) extends AlgoOutput with Output
case class DeviceProfile(device_id: String)

object UpdatePortalMetrics extends IBatchModelTemplate[DerivedEvent, WorkflowSummaryEvents, Metrics, PortalMetrics] with Serializable {
object UpdatePortalMetrics extends IBatchModelTemplate[DerivedEvent, DerivedEvent, WorkFlowUsageMetrics, PortalMetrics] with Serializable {

val className = "org.ekstep.analytics.updater.UpdatePortalMetrics"

Expand All @@ -34,17 +33,15 @@ object UpdatePortalMetrics extends IBatchModelTemplate[DerivedEvent, WorkflowSum
override def name: String = "UpdatePortalMetrics"

/**
* preProcess which will fetch the `WorkFlowUsageSummaryFact` Event data from the Cassandra Database.
* preProcess which will fetch the `workflow_usage_summary` Event data from the Cassandra Database.
*
* @param data - RDD Event Data(Empty RDD event)
* @param config - Configurations to run preProcess
* @param sc - SparkContext
* @return - workflowSummaryEvents
* @return - DerivedEvent
*/
override def preProcess(data: RDD[DerivedEvent], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[WorkflowSummaryEvents] = {
sc.cassandraTable[WorkFlowUsageSummaryFact](Constants.PLATFORM_KEY_SPACE_NAME, Constants.WORKFLOW_USAGE_SUMMARY_FACT).filter { x => x.d_period == 0 && x.d_tag.equals("all") && x.d_content_id.equals("all") && x.d_user_id.equals("all") }.map(event => {
WorkflowSummaryEvents(event.d_device_id, event.d_mode, event.d_type, event.m_total_sessions, event.m_total_ts)
})
override def preProcess(data: RDD[DerivedEvent], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[DerivedEvent] = {
data
}

/**
Expand All @@ -54,20 +51,15 @@ object UpdatePortalMetrics extends IBatchModelTemplate[DerivedEvent, WorkflowSum
* @param sc - Spark context
* @return - DashBoardSummary ->(uniqueDevices, totalContentPlayTime, totalTimeSpent,)
*/
override def algorithm(data: RDD[WorkflowSummaryEvents], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[Metrics] = {
object _constant extends Enumeration {
val APP = "app"
val PLAY = "play"
val CONTENT = "content"
val SESSION = "session"
val ALL = "all"
}

val uniqueDevicesCount = data.filter(x => x.deviceId != _constant.ALL).map(_.deviceId).distinct().count()
val totalContentPlaySessions = data.filter(x => x.mode.equalsIgnoreCase(_constant.PLAY) && x.dType.equalsIgnoreCase(_constant.CONTENT) && x.deviceId.equals("all")).map(_.totalSession).sum().toLong
val totalTimeSpent = data.filter(x => (x.dType.equalsIgnoreCase(_constant.APP) || x.dType.equalsIgnoreCase(_constant.SESSION)) && x.deviceId.equals("all")).map(_.totalTs).sum()
override def algorithm(data: RDD[DerivedEvent], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[WorkFlowUsageMetrics] = {
val totalContentPublished: Int = Try(ContentAdapter.getPublishedContentList().count).getOrElse(0)
sc.parallelize(Array(Metrics(uniqueDevicesCount, totalContentPlaySessions, CommonUtil.roundDouble(totalTimeSpent/3600, 2), totalContentPublished)))
val noOfUniqueDevices = sc.cassandraTable[DeviceProfile](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).map(_.device_id).distinct().count()
val metrics = sc.cassandraTable[WorkFlowUsageMetricsAlgoOutput](Constants.PLATFORM_KEY_SPACE_NAME, Constants.WORKFLOW_USAGE_SUMMARY).map(event => {
(event.total_timespent,event.total_content_play_sessions)
})
val totalTimeSpent = metrics.map(_._1).sum()
val totalContentPlaySessions = metrics.map(_._2).sum().toLong
sc.parallelize(Array(WorkFlowUsageMetrics(noOfUniqueDevices, totalContentPlaySessions, CommonUtil.roundDouble(totalTimeSpent / 3600, 2), totalContentPublished)))
}

/**
Expand All @@ -77,9 +69,9 @@ object UpdatePortalMetrics extends IBatchModelTemplate[DerivedEvent, WorkflowSum
* @param sc - Spark context
* @return - ME_PORTAL_CUMULATIVE_METRICS MeasuredEvents
*/
override def postProcess(data: RDD[Metrics], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[PortalMetrics] = {
override def postProcess(data: RDD[WorkFlowUsageMetrics], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[PortalMetrics] = {
val record = data.first()
val measures = Metrics(record.noOfUniqueDevices, record.totalContentPlaySessions, record.totalTimeSpent, record.totalContentPublished)
val measures = WorkFlowUsageMetrics(record.noOfUniqueDevices, record.totalContentPlaySessions, record.totalTimeSpent, record.totalContentPublished)
val metrics = PortalMetrics(EVENT_ID, System.currentTimeMillis(), System.currentTimeMillis(), Some(measures))
if (config.getOrElse("dispatch", false).asInstanceOf[Boolean]) {
AzureDispatcher.dispatch(Array(JSONUtils.serialize(metrics)), config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.joda.time.DateTime
/**
* @author Manjunath Davanam <manjunathd@ilimi.in>
*/
case class TestPortalMetrics(noOfUniqueDevices: Long, totalContentPlayTime: Double, totalTimeSpent: Double, totalContentPublished: Long)
case class TestPortalMetrics(noOfUniqueDevices: Long, totalContentPlaySessions: Double, totalTimeSpent: Double, totalContentPublished: Long)

class TestUpdatePortalMetrics extends SparkSpec(null) {

Expand Down Expand Up @@ -42,68 +42,14 @@ class TestUpdatePortalMetrics extends SparkSpec(null) {
}

"UpdateDashboardModel" should "Should find the unique device count,totalContentPublished,totalTimeSpent,totalContentPlayTime and should filter when d_time>0(Cumulative)" in {
cleanDataBase()
val inputData = Array(
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "874ed8a5-782e-4f6c-8f36-e0288455901e", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 4, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, AppConf.getConfig("default.channel.id"), "prod.diksha.portal", "all", "content", "play", "874ed8a5-782e-4f6c-8f36-e0288455901e", "org.ekstep.vayuthewind", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 4, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Worksheet")),
WorkFlowUsageSummaryFact(22, AppConf.getConfig("default.channel.id"), "prod.diksha.portal", "all", "Worksheet", "mode1", "874ed8a5-782e-4f6c-8f36-e0288455901e", "org.ekstep.ek", "all", DateTime.now, DateTime.now, DateTime.now, 40, 4, 112.5, 100, 23.56, 11, 33, 12, 15, 18, Array(1), Array(2), Array(3), Some("Worksheet")),
WorkFlowUsageSummaryFact(0, AppConf.getConfig("default.channel.id"), "prod.diksha.portal", "all", "Worksheet", "mode1", "5743895-53457439-54389638-59834758-53", "org.ekstep.vayuthewind", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 4, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Worksheet"))
)
saveToDB(inputData)
val rdd = executeDataProduct()
val out = rdd.collect()
println(JSONUtils.serialize(out.head))
val dashboardSummary = JSONUtils.deserialize[TestPortalMetrics](JSONUtils.serialize(out.head.metrics_summary))
dashboardSummary.totalContentPublished should be(749)
dashboardSummary.noOfUniqueDevices should be(2)
dashboardSummary.totalTimeSpent should be(0.0)
dashboardSummary.totalContentPlayTime should be(0.0)
}

it should "populate zero records when no data is found in Database" in {
println("second test case")
cleanDataBase()
saveToDB(Array())
val result = executeDataProduct().collect().head
val dashboardSummary = JSONUtils.deserialize[TestPortalMetrics](JSONUtils.serialize(result.metrics_summary))
dashboardSummary.totalContentPublished should be(749)
dashboardSummary.noOfUniqueDevices should be(0)
dashboardSummary.totalTimeSpent should be(0)
dashboardSummary.totalContentPlayTime should be(0)
println(JSONUtils.serialize(result))
}

it should "Populate the accurate device count excluding when deviceId='all', totalTimeSpent and totalContentPlaySession " in{
println("3rd testcase")
cleanDataBase()
val inputData = Array(
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "874ed8a5-782e-4f6c-8f36-e0288455901e", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 100, 100, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "all", "all", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 10, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "5345345345435", "all", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 10, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8fff", "prod.diksha.app", "all", "content", "play", "all", "all", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 10, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "534557346543-782e-4f6c-8f36-e02884559085", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 0, 20.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "all", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 4, 112.5, 100, 23.56, 11, 2.15, 12, 10, 20, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8df","prod.diksha.app", "all", "content", "play", "all", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 4, 112.5, 100, 23.56, 11, 2.15, 12, 20, 600, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "edit", "534557346543-782e-4f6c-8f36-e02884559085", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8da", "prod.diksha.app", "all", "app", "edit", "534557346543-782e-4f6c-8f36-e02884559085a", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8db", "prod.diksha.app", "all", "app", "edit", "534557346543-782e-4f6c-8f36-e02884559085b", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8dc", "prod.diksha.app", "all", "session", "edit", "534557346543-782e-4f6c-8f36-e02884559085c", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 100.0, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8dd", "prod.diksha.app", "all", "session", "edit", "all", "all", "all", DateTime.now, DateTime.now, DateTime.now, 430.8, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8ddf", "prod.diksha.app", "all", "app", "edit", "all", "all", "all", DateTime.now, DateTime.now, DateTime.now, 430.8, 30, 112.5, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),

WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "all", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 100, 100, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook")),
WorkFlowUsageSummaryFact(0, "b00bc992ef25f1a9a8d63291e20efc8d", "prod.diksha.app", "all", "content", "play", "986094586ffdfd", "org.ekstep.delta", "all", DateTime.now, DateTime.now, DateTime.now, 450.0, 100, 100, 100, 23.56, 11, 2.15, 12, 15, 18, Array(1), Array(2), Array(3), Some("Textbook"))

)
saveToDB(inputData)
val result = executeDataProduct().collect().head
println(JSONUtils.serialize(result))
val dashboardSummary = JSONUtils.deserialize[TestPortalMetrics](JSONUtils.serialize(result.metrics_summary))
dashboardSummary.totalContentPublished should be(749)
dashboardSummary.noOfUniqueDevices should be(7)
dashboardSummary.totalTimeSpent should be(0.24)
dashboardSummary.totalContentPlayTime should be(60)

dashboardSummary.totalContentPublished should be(0)
dashboardSummary.noOfUniqueDevices should be(3)
dashboardSummary.totalTimeSpent should be(108.9)
dashboardSummary.totalContentPlaySessions should be(624)
}

}

0 comments on commit 3449139

Please sign in to comment.