Jean-Luc Canela edited this page Mar 20, 2019 · 10 revisions

Spark Hands-on

What's in the project

$ tree
├── accesslog (some generated log files)
│   ├── day=2019-01-01
│   │   ├── _SUCCESS
│   │   └── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│   ├── day=2019-01-02
│   │   ├── _SUCCESS
│   │   └── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
│   [...]
├── almhuette-raith.log
│   └── access.log.gz (a real web server log file)
├── build.sbt
├── sample.scala
├── spark-hands-on.snb
└── src
    └── main
        └── scala
            └── Generator.scala

How to fetch data on your laptop

The almhuette-raith log file has already been downloaded and is available in the GitHub repository.

git clone
# or
curl -o access.log.gz -L

Let's check that the file is OK:

gzcat /Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log/access.log.gz | wc -l

It should display 2660050 lines.
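If gzcat is not available (it is the macOS name; on Linux the same tool is usually called zcat), the same check can be done in plain Scala. This is a minimal sketch using only the JDK's GZIPInputStream; the object name GzLineCount and the temporary sample file are illustrative assumptions. Pass the path to access.log.gz as the first argument to count the real file.

```scala
import java.io.{BufferedReader, File, FileInputStream, FileOutputStream, InputStreamReader, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzLineCount {
  // Count the lines of a gzip-compressed text file, similar to `gzcat file | wc -l`
  def countLines(file: File): Long = {
    val reader = new BufferedReader(new InputStreamReader(
      new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).size.toLong
    finally reader.close()
  }

  // Hypothetical helper: writes a small gzip file so the sketch runs without the real log
  def writeSample(lines: Seq[String]): File = {
    val f = File.createTempFile("sample", ".log.gz")
    f.deleteOnExit()
    val w = new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(f)), StandardCharsets.UTF_8)
    try lines.foreach(l => w.write(l + "\n")) finally w.close()
    f
  }

  def main(args: Array[String]): Unit = {
    val file = if (args.nonEmpty) new File(args(0)) else writeSample(Seq("a", "b", "c"))
    println(countLines(file))
  }
}
```

Note that wc -l counts newline characters, while readLine also counts a last line that has no trailing newline, so the two counts can differ by one.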

Let's start


Use the log file fetched from

val path = "/Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log"
val logs = spark.read.text(path) // one Row per line; gzip files are decompressed transparently
logs.count
// should be 2660050

We can print the data schema:

logs.printSchema

A schema describes the logical type of the data; the same data can have several representations (Row, String, case class).

val rowExample = logs.take(2).drop(1).head
// Row is untyped data

We can convert a Row to a String:

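val logAsString = logs.map(_.getString(0)) // needs import spark.implicits._ (automatic in spark-shell)
logAsString.count
// should be 2660050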

The schema is similar:

logAsString.printSchema

val stringExample = logAsString.take(2).drop(1).head

We can convert this string into more strongly typed data:

// a case class lets us define the structure of a record
case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

Let's create an access log:

val l1 = AccessLog("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")
// fields can now be accessed directly, e.g. l1.ip

The AccessLog constructor is the function AccessLog.apply:

AccessLog.apply _
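Because the companion's apply is an ordinary method, eta-expansion turns it into a function value that can be stored and passed to higher-order functions such as map. A small sketch; the object name ApplyDemo and the value name mk are illustrative, and AccessLog is redefined inside the object so the block compiles on its own.

```scala
object ApplyDemo {
  case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String,
                       status: String, size: String, referer: String, userAgent: String, unk: String)

  // Eta-expansion: the apply method becomes a plain 10-argument function value
  val mk: (String, String, String, String, String, String, String, String, String, String) => AccessLog =
    AccessLog.apply _

  val direct = AccessLog("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")
  val viaFunction = mk("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")

  // Case classes compare by value, so both instances are equal
  def main(args: Array[String]): Unit = println(direct == viaFunction)
}
```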

We can use a regular expression to parse each line:

val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
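To see what R does on a single line outside Spark, here is a minimal sketch. The log line is a made-up example, not taken from access.log.gz. A Scala Regex with groups is also an extractor: it must match the whole string, and groups are bound by position, so the names such as (?<ip>...) only serve as documentation here.

```scala
object RegexDemo {
  // Same pattern as R above, copied verbatim
  val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r

  // Hypothetical log line, invented for illustration
  val line = """1.2.3.4 - - [10/Oct/2018:13:55:36 +0200] "GET /index.php HTTP/1.1" 200 1234 "-" "Mozilla/5.0" "-""""

  // Pattern matching with the regex extractor: 10 groups, bound in order
  val fields: Option[(String, String, String)] = line match {
    case R(ip, _, _, datetime, request, _, _, _, _, _) => Some((ip, datetime, request))
    case _ => None
  }

  def main(args: Array[String]): Unit = println(fields)
}
```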

Parse the strings:

val dsParsed = logAsString.flatMap(x => R.unapplySeq(x)) // non-matching lines give None and are dropped
dsParsed.count
// should be 2655452

We have fewer lines! 2655452 < 2660050, so 4598 lines are missing. Why?

The Option[T] type is very useful here: a value is either Some[T] or None.

Option[T] = Some[T] | None
Option[T1].map(f: T1 => T2): Option[T2]
None.get => NoSuchElementException
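A short sketch of these rules with plain integers: map only applies the function to a Some, getOrElse supplies a default, and get on a None throws. The object and value names are illustrative.

```scala
object OptionDemo {
  val some: Option[Int] = Some(21)
  val none: Option[Int] = None

  val doubled: Option[Int] = some.map(_ * 2)   // f is applied to the content
  val stillNone: Option[Int] = none.map(_ * 2) // f is never called
  val withDefault: Int = none.getOrElse(0)     // safe fallback instead of get

  // The "naked get" on None throws NoSuchElementException
  val getFailed: Boolean =
    try { none.get; false } catch { case _: NoSuchElementException => true }

  def main(args: Array[String]): Unit = println((doubled, stillNone, withDefault, getFailed))
}
```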

For example, with a parsing result:

val parsed = R.unapplySeq(stringExample) // Some(List(...)) if the line matches
R.unapplySeq("not a good string")        // None

Let's check if some data cannot be parsed:

val dsUnparsed = logAsString.filter(x => R.unapplySeq(x).isEmpty)
dsUnparsed.count
// should be 4598
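The two counts add up (2655452 + 4598 = 2660050) because flatMap over an Option keeps the Some results and drops the None ones, while filter on isEmpty keeps exactly the dropped lines. The same split on a plain Scala List, as a sketch with a simplified, hypothetical pattern and invented input lines:

```scala
object SplitDemo {
  // Hypothetical simplified pattern: an IP, a space, then the rest of the line
  val Simple = """([0-9.]+) (.*)""".r

  // Hypothetical input; the last line does not start with an IP
  val lines = List("1.2.3.4 GET /", "5.6.7.8 POST /login", "garbage line")

  // flatMap keeps Some(...) results and drops None
  val parsed: List[List[String]] = lines.flatMap(x => Simple.unapplySeq(x))
  // filter(isEmpty) keeps exactly the lines that flatMap dropped
  val unparsed: List[String] = lines.filter(x => Simple.unapplySeq(x).isEmpty)

  def main(args: Array[String]): Unit =
    println(s"${parsed.size} parsed + ${unparsed.size} unparsed = ${lines.size} lines")
}
```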

What is this data?

dsUnparsed.show(false)

Note that the AccessLog constructor needs 10 arguments; it will not work with a single List argument.


We can adapt a list of strings into the 10 separate arguments expected by AccessLog:

def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(4), params(5), params(6), params(7), params(8), params(9))
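In the pipeline, R always yields exactly 10 groups, so indexing with params(0) to params(9) is safe there. As a more defensive variant, a sketch with a hypothetical toAccessLogOpt that pattern-matches on the list shape and returns None instead of throwing IndexOutOfBoundsException when the length is wrong (AccessLog is redefined so the block compiles alone):

```scala
object SafeConvert {
  case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String,
                       status: String, size: String, referer: String, userAgent: String, unk: String)

  // Hypothetical alternative to toAccessLog: only a list of exactly 10 strings is converted
  def toAccessLogOpt(params: List[String]): Option[AccessLog] = params match {
    case List(ip, ident, user, datetime, request, status, size, referer, userAgent, unk) =>
      Some(AccessLog(ip, ident, user, datetime, request, status, size, referer, userAgent, unk))
    case _ => None
  }

  val ok = toAccessLogOpt(List("1.2.3.4", "-", "-", "dt", "GET / HTTP/1.1", "200", "10", "-", "ua", "-"))
  val tooShort = toAccessLogOpt(List("1.2.3.4", "-"))

  def main(args: Array[String]): Unit = println((ok.map(_.status), tooShort))
}
```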

Type-safe developers (Scala developers included) usually hate the ugly naked get: toAccessLog(parsed.get) throws a NoSuchElementException when parsed is None.


So it's better to use map, which keeps the result inside an Option:

parsed.map(toAccessLog)

We can then convert the parsed list of string to an AccessLog:

val ds = dsParsed.map(toAccessLog _)
ds.count
// should be 2655452

We can convert the time string to a timestamp:

import org.apache.spark.sql.functions._ // to_timestamp, regexp_extract, ...
val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
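To see how such a datetime string maps to an instant, here is a sketch of the same parse on a single value with java.time. This is an analogy, not what Spark runs: Spark 2.x uses its own SimpleDateFormat-based parsing with the pattern above, whereas java.time needs Z to read an offset like +0200 and an explicit English locale for month abbreviations. The sample value is invented.

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import java.util.Locale

object TimeDemo {
  // java.time pattern: Z parses offsets such as +0200, Locale.ENGLISH parses "Oct"
  val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // Hypothetical value, in the format captured by the datetime group
  val raw = "10/Oct/2018:13:55:36 +0200"

  val parsed: OffsetDateTime = OffsetDateTime.parse(raw, fmt)
  val instant: Instant = parsed.toInstant // same moment, expressed in UTC

  def main(args: Array[String]): Unit = println(instant) // 2018-10-10T11:55:36Z
}
```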

Note that the type has been updated:

dsWithTime.printSchema

Let's cache the data to avoid recomputing it every time:

dsWithTime.cache()

(use this with caution: cached data consumes cluster memory)

We can then query this data directly with SQL, once it is registered as a temporary view:

dsWithTime.createOrReplaceTempView("AccessLog")
spark.sql("select count(*) from AccessLog").show(false)

On which days do we have the most requests?

spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10").show(false)
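The query groups rows by calendar day, keeps the days above a threshold and sorts them by count. The same steps on an in-memory collection, as a sketch with hypothetical dates and a threshold of 1 instead of 1000 so the tiny sample returns something:

```scala
import java.time.LocalDate

object DailyCountDemo {
  // Hypothetical request dates, one entry per request
  val requestDates: Seq[LocalDate] =
    Seq("2018-06-27", "2018-06-27", "2018-06-26", "2018-06-27", "2018-06-26", "2018-06-25")
      .map(s => LocalDate.parse(s))

  // Stands in for "having count > 1000" on this tiny sample
  val threshold = 1

  val top: Seq[(LocalDate, Int)] =
    requestDates
      .groupBy(identity)                          // group by date
      .map { case (date, ds) => (date, ds.size) } // count(*)
      .toSeq
      .filter { case (_, n) => n > threshold }    // having count > threshold
      .sortBy { case (_, n) => -n }               // order by count desc
      .take(10)                                   // limit 10

  def main(args: Array[String]): Unit = top.foreach(println)
}
```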

What happened on 2018-06-27?

spark.sql("select * from AccessLog where cast(datetime as date) = '2018-06-27' limit 100").show(false)

Let's play with the request field:

spark.sql("select request from AccessLog limit 10").show(false)
// POST /administrator/index.php HTTP/1.1

Let's parse the request field:

val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r
val s = "POST /administrator/index.php HTTP/1.1"
REQ_EX.unapplySeq(s) // Some(List(POST, /administrator/index.php, HTTP/1.1))
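The same regex used as an extractor, in a small sketch: a request made of three space-separated parts is split into method, uri and protocol, while a request with only two parts does not match at all. The helper name split is hypothetical.

```scala
object RequestDemo {
  val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r

  // Hypothetical helper: the whole string must consist of three space-separated parts
  def split(request: String): Option[(String, String, String)] = request match {
    case REQ_EX(method, uri, http) => Some((method, uri, http))
    case _ => None
  }

  val ok = split("POST /administrator/index.php HTTP/1.1")
  val missingProtocol = split("GET /") // only two parts, so no match

  def main(args: Array[String]): Unit = println((ok, missingProtocol))
}
```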

Looks good. Let's check that all requests are matched by the regex; the following lists the requests that do not match:

dsWithTime.filter(x => REQ_EX.unapplySeq(x.getString(x.fieldIndex("request"))).isEmpty).show(false)

Let's replace the request column with three columns: method, uri and http.

val dsExtended = dsWithTime.withColumn("method", regexp_extract(dsWithTime("request"), REQ_EX.toString, 1)).withColumn("uri", regexp_extract(dsWithTime("request"), REQ_EX.toString, 2)).withColumn("http", regexp_extract(dsWithTime("request"), REQ_EX.toString, 3)).drop("request")
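REQ_EX.toString returns the pattern source, which is what regexp_extract expects. According to the Spark documentation, regexp_extract returns an empty string when the regex or the requested group does not match, and it searches the string rather than requiring a full match. A plain-Scala analogue as a sketch; the helper extract is hypothetical, not Spark's implementation:

```scala
object ExtractDemo {
  val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r

  // Hypothetical analogue of regexp_extract(col, pattern, idx):
  // search for the first match and return group idx, or "" when nothing matches
  def extract(s: String, idx: Int): String =
    REQ_EX.findFirstMatchIn(s).map(m => Option(m.group(idx)).getOrElse("")).getOrElse("")

  val request = "POST /administrator/index.php HTTP/1.1"
  val columns = (extract(request, 1), extract(request, 2), extract(request, 3))

  // A search also succeeds on a longer string, unlike the REQ_EX(...) extractor
  val extraToken = extract("GET / HTTP/1.1 trailing", 3)
  val noMatch = extract("-", 1)

  def main(args: Array[String]): Unit = println((columns, extraToken, noMatch))
}
```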

The schema has been updated:

dsExtended.printSchema


How many requests start with /administrator?

We can use the Scala API:

dsExtended.filter($"uri".startsWith("/administrator")).count
// 701979

Or use SQL, after registering dsExtended as a view:

dsExtended.createOrReplaceTempView("AccessLogExt")

spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
// count = 701979 :)

To check all the available functions in sql, go to

How many administrator accesses are there per day?

spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLogExt where uri like '/administrator%' group by date having count > 1000 order by count desc limit 10").show(false)

What happened on 2016-02-18?

spark.sql("select * from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' limit 100").show(false)

Can we count the requests grouped by uri?

spark.sql("select count(*) as count, uri from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by uri  order by count desc limit 100").show(false)

Maybe count them grouped by ip?

spark.sql("select ip, count(*) as count from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by ip order by count desc limit 100").show(false)

What is the distribution of count(distinct(ip)) by date around 2016-02-18?

spark.sql("select count(distinct(ip)) as count, cast(datetime as date) as date from AccessLogExt where to_date(datetime)  > to_date('2016-02-10') and to_date(datetime)  <= to_date('2016-02-28') and uri like '/administrator%' group by cast(datetime as date) order by cast(datetime as date) asc").show(false)

Note: thanks to @ for the date functions.
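The distribution query filters a date window (exclusive lower bound, inclusive upper bound), groups by date, counts distinct IPs per date and sorts by date. The same steps on an in-memory collection, as a sketch with hypothetical (date, ip) pairs:

```scala
import java.time.LocalDate

object DistinctIpDemo {
  // Hypothetical (date, ip) pairs for requests under /administrator
  val hits: Seq[(LocalDate, String)] = Seq(
    ("2016-02-17", "10.0.0.1"),
    ("2016-02-18", "10.0.0.1"),
    ("2016-02-18", "10.0.0.2"),
    ("2016-02-18", "10.0.0.2"),
    ("2016-02-19", "10.0.0.3")
  ).map { case (d, ip) => (LocalDate.parse(d), ip) }

  val from = LocalDate.parse("2016-02-10") // exclusive, like "> to_date('2016-02-10')"
  val to = LocalDate.parse("2016-02-28")   // inclusive, like "<= to_date('2016-02-28')"

  // count(distinct(ip)) ... group by date ... order by date asc
  val distinctIpsByDate: Seq[(LocalDate, Int)] =
    hits
      .filter { case (d, _) => d.isAfter(from) && !d.isAfter(to) }
      .groupBy { case (d, _) => d }
      .map { case (d, rows) => (d, rows.map(_._2).distinct.size) }
      .toSeq
      .sortBy { case (d, _) => d.toEpochDay }

  def main(args: Array[String]): Unit = distinctIpsByDate.foreach(println)
}
```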

We just received, through a secure channel, the list of Mr Robot's known suspicious IPs. Unfortunately they are not in clear text: each IP is given as its SHA-1 hash:

// 036c1eb2fe119b31853b8234d29e02898b6dffba => suspicious
// b398b4c034b719a5dd341bf494eb7391e8a0d74f => suspicious
// 552aca3388d939abf1baaf7cf4197eec5a4d880c => not suspicious

Let's create a DataFrame and register it as a view:

case class SuspiciousIp(sha1: String, suspicious: Boolean)
val suspiciousip = Seq(
  SuspiciousIp("036c1eb2fe119b31853b8234d29e02898b6dffba", true),
  SuspiciousIp("b398b4c034b719a5dd341bf494eb7391e8a0d74f", true),
  SuspiciousIp("552aca3388d939abf1baaf7cf4197eec5a4d880c", false)).toDF
suspiciousip.createOrReplaceTempView("SuspiciousIp")

How can we find the real IP addresses?

val alldistinctip = spark.sql("select distinct(ip) from AccessLog")
alldistinctip.createOrReplaceTempView("DistinctIp")
val clearIp = spark.sql("select DistinctIp.ip, SuspiciousIp.sha1, SuspiciousIp.suspicious from SuspiciousIp left join DistinctIp on sha1(DistinctIp.ip) = SuspiciousIp.sha1")
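SHA-1 cannot be reversed, but the set of IPs present in the log is small: hashing every known IP and comparing it with the received hashes recovers the matching addresses, which is what the join on sha1(DistinctIp.ip) does. With a left join, a hash that matches no IP of the log keeps a null ip. A plain-Scala sketch of the same dictionary lookup, with hypothetical candidate IPs and a "received" hash that we compute ourselves:

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object Sha1Demo {
  // Lowercase hex SHA-1 of the UTF-8 bytes of a string
  def sha1Hex(s: String): String =
    MessageDigest.getInstance("SHA-1")
      .digest(s.getBytes(StandardCharsets.UTF_8))
      .map(b => "%02x".format(b & 0xff))
      .mkString

  // Hypothetical candidates standing in for the distinct IPs of the log
  val candidateIps = Seq("10.0.0.1", "10.0.0.2", "10.0.0.3")

  // Hypothetical "received" hashes: here we hash one candidate ourselves
  val suspiciousHashes = Set(sha1Hex("10.0.0.2"))

  // Reverse lookup: hash every candidate and keep those whose hash is in the list
  val recovered: Seq[String] = candidateIps.filter(ip => suspiciousHashes.contains(sha1Hex(ip)))

  def main(args: Array[String]): Unit = println(recovered)
}
```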

We can then save the IP addresses with write:


Let's check what happened

Another option:

// coalesce(1) writes a single partition, so the clean-ip2.csv directory contains one part file
clearIp.coalesce(1).write.option("header", "true").csv("clean-ip2.csv")

The following file formats are also supported (avro requires the external spark-avro module):

  • avro
  • parquet
  • orc
  • text

Daily text log files should be compressed to save I/O. A gzip file is not splittable (a single task has to read it from start to end), but that is not a problem in this case.

