How to create a report in JSON format with Spark

Access raw data

Use the Spark text reader:

val path = "access.log.gz"
val logs = spark.read.text(path)
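
To sanity-check the load before parsing, you can peek at a few raw lines (a quick optional check, assuming the gzipped access log sits in the working directory):

// Each row holds one raw log line in a single "value" column
logs.show(3, truncate = false)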

Convert the data to the right format

Define your case class and converter:

case class AccessLog(
  ip: String, 
  ident: String, 
  user: String, 
  datetime: String, 
  request: String, 
  status: String, 
  size: String, 
  referer: String, 
  userAgent: String, 
  unk: String)

// Combined access log format: ip, identd, user, [datetime], "request", status, size, "referer", "user agent", "extra"
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r

def toAccessLog(params: List[String]) = AccessLog(
  params(0), 
  params(1), 
  params(2), 
  params(3), 
  params(4), 
  params(5), 
  params(6), 
  params(7), 
  params(8), 
  params(9)
)
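
Before converting the whole dataset, the pattern can be checked against a single sample line (the line below is made up for illustration):

// Hypothetical sample line in the combined log format; the trailing space plus
// trim avoids ending the triple-quoted string with a quote character
val sample =
  """127.0.0.1 - - [04/Jul/2020:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 1024 "-" "Mozilla/5.0" "-" """.trim

R.unapplySeq(sample).map(toAccessLog) // Some(AccessLog(127.0.0.1,-,-,...))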

And then do all the conversions. Note the two imports: spark.implicits._ provides the Dataset encoders, and functions._ brings in to_timestamp, regexp_extract and col used below:

import org.apache.spark.sql.functions._
import spark.implicits._

val logAsString = logs.map(_.getString(0))
val dsParsed = logAsString.flatMap(x => R.unapplySeq(x))

val ds = dsParsed.map(toAccessLog _)
val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
 
// Pattern for the request column: method, URI, HTTP version
val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r
val s = "POST /administrator/index.php HTTP/1.1"
REQ_EX.unapplySeq(s) // Some(List(POST, /administrator/index.php, HTTP/1.1))

val dsExtended = dsWithTime
  .withColumn("method", regexp_extract(dsWithTime("request"), REQ_EX.toString, 1))
  .withColumn("uri", regexp_extract(dsWithTime("request"), REQ_EX.toString, 2))
  .withColumn("http", regexp_extract(dsWithTime("request"), REQ_EX.toString, 3))
  .drop("request")
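
Optionally, verify that the derived columns replaced the raw request string:

// The schema should now list method, uri and http instead of request
dsExtended.printSchema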

Cache the DataFrame and register it as a temporary view so that it can be referenced directly within SQL queries:

dsExtended.cache
dsExtended.createOrReplaceTempView("AccessLogExt")
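
A quick query confirms the view works as expected (just a sanity check, not part of the report):

// Inspect a few parsed rows through the SQL interface
spark.sql("select * from AccessLogExt limit 3").show(truncate = false)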

Find dates having more than 20k connections

Use the following SQL:

val sql = """select count(*) as count, cast(datetime as date) as date
  from AccessLogExt
  group by date
  having count(*) > 20000
  order by count desc"""

And then compute and fetch the dates:

def findDatesHavingMoreThan20kConnections: Seq[java.sql.Date] = 
  spark.sql(sql).select("date").map(_.getDate(0)).collect()
 
val theDates = findDatesHavingMoreThan20kConnections
val currentDate = theDates(0) // the busiest date, since the query orders by count desc

Report by URI

Define the report functions:

def numberOfAccessByUri(currentDate: java.sql.Date) = spark
  .sql("select uri, cast(datetime as date) as date, count(*) as countaccess from AccessLogExt group by date, uri order by countaccess desc")
  .filter(col("date")===currentDate).drop("date")
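
The per-URI counts for a given date can be previewed directly (using the currentDate found above):

// Five most-accessed URIs on the selected date
numberOfAccessByUri(currentDate).show(5, truncate = false)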

case class UriReport(access: Map[String, Long])

def reportByDate(currentDate: java.sql.Date) = UriReport(
  numberOfAccessByUri(currentDate)
    .collect.map(r => (r.getString(0), r.getLong(1))).toMap
  )
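
As a quick usage example, the largest entries of a report can be printed before anything is written to disk:

// Print the five most-accessed URIs with their counts
reportByDate(currentDate).access.toSeq
  .sortBy { case (_, count) => -count } // most-accessed first
  .take(5)
  .foreach(println)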

Write the reports as JSON

Store the reports locally:

import spark.implicits._

val reportAsSeq = theDates.map(date => (date, reportByDate(date)))
reportAsSeq.toDF("date", "uriReport")
  .coalesce(1) // gather everything into a single output file
  .write
  .mode("overwrite")
  .json("myjsonreport")
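
To verify the result, the report directory can be read back with the JSON reader:

// Spark writes a directory of part files; reading it back yields the report rows
spark.read.json("myjsonreport").show(truncate = false)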