
Spark Hands-on

https://spark.apache.org/

What's in the project

$ tree
.
├── LICENSE
├── README.md
├── accesslog (some generated log files)
│   ├── day=2019-01-01
│   │   ├── _SUCCESS
│   │   └── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│   ├── day=2019-01-02
│   │   ├── _SUCCESS
│   │   └── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
|   [...]
├── almhuette-raith.log
│   └── access.log.gz (a real web server log file)
├── build.sbt
├── sample.scala
├── spark-hands-on.snb
└── src
    └── main
        └── scala
            └── Generator.scala

How to fetch data on your laptop

The almhuette-raith log file has already been downloaded and is available in the GitHub repository.

git clone https://github.com/jlcanela/spark-hands-on.git
ls 
# or
curl -o access.log.gz -L https://github.com/jlcanela/spark-hands-on/raw/master/almhuette-raith.log/access.log.gz

Let's check that the file is OK:

gzcat /Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log/access.log.gz | wc -l

It should display 2660050 lines.

Let's start

bin/spark-shell

Use the log file fetched from http://www.almhuette-raith.at/apache-log/access.log

val path = "/Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log"
val logs = spark.read.text(path)
logs.count
// should be 2660050

We can print the data schema

logs.printSchema
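// It should print something like:
// root
//  |-- value: string (nullable = true)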

A schema describes the logical type of the data; the same data can have multiple representations.

val rowExample = logs.take(2).drop(1).head
// Row is untyped data

We can convert a Row to a String:

val logAsString = logs.map(_.getString(0))
logAsString.count
// should be 2660050

The schema is similar:

logAsString.printSchema
val stringExample = logAsString.take(2).drop(1).head

We can convert this string to more strongly typed data:

// case classes enable us to define structure
case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

Let's create an access log:

val l1 = AccessLog("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")
// you can now access the field directly, with l1.ip for example

The AccessLog constructor is the function AccessLog.apply:

AccessLog.apply _
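It can be stored in a value and applied like any ordinary function (a minimal sketch; mkLog is just an illustrative name):

val mkLog = AccessLog.apply _
mkLog("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")
// same result as calling AccessLog(...) directly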

We can use a regex to parse each line:

val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r

Parse the strings:

val dsParsed = logAsString.flatMap(x => R.unapplySeq(x))
dsParsed.count
// should be 2655452

We have fewer lines! 2655452 < 2660050, so 4598 lines are missing. Why?

The Option[T] type can be very useful here: a value of type Option[T] is either Some[T] or None.

Option[T] = Some[T] | None
option.map(f: T => U)
some.get
option.getOrElse(default)
none.get // throws NoSuchElementException
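A minimal sketch of this behaviour in the REPL (the values are only illustrative):

val someValue: Option[Int] = Some(42)
val noValue: Option[Int] = None
someValue.map(_ + 1)   // Some(43)
someValue.get          // 42
noValue.getOrElse(0)   // 0
// noValue.get         // would throw NoSuchElementException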

For example, with the parsing result:

val parsed = R.unapplySeq(stringExample)
R.unapplySeq("not a good string")

Let's check if some data cannot be parsed:

val dsUnparsed = logAsString.filter(x => R.unapplySeq(x).isEmpty)
dsUnparsed.count
// should be 4598

What is this data?

dsUnparsed.show(false)

Note that the AccessLog constructor needs 10 arguments; it will not work with a single argument:

AccessLog(parsed) // does not compile: parsed is an Option[List[String]], not 10 Strings

We can adapt a list of strings to the 10 arguments of AccessLog:

def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(4), params(5), params(6), params(7), params(8), params(9))

Type-safe developers (including Scala developers) usually hate the ugly naked get:

toAccessLog(parsed.get)

So it's better to use this:

parsed.map(toAccessLog)

We can then convert the parsed lists of strings to AccessLog values:

val ds = dsParsed.map(toAccessLog _)
ds.count
// should be 2655452
ds.printSchema

We can convert the time string to a timestamp:

val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))

Note that the type has been updated:

dsWithTime.printSchema

Let's cache the data to avoid recomputing everything each time:

dsWithTime.cache

(use this with caution)
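When the cached data is no longer needed, it can be released; a minimal sketch:

// check the current storage level of the Dataset
dsWithTime.storageLevel
// release the cached data once it is no longer needed
dsWithTime.unpersist()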

We can then query this data directly with SQL:

dsWithTime.createOrReplaceTempView("AccessLog")
spark.sql("select count(*) from AccessLog").show(false)

On which days do we have the most requests?

spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10").show(false)

What happened on 2018-06-27?

spark.sql("select * from AccessLog where cast(datetime as date) = '2018-06-27' limit 100").show(false)

Let's play with the request field:

spark.sql("select request from AccessLog limit 10").show(false)
// POST /administrator/index.php HTTP/1.1

Let's parse the request field:

val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r
val s = "POST /administrator/index.php HTTP/1.1"
REQ_EX.unapplySeq(s)

Looks good. Let's check whether any request does not match the regex:

dsWithTime.filter(x => REQ_EX.unapplySeq(x.getString(x.fieldIndex("request"))).isEmpty).show(false)

Let's replace the request column with method / uri / http columns:

val dsExtended = dsWithTime.withColumn("method", regexp_extract(dsWithTime("request"), REQ_EX.toString, 1)).withColumn("uri", regexp_extract(dsWithTime("request"), REQ_EX.toString, 2)).withColumn("http", regexp_extract(dsWithTime("request"), REQ_EX.toString, 3)).drop("request")

The schema has been updated:

dsExtended.printSchema

How many requests start with /administrator?

We can use the Scala version:

dsExtended.filter(col("uri").startsWith("/administrator")).count
// 701979

Or use SQL:

dsExtended.createOrReplaceTempView("AccessLogExt")
spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
// count = 701979 :)

To check all the functions available in SQL, go to https://spark.apache.org/docs/latest/api/sql/

What are the administrator accesses by day?

spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLogExt where uri like '/administrator%' group by date having count > 1000 order by count desc limit 10").show(false)

What happened on 2016-02-18?

spark.sql("select * from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' limit 100").show(false)

Can we count grouped by uri?

spark.sql("select count(*) as count, uri from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by uri  order by count desc limit 100").show(false)

Maybe count grouped by ip?

spark.sql("select ip, count(*) as count from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by ip order by count desc limit 100").show(false)

What is the distribution of count(distinct(ip)) by date around 2016-02-18?

spark.sql("select count(distinct(ip)) as count, cast(datetime as date) as date from AccessLogExt where to_date(datetime)  > to_date('2016-02-10') and to_date(datetime)  <= to_date('2016-02-28') and uri like '/administrator%' group by cast(datetime as date) order by cast(datetime as date) asc").show(false)

Note: thanks to https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL for the date functions.

We just received, through a secure channel, the list of Mr Robot's known suspicious IPs; unfortunately they are hashed:

// 036c1eb2fe119b31853b8234d29e02898b6dffba => suspicious
// b398b4c034b719a5dd341bf494eb7391e8a0d74f => suspicious
// 552aca3388d939abf1baaf7cf4197eec5a4d880c => not suspicious

Let's create a dataset:

case class SuspiciousIp(sha1: String, suspicious: Boolean)
val suspiciousip = Seq(
  SuspiciousIp("036c1eb2fe119b31853b8234d29e02898b6dffba", true), 
  SuspiciousIp("b398b4c034b719a5dd341bf494eb7391e8a0d74f", true ), 
  SuspiciousIp("552aca3388d939abf1baaf7cf4197eec5a4d880c", false)).toDF
suspiciousip.show(false)
suspiciousip.createOrReplaceTempView("SuspiciousIp")

How can we find the real IP addresses?

val alldistinctip = spark.sql("select distinct(ip) from AccessLog")
alldistinctip.createOrReplaceTempView("DistinctIp")
val clearIp = spark.sql("select DistinctIp.ip, SuspiciousIp.sha1, SuspiciousIp.suspicious from SuspiciousIp left join DistinctIp on sha1(DistinctIp.ip) = SuspiciousIp.sha1")
clearIp.cache
clearIp.show(false)

We can then save the IP addresses with write:

clearIp.write.csv("clear-ip.csv")

Let's check what happened: write.csv creates a directory (clear-ip.csv) containing one part file per partition, without a header.

Another option, to get a single file with a header:

clearIp.coalesce(1).write.option("header", "true").csv("clear-ip2.csv")

The following file formats are also supported:

  • avro
  • parquet
  • orc
  • text

Daily text log files should be compressed to save I/O; gzip is not splittable, but that is not a problem in this case.
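For example, here is a minimal sketch (assuming the dsExtended Dataset from above; the output path accesslog-parquet is only illustrative) that writes the logs as Parquet partitioned by day, similar to the accesslog directory listed at the top of this page:

val withDay = dsExtended.withColumn("day", to_date(col("datetime")))
withDay.write.partitionBy("day").parquet("accesslog-parquet")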

Some information here: http://aseigneurin.github.io/2016/11/08/spark-file-formats-and-storage-options.html

Tungsten format

Let's have a look at https://www.slideshare.net/databricks/deep-dive-memory-management-in-apache-spark, slides 48-49.

Catalyst optimizer

Let's have a look at https://fr.slideshare.net/SparkSummit/deep-dive-into-catalyst-apache-spark-20s-optimizer-63071120
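To see Catalyst at work on our own queries, we can ask Spark for the query plans; a minimal sketch reusing the AccessLogExt view from above:

spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").explain(true)
// prints the parsed, analyzed and optimized logical plans, plus the physical plan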