-
Notifications
You must be signed in to change notification settings - Fork 7
How to create a report in json format with spark
Jean-Luc Canela edited this page Jul 4, 2020
·
2 revisions
Use the Spark text reader:
// Path to the gzipped Apache access log; Spark decompresses .gz input transparently.
val path = "access.log.gz"
// Read as a DataFrame with a single string column, one row per log line.
val logs = spark.read.text(path)
Define your case class and converters:
// One parsed line of a combined-format Apache access log.
// All fields are kept as raw strings here; datetime is converted to a
// timestamp and the request is split into method/uri/http later on.
case class AccessLog(
ip: String,
ident: String,
user: String,
datetime: String,
request: String,
status: String,
size: String,
referer: String,
userAgent: String,
unk: String)
// Regex with one named group per AccessLog field.
// FIX: identd and user now use [^ ]* — the original [^ ] (no quantifier)
// matched exactly one character, so any multi-character identity or
// username made the whole line fail to parse.
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]*) (?<user>[^ ]*) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
// Build an AccessLog from the 10 capture groups, in declaration order.
// Assumes params has exactly 10 elements (as produced by R.unapplySeq).
def toAccessLog(params: List[String]) = AccessLog(
params(0),
params(1),
params(2),
params(3),
params(4),
params(5),
params(6),
params(7),
params(8),
params(9)
)
And then do all the conversions:
// Dataset[String]: pull the raw line out of each single-column Row.
val logAsString = logs.map(_.getString(0))
// unapplySeq returns Some(List(10 groups)) on a match and None otherwise,
// so flatMap silently drops lines that do not match the regex.
val dsParsed = logAsString.flatMap(x => R.unapplySeq(x))
// Dataset[AccessLog]: one typed record per successfully parsed line.
val ds = dsParsed.map(toAccessLog _)
// Replace the raw datetime string with a real timestamp column.
// NOTE(review): the "X" offset pattern assumes zone offsets like "+0200";
// confirm it matches this log's format under the Spark version in use.
val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
// Request shape is "METHOD URI PROTOCOL", e.g. "GET /index.html HTTP/1.1".
val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r
// Quick interactive sanity check of the request regex (result is discarded).
val s = "POST /administrator/index.php HTTP/1.1"
REQ_EX.unapplySeq(s)
// Split "request" into method/uri/http columns, then drop the original.
val dsExtended = dsWithTime
.withColumn("method", regexp_extract(dsWithTime("request"), REQ_EX.toString, 1))
.withColumn("uri", regexp_extract(dsWithTime("request"), REQ_EX.toString, 2))
.withColumn("http", regexp_extract(dsWithTime("request"), REQ_EX.toString, 3))
.drop("request")
Cache the dataframe and register it as a table so that it can be referenced directly within SQL queries:
// Cache: the DataFrame is scanned by several report queries below.
dsExtended.cache
// Register as a temp view so it can be referenced from spark.sql(...) strings.
dsExtended.createOrReplaceTempView("AccessLogExt")
Use the following SQL query:
// Daily connection counts, keeping only days with more than 20k requests,
// busiest day first. Keyword casing is mixed, but Spark SQL keywords are
// case-insensitive so the query runs as written.
val sql = """select count(*) as count, cast(datetime as date) as date
from AccessLogExt
group by date
HAVING count(*) > 20000
order by count desc"""
And then compute and fetch the dates:
// Run the report query and collect the qualifying dates to the driver.
def findDatesHavingMoreThan20kConnections: Seq[java.sql.Date] =
spark.sql(sql).select("date").map(_.getDate(0)).collect()
val theDates = findDatesHavingMoreThan20kConnections
// FIX: was theDates(0), which throws an opaque IndexOutOfBoundsException
// when no day exceeds the threshold; fail with a clear message instead.
val currentDate = theDates.headOption.getOrElse(sys.error("no dates with more than 20k connections"))
# Report by URI
Define the report functions:
// Per-URI access counts for the given day, busiest URI first.
def numberOfAccessByUri(currentDate: java.sql.Date) = {
  val counts = spark.sql("select uri, cast(datetime as date) as date, count(*) as countaccess from AccessLogExt group by date, uri order by countaccess desc")
  counts.where(col("date") === currentDate).drop("date")
}
// Report payload: URI -> number of accesses on that day.
case class UriReport(access: Map[String, Long])
// Materialize one day's per-URI counts into a UriReport on the driver.
def reportByDate(currentDate: java.sql.Date) = {
  val rows = numberOfAccessByUri(currentDate).collect()
  UriReport(rows.map(row => row.getString(0) -> row.getLong(1)).toMap)
}
Store locally the reports:
// Needed for the Seq -> DataFrame conversion (.toDF).
import spark.implicits._
// One (date, report) pair per qualifying day.
val reportAsSeq = theDates.map { date => (date, reportByDate(date)) }
// Coalesce to a single partition so one JSON file is produced; overwrite
// replaces the output of any previous run.
val reportDf = reportAsSeq.toDF("date", "uriReport")
reportDf
  .coalesce(1)
  .write
  .mode("Overwrite")
  .json("myjsonreport")