
First version (draft)


Welcome to the spark-hands-on wiki!

Read data file

Spark's data abstraction was initially the Resilient Distributed Dataset (RDD), then the DataFrame, and ultimately the Dataset.

Use spark.read.text, spark.read.json, or spark.read.parquet depending on the kind of data you want to load.

val logs = spark.read.text("/home/user/Documents/dev/spark/spark-hands-on/almhuette-raith.log")
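
For JSON or Parquet data the call is analogous; a minimal sketch (the file paths below are placeholders, not files from this walkthrough):

// Hypothetical paths; adapt them to your own data.
val jsonLogs = spark.read.json("/home/user/Documents/dev/spark/spark-hands-on/sample.json")
val parquetLogs = spark.read.parquet("/home/user/Documents/dev/spark/spark-hands-on/sample.parquet")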

Partitioning

When the folder contains key=value subdirectories, Spark automatically manages file partitioning:

/home/user/Documents/dev/spark/spark-hands-on/accesslog
├── day=2019-01-01
│   ├── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-02
│   ├── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-03
│   ├── part-00000-599d8019-66f6-4fbf-b7e5-a23e6f69b2a9-c000.snappy.parquet
│   └── _SUCCESS
├── day=2019-01-04
│   ├── part-00000-6279d4db-f8dd-44db-930f-a73644bfb411-c000.snappy.parquet
│   └── _SUCCESS
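
A layout like this can be produced by writing a DataFrame with partitionBy; a minimal sketch, assuming a DataFrame df that contains a day column:

// One day=... subdirectory is created per distinct value of the partition column.
df.write.partitionBy("day").parquet("/home/user/Documents/dev/spark/spark-hands-on/accesslog")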

Type Schema

Spark automatically manages the type schema for Parquet / Avro files:

val df = spark.read.parquet("/home/user/Documents/dev/spark/spark-hands-on/accesslog")
df.printSchema
root
 |-- time: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- uri: string (nullable = true)
 |-- cookieId: string (nullable = true)
 |-- day: date (nullable = true)
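
Because day is recognized as a partition column, a filter on it only scans the matching subdirectories (partition pruning), for example:

// Only the day=2019-01-02 directory is read.
df.filter(df("day") === "2019-01-02").show(5)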

Using SQL

To register a DataFrame as a temporary table (registerTempTable is deprecated in favor of createOrReplaceTempView):

df.registerTempTable("accesslog")

To register a Dataset as a temporary view:

alllogs.createOrReplaceTempView("AccessLog")
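
Once a table or view is registered it can be queried with spark.sql, for example:

// Count the rows of the accesslog table through the SQL interface.
spark.sql("select count(*) from accesslog").show()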

Parsing data

An access log line looks like this:

109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"

A possible regex:

val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
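
To check quickly that the regex matches the sample line above, a small sketch:

// Pattern matching against the regex extracts the ten groups in order.
val sample = "109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] \"GET /administrator/ HTTP/1.1\" 200 4263 \"-\" \"Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0\" \"-\""
sample match {
  case R(ip, identd, user, datetime, request, status, size, referer, userAgent, unk) => println(s"$ip $datetime $request $status")
  case _ => println("no match")
}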

To parse the data:

import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

import java.sql.Timestamp

case class AccessLog(ip: String, ident: String, user: String, timestamp: Timestamp, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

// https://docs.oracle.com/javase/8/docs/api/index.html?java/time/format/package-summary.html
val formatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss X")
val parisZone = ZoneId.of("Europe/Paris")

object AccessLog {

  // Build an AccessLog from a raw log line; returns None when the line does not match the regex.
  def apply(s: String): Option[AccessLog] = R.unapplySeq(s) match {
    case Some(List(ip, ident, user, datetime, request, status, size, referer, userAgent, unk)) =>
      Some(AccessLog(ip, ident, user, Timestamp.valueOf(LocalDateTime.from(formatter.parse(datetime))), request, status, size, referer, userAgent, unk))
    case _ => None
  }
}
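
As a quick sanity check, applying the parser to the sample line defined earlier should yield a Some:

// Expect Some(AccessLog(...)) with the timestamp parsed as 2015-12-12 18:25:11.0.
println(AccessLog(sample))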

To convert from text to a typed object (Dataset):

val alllogs = logs.map(_.getString(0)).flatMap(s => AccessLog.apply(s))

Find the days with the most requests

Use SQL directly to query the logs:

spark.sql("select cast(timestamp as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10")
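
spark.sql only builds a DataFrame; append .show() to actually display the result in the shell:

// Display the top 10 busiest days.
spark.sql("select cast(timestamp as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10").show()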

Check the access log for a given day

Use SQL:

spark.sql("select * from AccessLog where cast(timestamp as date) = '2018-05-28'")

Find the logs which could not be parsed

logs.map(_.getString(0)).filter(x => AccessLog.apply(x).isEmpty)
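
The expression above is lazy; to force evaluation, count the lines that failed to parse, for example:

// count() triggers the evaluation; show(5) would display a few offending lines instead.
logs.map(_.getString(0)).filter(x => AccessLog(x).isEmpty).count()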

Using the Spark tools

The Spark web UI is available while the application is running at:

http://localhost:4040/jobs/

User defined functions

Define a simple test function:

import org.apache.spark.sql.functions.udf
val addMark = udf { x: String => x + "!" }
spark.sqlContext.udf.register("addMark", addMark)

How to use the function in SQL:

spark.sql("select addMark(day) from accesslog").take(1)

UDFs can take multiple parameters:

val toDateTime = udf { (day: String, time: String) => day + "T" + time }
spark.sqlContext.udf.register("toDateTime", toDateTime)

and then:

spark.sql("select toDateTime(day, time) from accesslog").take(1)

UDFs can be useful, but they are no longer considered best practice: Spark's optimizer, Catalyst, treats a UDF as a black box and cannot optimize the expressions inside it.
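
Where the logic is simple, the same result can be achieved with built-in column functions, which stay visible to Catalyst; a sketch replacing the two UDFs above:

import org.apache.spark.sql.functions.{concat, lit}

// Same output as addMark and toDateTime, but expressed with built-in functions
// so Catalyst can analyse and optimise the expressions.
spark.sql("select concat(day, '!') from accesslog").take(1)
df.select(concat(df("day").cast("string"), lit("T"), df("time"))).take(1)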

Extend alllogs for inspection

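The REQ_EX pattern is not defined on this page; one plausible definition, splitting the request field into method, URI and HTTP version, could be:

// Assumed pattern (not from the original page), e.g. "GET /administrator/ HTTP/1.1":
// group 1 = method, group 2 = URI, group 3 = protocol version.
val REQ_EX = """(\S+)\s(\S+)\s(\S+)""".r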
import org.apache.spark.sql.functions.regexp_extract

val extlogs = alllogs
  .withColumn("method", regexp_extract(alllogs("request"), REQ_EX.toString, 1))
  .withColumn("uri", regexp_extract(alllogs("request"), REQ_EX.toString, 2))
  .withColumn("http", regexp_extract(alllogs("request"), REQ_EX.toString, 3))

extlogs.createOrReplaceTempView("extlogs")

spark.sql("select method, count(*) as count from extlogs group by method having count > 1000")

Catalyst optimizer?

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

Reference

https://legacy.gitbook.com/book/jaceklaskowski/mastering-spark-sql/details
https://towardsdatascience.com/realtime-prediction-using-spark-structured-streaming-xgboost-and-scala-d4869a9a4c66
