First version (draft)

Read data file

Spark's data abstraction started as the Resilient Distributed Dataset (RDD), then came the DataFrame, and finally the Dataset.
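A minimal sketch of the three abstractions side by side, assuming a local session. The Visit case class and its rows are hypothetical and only serve the illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type, for illustration only.
case class Visit(ip: String, uri: String)

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("abstractions-sketch").getOrCreate()
import spark.implicits._

// RDD: a distributed collection of JVM objects, without a schema.
val rdd = spark.sparkContext.parallelize(Seq(
  Visit("192.0.2.1", "/index.php"),
  Visit("192.0.2.2", "/administrator/")
))

// DataFrame: the same data as rows with named columns.
val df = rdd.toDF()

// Dataset: a typed view over those rows.
val ds = df.as[Visit]

val uris = ds.map(_.uri).collect().sorted
println(uris.mkString(", "))
```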

Use spark.read.json, spark.read.parquet, and so on, depending on the kind of data you want to load.

val logs = spark.read.textFile("/home/user/Documents/dev/spark/spark-hands-on/almhuette-raith.log")
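A possible way to try the different readers without any existing data, assuming a local session. The scratch directory and the sample rows are hypothetical: they are written first so that each read has something to load.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("read-sketch").getOrCreate()
import spark.implicits._

// Hypothetical scratch directory and rows.
val base = Files.createTempDirectory("spark-read").toString
val sample = Seq(("18:25:11", "/administrator/"), ("18:25:12", "/index.php")).toDF("time", "uri")
sample.write.json(s"$base/json")
sample.write.parquet(s"$base/parquet")
sample.select("uri").write.text(s"$base/text")

val fromJson = spark.read.json(s"$base/json")          // DataFrame, schema inferred from the records
val fromParquet = spark.read.parquet(s"$base/parquet") // DataFrame, schema stored in the files
val fromText = spark.read.textFile(s"$base/text")      // Dataset[String], one element per line

println(s"json=${fromJson.count()} parquet=${fromParquet.count()} text=${fromText.count()}")
```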

# Partitioning

When a folder contains key=value sub-folders, Spark automatically discovers the partitions and exposes the key as a column.

├── day=2019-01-01
│   ├── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-02
│   ├── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-03
│   ├── part-00000-599d8019-66f6-4fbf-b7e5-a23e6f69b2a9-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-04
│   ├── part-00000-6279d4db-f8dd-44db-930f-a73644bfb411-c000.snappy.parquet
│   └── _SUCCESS
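A small sketch of how such a layout can be produced and read back, assuming a local session. The scratch directory and the rows are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("partition-sketch").getOrCreate()
import spark.implicits._

// Hypothetical scratch directory and rows.
val out = Files.createTempDirectory("accesslog").resolve("data").toString
val rows = Seq(
  ("2019-01-01", "/index.php"),
  ("2019-01-02", "/administrator/"),
  ("2019-01-02", "/contact")
).toDF("day", "uri")

// partitionBy writes one day=<value> sub-folder per distinct value.
rows.write.partitionBy("day").parquet(out)

// On read, Spark rebuilds the "day" column from the folder names.
val partitioned = spark.read.parquet(out)

// A filter on the partition column lets Spark skip the other folders.
val jan2 = partitioned.where(col("day") === "2019-01-02")
println(jan2.count())
```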

Type Schema

Spark automatically reads the schema of Parquet and Avro files:

val df = spark.read.parquet("/home/user/Documents/dev/spark/spark-hands-on/accesslog")
df.printSchema()
root
 |-- time: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- uri: string (nullable = true)
 |-- cookieId: string (nullable = true)
 |-- day: date (nullable = true)
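Formats that do not carry their own schema, such as JSON, can be given one explicitly. A minimal sketch, assuming a local session; the JSON record and the field list are hypothetical and simply mirror part of the schema above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()
import spark.implicits._

// Hypothetical JSON record held in memory instead of a file.
val jsonLines = Seq("""{"time":"18:25:11","uri":"/administrator/","day":"2015-12-12"}""").toDS()

// Explicit schema: without it, "day" would be inferred as a string.
val schema = StructType(Seq(
  StructField("time", StringType),
  StructField("uri", StringType),
  StructField("day", DateType)
))

val typed = spark.read.schema(schema).json(jsonLines)
typed.printSchema()
```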

Using SQL

Parsing data

A line of the access log looks like this (the leading client IP address is omitted here):

- - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"

A possible regex:

val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]+) (?<user>[^ ]+) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
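The regex can be checked on a single line before involving Spark. The sketch below repeats the pattern under the hypothetical name LogLine, with `+` on the identd and user groups so that values longer than one character match. The address 192.0.2.1 is a placeholder from the documentation range, not a value from the real log.

```scala
// Hypothetical copy of the pattern; identd and user accept one or more characters.
val LogLine = """^(?<ip>[0-9.]+) (?<identd>[^ ]+) (?<user>[^ ]+) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r

// Sample line with a placeholder IP address in front.
val line = """192.0.2.1 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-""""

// A Scala Regex used as an extractor only succeeds when the whole line matches.
val fields = line match {
  case LogLine(ip, identd, user, datetime, request, status, size, referer, userAgent, unk) =>
    Some((ip, datetime, request, status))
  case _ => None
}

println(fields)
```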

To parse the data:

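import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.Locale

// In spark-shell, enter the case class and its companion object together with :paste.
case class AccessLog(ip: String, ident: String, user: String, timestamp: Timestamp, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

// Locale.ENGLISH so that month names such as "Dec" parse whatever the JVM default locale is.
val formatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss X", Locale.ENGLISH)
val parisZone = ZoneId.of("Europe/Paris")

object AccessLog {

  // Returns None when the line does not match the regex R.
  def apply(s: String): Option[AccessLog] = R.unapplySeq(s) match {
    case Some(List(ip, ident, user, datetime, request, status, size, referer, userAgent, unk)) =>
      // The UTC offset is parsed but dropped: the timestamp keeps the wall-clock time of the log line.
      Some(AccessLog(ip, ident, user, Timestamp.valueOf(LocalDateTime.from(formatter.parse(datetime))), request, status, size, referer, userAgent, unk))
    case _ => None
  }
}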
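Going through LocalDateTime discards the UTC offset of the log line. One alternative, sketched here with the hypothetical names logFormat and parseTimestamp, is to parse into an OffsetDateTime so that the offset is taken into account.

```scala
import java.sql.Timestamp
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

// "Z" matches offsets written as +0100; Locale.ENGLISH makes "Dec" parse on any JVM.
val logFormat = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

// Hypothetical helper: converts the datetime field of a log line to a Timestamp.
def parseTimestamp(s: String): Timestamp =
  Timestamp.from(OffsetDateTime.parse(s, logFormat).toInstant)

val ts = parseTimestamp("12/Dec/2015:18:25:11 +0100")
println(ts.toInstant) // 2015-12-12T17:25:11Z
```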

To convert the text lines to typed objects (a Dataset):

val alllogs = logs.flatMap(s => AccessLog.apply(s))

Find days having the most requests

Register the Dataset as a temporary view, then query it directly with SQL:

alllogs.createOrReplaceTempView("AccessLog")
spark.sql("select cast(timestamp as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10")
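The same kind of aggregation can be tried on a few in-memory rows, once in SQL and once with the DataFrame API. This is only a sketch: the Hit case class, its rows and the column aliases are hypothetical, and the threshold on the count is left out because the sample is tiny.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

// Hypothetical, reduced version of the parsed log record.
case class Hit(ip: String, timestamp: Timestamp)

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("per-day-sketch").getOrCreate()
import spark.implicits._

val hits = Seq(
  Hit("192.0.2.1", Timestamp.valueOf("2018-05-28 10:00:00")),
  Hit("192.0.2.2", Timestamp.valueOf("2018-05-28 11:30:00")),
  Hit("192.0.2.1", Timestamp.valueOf("2018-05-29 09:15:00"))
).toDS()

// SQL needs a view name to refer to.
hits.createOrReplaceTempView("AccessLog")

val perDaySql = spark.sql(
  """select cast(timestamp as date) as day, count(*) as requests
    |from AccessLog
    |group by cast(timestamp as date)
    |order by requests desc""".stripMargin)

// Equivalent query with the DataFrame API; count() names its result column "count".
val perDayApi = hits
  .groupBy(to_date(col("timestamp")).as("day"))
  .count()
  .orderBy(col("count").desc)

perDaySql.show()
```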

Check the access log for a given day

Use SQL:

spark.sql("select * from AccessLog where cast(timestamp as date) = '2018-05-28'")

Find the logs which could not be parsed

logs.filter(x => AccessLog.apply(x).isEmpty)

User defined functions

Define a simple test function:

import org.apache.spark.sql.functions.udf
val addMark = udf { x: String => x + "!" }
spark.sqlContext.udf.register("addMark", addMark)

How to use the function in SQL:

spark.sql("select addMark(day) from accesslog").take(1)

A UDF can take multiple parameters:

val toDateTime = udf { (day: String, time: String) => day + "T" + time }
spark.sqlContext.udf.register("toDateTime", toDateTime)

and then:

spark.sql("select toDateTime(day, time) from accesslog").take(1)

UDFs can be useful, but they are best avoided when a built-in function does the job: Catalyst, Spark's query optimizer, treats a UDF as a black box and cannot optimize through it.
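Both UDFs above have a built-in counterpart. A minimal sketch, assuming a local session; the sample row and the output column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit}

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("builtin-sketch").getOrCreate()
import spark.implicits._

// Hypothetical row with the two columns used by the UDF examples.
val sample = Seq(("2019-01-01", "18:25:11")).toDF("day", "time")

// concat and lit are built-in functions, so Catalyst can see through them.
val builtIn = sample
  .withColumn("marked", concat(col("day"), lit("!")))
  .withColumn("dateTime", concat(col("day"), lit("T"), col("time")))

val row = builtIn.first()
println(row.getAs[String]("marked"))   // 2019-01-01!
println(row.getAs[String]("dateTime")) // 2019-01-01T18:25:11
```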

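Extend alllogs for inspection

REQ_EX is a regex with three capture groups (method, URI, HTTP version) applied to the request field:

import org.apache.spark.sql.functions.regexp_extract

val extlogs = alllogs
  .withColumn("method", regexp_extract(alllogs("request"), REQ_EX.toString, 1))
  .withColumn("uri", regexp_extract(alllogs("request"), REQ_EX.toString, 2))
  .withColumn("http", regexp_extract(alllogs("request"), REQ_EX.toString, 3))
extlogs.createOrReplaceTempView("extlogs")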


spark.sql("select method, count(*) as count from extlogs group by method having count > 1000")
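The notes do not show the definition of REQ_EX. The sketch below assumes a hypothetical pattern with three groups separated by single spaces and applies it to two made-up request strings, so the extraction can be tried in isolation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, regexp_extract}

// Local session for experimentation; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("request-sketch").getOrCreate()
import spark.implicits._

// Hypothetical pattern: method, URI and HTTP version.
val REQ_EX = """^([A-Z]+) ([^ ]+) ([^ ]+)$""".r

// Made-up request strings standing in for the "request" column of the parsed logs.
val requests = Seq(
  "GET /administrator/ HTTP/1.1",
  "POST /administrator/index.php HTTP/1.1"
).toDF("request")

// regexp_extract takes the pattern as a string and the index of the group to return.
val extlogs = requests
  .withColumn("method", regexp_extract(col("request"), REQ_EX.toString, 1))
  .withColumn("uri", regexp_extract(col("request"), REQ_EX.toString, 2))
  .withColumn("http", regexp_extract(col("request"), REQ_EX.toString, 3))

extlogs.createOrReplaceTempView("extlogs")
val byMethod = spark.sql("select method, count(*) as requests from extlogs group by method")
byMethod.show()
```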

