Home
$ tree
.
├── LICENSE
├── README.md
├── accesslog (some generated log files)
│ ├── day=2019-01-01
│ │ ├── _SUCCESS
│ │ └── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│ ├── day=2019-01-02
│ │ ├── _SUCCESS
│ │ └── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
│   [...]
├── almhuette-raith.log
│ └── access.log.gz (a real web server log file)
├── build.sbt
├── sample.scala
├── spark-hands-on.snb
└── src
└── main
└── scala
└── Generator.scala
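As a side note, the generated files under accesslog/ are day-partitioned parquet; a minimal sketch for reading them back (to be tried once the Spark shell is started below, assuming it runs from the repository root):
// Sketch: read the day-partitioned parquet files listed above;
// the `day` partition column is discovered from the directory names.
val generated = spark.read.parquet("accesslog")
generated.printSchema
generated.count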
The almhuette-raith log file has already been downloaded and is available in the GitHub repository:
git clone https://github.com/jlcanela/spark-hands-on.git
ls
# or
curl -o access.log.gz -L https://github.com/jlcanela/spark-hands-on/raw/master/almhuette-raith.log/access.log.gz
Let's check that the file is OK:
gzcat /Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log/access.log.gz | wc -l
It should display 2660050 lines.
Start the Spark shell:
bin/spark-shell
Use the log file fetched from http://www.almhuette-raith.at/apache-log/access.log
val path = "/Users/XXXXX/Documents/dev/summit/spark-hands-on/almhuette-raith.log"
val logs = spark.read.text(path)
logs.count
// should be 2660050
We can print the data schema
logs.printSchema
A schema is the logical type of the data and can have multiple representations.
val rowExample = logs.take(2).drop(1).head
// Row is untyped data
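A quick sketch of what we can do with this untyped Row:
// a Row is accessed by position; the caller has to know the type
rowExample.getString(0)   // the raw log line as a String
rowExample.schema         // the single `value` column, of type string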
We can convert a Row to a String:
val logAsString = logs.map(_.getString(0))
logAsString.count
// should be 2660050
The schema is similar:
logAsString.printSchema
val stringExample = logAsString.take(2).drop(1).head
We can convert this string into more strongly typed data:
// a case class lets us define a structure
case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)
Let's create an access log:
val l1 = AccessLog("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")
// you can now access the fields directly, for example l1.ip
The AccessLog constructor is the function AccessLog.apply:
AccessLog.apply _
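For example (a small sketch), the eta-expanded constructor is a plain function value that can be stored and applied:
val mk = AccessLog.apply _   // a Function10 from 10 Strings to AccessLog
mk("ip", "ident", "user", "datetime", "request", "status", "size", "referer", "userAgent", "unk")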
We can use a regex to parse each line:
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
Parse the strings:
val dsParsed = logAsString.flatMap(x => R.unapplySeq(x))
dsParsed.count
// should be 2655452
We have fewer lines! 2655452 < 2660050, so 4598 lines did not parse?
The Option[T] type can be very useful: an Option[T] is either a Some[T] or None (a small sketch follows).
option.map( f: T1 => T2 )
some.get
option.getOrElse(default)
none.get => throws NoSuchElementException
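A minimal sketch of these operations:
val some: Option[Int] = Some(42)
val none: Option[Int] = None
some.map(_ + 1)      // Some(43)
some.getOrElse(0)    // 42
none.getOrElse(0)    // 0
none.get             // throws NoSuchElementException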
Back to our parsing result:
val parsed = R.unapplySeq(stringExample)
R.unapplySeq("not a good string")
Let's check if some data cannot be parsed:
val dsUnparsed = logAsString.filter(x => R.unapplySeq(x).isEmpty)
dsUnparsed.count
// should be 4598
What is this data?
dsUnparsed.show(false)
Note that the AccessLog constructor needs 10 arguments; it will not work with a single argument:
AccessLog(parsed)
We can adapt from the list to the 10 arguments of AccessLog:
def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(4), params(5), params(6), params(7), params(8), params(9))
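A more concise variant (just a sketch, equivalent to the definition above; toAccessLogPM is an illustrative name) uses pattern matching on the list:
// Sketch: destructure the 10-element list directly; any other shape throws a MatchError.
def toAccessLogPM(params: List[String]): AccessLog = params match {
  case List(ip, ident, user, datetime, request, status, size, referer, userAgent, unk) =>
    AccessLog(ip, ident, user, datetime, request, status, size, referer, userAgent, unk)
}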
Type-safe developers (Scala developers included) usually hate the ugly naked get:
toAccessLog(parsed.get)
So it's better to use this:
parsed.map(toAccessLog)
We can then convert each parsed list of strings to an AccessLog:
val ds = dsParsed.map(toAccessLog _)
ds.count
// should be 2655452
ds.printSchema
We can convert the time string to a timestamp:
val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
Note that the type has been updated:
dsWithTime.printSchema
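Similarly (a sketch, not part of the original walkthrough), the status and size columns could be cast to numeric types:
// Sketch: non-numeric values such as "-" become null after the cast.
val dsTyped = dsWithTime.withColumn("status", dsWithTime("status").cast("int")).withColumn("size", dsWithTime("size").cast("long"))
dsTyped.printSchema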
Let's cache the data to avoid recomputing everything each time:
dsWithTime.cache
(use this with caution)
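A related sketch: we can inspect the storage level, and release the cache once the data is no longer needed (not here, since the cached data is reused below):
dsWithTime.storageLevel    // the current persistence level
// dsWithTime.unpersist()  // frees the cached data when you are done with it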
We can then query this data directly with SQL:
dsWithTime.createOrReplaceTempView("AccessLog")
spark.sql("select count(*) from AccessLog").show(false)
Which days have the most requests?
spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10").show(false)
What happened on 2018-06-27?
spark.sql("select * from AccessLog where cast(datetime as date) = '2018-06-27' limit 100").show(false)
Let's play with the request field:
spark.sql("select request from AccessLog limit 10").show(false)
// POST /administrator/index.php HTTP/1.1
Let's parse the request field:
val REQ_EX = "([^ ]+)[ ]+([^ ]+)[ ]+([^ ]+)".r
val s = "POST /administrator/index.php HTTP/1.1"
REQ_EX.unapplySeq(s)
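As a side note (a sketch), the same regex can be used as an extractor in a pattern match:
// Sketch: destructure the example request string with the regex extractor.
val REQ_EX(method, uri, http) = s
// method = "POST", uri = "/administrator/index.php", http = "HTTP/1.1"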
Looks good. Let's check whether any request does not match the regex:
dsWithTime.filter(x => REQ_EX.unapplySeq(x.getString(x.fieldIndex("request"))).isEmpty).show(false)
Let's replace the request column with method / uri / http columns:
val dsExtended = dsWithTime.withColumn("method", regexp_extract(dsWithTime("request"), REQ_EX.toString, 1)).withColumn("uri", regexp_extract(dsWithTime("request"), REQ_EX.toString, 2)).withColumn("http", regexp_extract(dsWithTime("request"), REQ_EX.toString, 3)).drop("request")
The schema has been updated:
dsExtended.printSchema
How many requests start with /administrator?
We can use the Scala DataFrame API:
dsExtended.filter(col("uri").startsWith("/administrator")).count
// 701979
Or use SQL:
dsExtended.createOrReplaceTempView("AccessLogExt")
spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
// count = 701979 :)
To check all the available SQL functions, see https://spark.apache.org/docs/latest/api/sql/
What are the administrator accesses by day?
spark.sql("select count(*) from AccessLogExt where uri like '/administrator%'").show(false)
spark.sql("select cast(datetime as date) as date, count(*) as count from AccessLogExt where uri like '/administrator%' group by date having count > 1000 order by count desc limit 10").show(false)
What happened on 2016-02-18?
spark.sql("select * from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' limit 100").show(false)
Can we group by uri and count?
spark.sql("select count(*) as count, uri from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by uri order by count desc limit 100").show(false)
Or maybe group by ip and count?
spark.sql("select ip, count(*) as count from AccessLogExt where cast(datetime as date) = '2016-02-18' and uri like '/administrator%' group by ip order by count desc limit 100").show(false)
What is the distribution of count(distinct ip) by date around 2016-02-18?
spark.sql("select count(distinct(ip)) as count, cast(datetime as date) as date from AccessLogExt where to_date(datetime) > to_date('2016-02-10') and to_date(datetime) <= to_date('2016-02-28') and uri like '/administrator%' group by cast(datetime as date) order by cast(datetime as date) asc").show(false)
Note: thanks to https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL for the date functions.
We just received, through a secure channel, the list of Mr Robot's known suspicious IPs; unfortunately they are hashed:
// 036c1eb2fe119b31853b8234d29e02898b6dffba => suspicious
// b398b4c034b719a5dd341bf494eb7391e8a0d74f => suspicious
// 552aca3388d939abf1baaf7cf4197eec5a4d880c => not suspicious
Let's create a dataset:
case class SuspiciousIp(sha1: String, suspicious: Boolean)
val suspiciousip = Seq(
SuspiciousIp("036c1eb2fe119b31853b8234d29e02898b6dffba", true),
SuspiciousIp("b398b4c034b719a5dd341bf494eb7391e8a0d74f", true ),
SuspiciousIp("552aca3388d939abf1baaf7cf4197eec5a4d880c", false)).toDF
suspiciousip.show(false)
suspiciousip.createOrReplaceTempView("SuspiciousIp")
How can we find the real IP addresses?
val alldistinctip = spark.sql("select distinct(ip) from AccessLog")
alldistinctip.createOrReplaceTempView("DistinctIp")
val clearIp = spark.sql("select DistinctIp.ip, SuspiciousIp.sha1, SuspiciousIp.suspicious from SuspiciousIp left join DistinctIp on sha1(DistinctIp.ip) = SuspiciousIp.sha1")
clearIp.cache
clearIp.show(false)
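As a quick sanity check (a sketch; 192.168.0.1 is just a made-up literal, not one of the suspicious IPs), we can see how the join key is computed:
// Sketch: SHA-1 of an arbitrary IP string, the same expression used in the join condition.
spark.sql("select sha1('192.168.0.1') as sha1").show(false)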
We can then save the IP addresses with write:
clearIp.write.csv("clear-ip.csv")
Let's check what happened: the output is a directory containing one or more part files.
Another option:
clearIp.coalesce(1).write.option("header", "true").csv("clean-ip2.csv")
The following file formats are supported (a small parquet sketch follows the list):
- avro
- parquet
- orc
- text
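For example, a minimal sketch writing the same result as parquet (the output path is arbitrary):
clearIp.coalesce(1).write.mode("overwrite").parquet("clear-ip.parquet")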
Daily text log files should be compressed to save I/O; a gzip-compressed file is not splittable, but that is not a problem in this case.
Some information here: http://aseigneurin.github.io/2016/11/08/spark-file-formats-and-storage-options.html
Let's have a look at https://www.slideshare.net/databricks/deep-dive-memory-management-in-apache-spark Slide 48 - 49
Let's have a look at https://fr.slideshare.net/SparkSummit/deep-dive-into-catalyst-apache-spark-20s-optimizer-63071120