First version (draft)
Welcome to the spark-hands-on wiki!
Spark's data APIs evolved in three steps: first the Resilient Distributed Dataset (RDD), then the DataFrame, and finally the Dataset.
Use `spark.read.text`, `spark.read.json` or `spark.read.parquet` depending on the kind of data you want to load:
```scala
// Each line of the file becomes a Row with a single string column named "value"
val logs = spark.read.text("/home/user/Documents/dev/spark/spark-hands-on/almhuette-raith.log")
```
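A minimal sketch of the readers on a small JSON Lines file, assuming a local SparkSession and a temporary file that the snippet creates itself (the sample records are hypothetical):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() simply returns the existing `spark` session
val spark = SparkSession.builder().master("local[*]").appName("read-demo").getOrCreate()

// Hypothetical sample: one JSON object per line (JSON Lines), written to a temp file
val jsonFile = Files.createTempFile("logs", ".json")
val content =
  """{"ip": "109.169.248.247", "status": 200}
    |{"ip": "46.72.177.4", "status": 404}
    |""".stripMargin
Files.write(jsonFile, content.getBytes(StandardCharsets.UTF_8))

// JSON: one row per line, column names and types inferred from the data
val jsonDf = spark.read.json(jsonFile.toString)
jsonDf.printSchema()

// Text: one row per line, in a single string column named "value"
val textDf = spark.read.text(jsonFile.toString)

// Generic form shared by the built-in sources: format(...).load(...)
val genericDf = spark.read.format("json").load(jsonFile.toString)
```

The `format(...).load(...)` form is handy when the format is chosen at runtime, for example from a configuration value.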
# Partitioning
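When a folder contains subfolders named `key=value`, Spark automatically treats them as partitions: `key` becomes a column whose value is taken from the folder name (partition discovery). For example: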
```
/home/user/Documents/dev/spark/spark-hands-on/accesslog
├── day=2019-01-01
│ ├── part-00000-73151710-74b6-4fd5-8608-e3ef7a58bf50-c000.snappy.parquet
│ └── _SUCCESS
├── day=2019-01-02
│ ├── part-00000-6888fd78-3ce7-42de-a82d-85ef4943c83b-c000.snappy.parquet
│ └── _SUCCESS
├── day=2019-01-03
│ ├── part-00000-599d8019-66f6-4fbf-b7e5-a23e6f69b2a9-c000.snappy.parquet
│ └── _SUCCESS
├── day=2019-01-04
│ ├── part-00000-6279d4db-f8dd-44db-930f-a73644bfb411-c000.snappy.parquet
│ └── _SUCCESS
```
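Spark reads the schema of Parquet/Avro files automatically, since these formats embed it. The partition column `day` is appended, with its type inferred from the folder names: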
```scala
val df = spark.read.parquet("/home/user/Documents/dev/spark/spark-hands-on/accesslog")
df.printSchema
```
```
root
|-- time: string (nullable = true)
|-- ip: string (nullable = true)
|-- uri: string (nullable = true)
|-- cookieId: string (nullable = true)
|-- day: date (nullable = true)
```
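A layout like the one above is what `DataFrameWriter.partitionBy` produces. The sketch below, assuming a local SparkSession and hypothetical rows, writes such a directory to a temporary location, reads it back, and filters on the partition column so that Spark only needs to scan the matching `day=` folder:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("partition-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows; "day" becomes the partition column
val rows = Seq(
  ("18:25:11", "109.169.248.247", "/administrator/", "2019-01-01"),
  ("09:12:03", "46.72.177.4", "/index.php", "2019-01-02")
).toDF("time", "ip", "uri", "day")

val out = Files.createTempDirectory("accesslog-demo").resolve("accesslog").toString

// One day=<value> subfolder per distinct day; the day values live in the
// folder names, not inside the Parquet files
rows.write.partitionBy("day").parquet(out)

// Reading the root folder rediscovers `day` from the folder names and appends it
val back = spark.read.parquet(out)
back.printSchema()

// A filter on the partition column lets Spark skip the other folders
val oneDay = back.filter($"day" === "2019-01-02")
oneDay.explain() // the file scan should list the day predicate under PartitionFilters
```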
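To query a DataFrame with SQL, register it as a temporary view. `registerTempTable` has been deprecated since Spark 2.0 in favor of `createOrReplaceTempView`:
```scala
df.createOrReplaceTempView("accesslog")
```
A Dataset is registered the same way, for example `alllogs`, which is built further down:
```scala
alllogs.createOrReplaceTempView("AccessLog")
```
View names are case-insensitive by default, so `AccessLog` and `accesslog` refer to the same view and the second registration replaces the first.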
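A small self-contained sketch of the register-then-query cycle, assuming a local SparkSession and made-up rows; `spark.catalog.tableExists` confirms that the temporary view is registered:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("view-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the access log
val df = Seq(
  ("109.169.248.247", "2019-01-01"),
  ("46.72.177.4", "2019-01-01"),
  ("46.72.177.4", "2019-01-02")
).toDF("ip", "day")

// Session-scoped view: visible to spark.sql until the session ends or the view is dropped
df.createOrReplaceTempView("accesslog")

val perDay = spark.sql("select day, count(*) as count from accesslog group by day order by day")
perDay.show()
```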
A line of the access log looks like this:
```
109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
```
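A possible regex, with one named group per field (`identd` and `user` use `[^ ]+` so that values longer than one character also match):
```scala
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]+) (?<user>[^ ]+) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r
```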
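The regex can be checked in plain Scala before involving Spark: match it against the sample line and against a malformed one. This sketch repeats the regex with `[^ ]+` for the ident and user fields (an assumption; the single-character `[^ ]` form also matches this particular sample, whose ident and user are both `-`):

```scala
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]+) (?<user>[^ ]+) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\"""".r

val sample = """109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-""""

// Used as an extractor, a Regex must match the whole string; groups bind by position
val parsed = sample match {
  case R(ip, _, _, datetime, request, status, size, _, _, _) =>
    Some((ip, datetime, request, status, size))
  case _ => None
}
println(parsed)

// A line that does not fit the format yields None instead of an exception
val rejected = R.unapplySeq("this is not an access log line")
```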
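To parse the data. In spark-shell, enter the case class and its companion object together (for example with `:paste`); otherwise they are not treated as companions and the object cannot call the case class constructor:
```scala
import java.sql.Timestamp
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

case class AccessLog(ip: String, ident: String, user: String, timestamp: Timestamp, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

// https://docs.oracle.com/javase/8/docs/api/index.html?java/time/format/package-summary.html
// MMM needs an English locale to parse month names such as "Dec"; Z parses offsets such as +0100
val formatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

object AccessLog {
  // Returns None when the line does not match the regex R defined above
  def apply(s: String): Option[AccessLog] = R.unapplySeq(s) match {
    case Some(List(ip, ident, user, datetime, request, status, size, referer, userAgent, unk)) =>
      // Parse with the offset, so the instant does not depend on the JVM default time zone
      val timestamp = Timestamp.from(OffsetDateTime.parse(datetime, formatter).toInstant)
      Some(AccessLog(ip, ident, user, timestamp, request, status, size, referer, userAgent, unk))
    case _ => None
  }
}
```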
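The timestamp conversion can be checked on its own. A minimal sketch in plain Scala, assuming the `dd/MMM/yyyy:HH:mm:ss` pattern with `Z` for the `+0100` offset and an English locale: parsing through `OffsetDateTime` keeps the offset, so the resulting instant is the same whatever the JVM's default time zone is.

```scala
import java.sql.Timestamp
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import java.util.Locale

// Locale.ENGLISH so that "Dec" parses on any JVM; Z matches offsets written as +HHMM
val formatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

def toTimestamp(s: String): Timestamp =
  Timestamp.from(OffsetDateTime.parse(s, formatter).toInstant)

val ts = toTimestamp("12/Dec/2015:18:25:11 +0100")
// 18:25:11 at +01:00 is 17:25:11 UTC
println(ts.toInstant)
```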
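To convert the text into a typed `Dataset[AccessLog]`: lines the regex cannot parse are dropped, since `flatMap` flattens each `Option`.
```scala
import spark.implicits._ // encoders for map/flatMap; already imported in spark-shell
val alllogs = logs.map(_.getString(0)).flatMap(s => AccessLog.apply(s))
```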
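One caveat of the parser: a line that matches the regex but carries a malformed date makes the formatter throw, which fails the job. A hedged sketch in plain Scala of the same flatMap-over-Option idea, with the date parsing wrapped in `scala.util.Try` so such lines are dropped as well. `Hit`, `Line` and `parse` are hypothetical, simplified stand-ins for `AccessLog`, `R` and `AccessLog.apply`:

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

// Hypothetical, simplified record: only the client IP and the request instant
final case class Hit(ip: String, instant: Instant)

// Hypothetical shortened regex: captures the IP and the bracketed date, ignores the rest
val Line = """^([0-9.]+) [^ ]+ [^ ]+ \[([^\]]+)\].*""".r
val fmt = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

// None when the regex does not match AND when the date cannot be parsed,
// so a single bad line cannot throw
def parse(s: String): Option[Hit] = s match {
  case Line(ip, dt) => Try(OffsetDateTime.parse(dt, fmt).toInstant).toOption.map(Hit(ip, _))
  case _            => None
}

val lines = List(
  "109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] \"GET / HTTP/1.1\" 200 1",
  "109.169.248.247 - - [99/Foo/2015:18:25:11 +0100] \"GET / HTTP/1.1\" 200 1",
  "garbage")

// flatMap over Option keeps only the successfully parsed lines
val hits = lines.flatMap(s => parse(s))
println(hits)
```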
Query the Dataset directly with SQL, here the ten busiest days with more than 1000 requests:
```scala
spark.sql("select cast(timestamp as date) as date, count(*) as count from AccessLog group by date having count > 1000 order by count desc limit 10").show()
```
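The same aggregation can be written with the DataFrame API instead of an SQL string. A sketch assuming a local SparkSession, a handful of made-up timestamps, and a threshold of 1 instead of 1000 so that the tiny sample produces output:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc, to_date}

val spark = SparkSession.builder().master("local[*]").appName("daily-demo").getOrCreate()
import spark.implicits._

// Hypothetical request timestamps: 3 on May 28, 1 on May 29, 2 on May 30
val hits = Seq(
  "2018-05-28 10:00:00", "2018-05-28 11:30:00", "2018-05-28 23:59:59",
  "2018-05-29 08:00:00",
  "2018-05-30 12:00:00", "2018-05-30 13:00:00"
).map(s => Timestamp.valueOf(s)).toDF("timestamp")

val threshold = 1 // 1000 in the SQL query

// cast(timestamp as date) ... group by date having count > threshold order by count desc limit 10
val daily = hits
  .groupBy(to_date(col("timestamp")).as("date"))
  .count()
  .filter(col("count") > threshold)
  .orderBy(desc("count"))
  .limit(10)

daily.show()
```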
Or select all requests of a given day:
```scala
spark.sql("select * from AccessLog where cast(timestamp as date) = '2018-05-28'").show()
```
List the lines that the regex fails to parse:
```scala
logs.map(_.getString(0)).filter(x => AccessLog.apply(x).isEmpty).show(false)
```
While the shell is running, the Spark web UI shows jobs, stages and SQL plans at http://localhost:4040/jobs/.
Define a simple test function as a UDF and register it so that SQL queries can call it:
```scala
import org.apache.spark.sql.functions.udf
val addMark = udf { x: String => x + "!" }
spark.sqlContext.udf.register("addMark", addMark)
```
Use the function in SQL:
```scala
spark.sql("select addMark(day) from accesslog").take(1)
```
UDFs can take multiple parameters:
```scala
val toDateTime = udf { (day: String, time: String) => day + "T" + time }
spark.sqlContext.udf.register("toDateTime", toDateTime)
```
and then:
```scala
spark.sql("select toDateTime(day, time) from accesslog").take(1)
```
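UDFs can be useful, but they are no longer considered best practice when a built-in function can do the job. Spark's Catalyst optimizer cannot look inside a UDF: it cannot optimize the function's logic or push filters that use it down to the data source, and every call converts values between Spark's internal format and JVM objects.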
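A sketch of replacing the two UDFs above with built-in functions, assuming a local SparkSession and hypothetical `day`/`time` strings: `concat` and `lit` produce the same values as `toDateTime` and `addMark`, but as ordinary Catalyst expressions rather than opaque function calls.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit, udf}

val spark = SparkSession.builder().master("local[*]").appName("udf-demo").getOrCreate()
import spark.implicits._

// Hypothetical day/time strings, as in the toDateTime example
val df = Seq(("2019-01-01", "18:25:11")).toDF("day", "time")

// UDF version: Catalyst only sees a black-box function call
val toDateTime = udf { (day: String, time: String) => day + "T" + time }
val viaUdf = df.select(toDateTime(col("day"), col("time")).as("dt"))

// Built-in equivalents of toDateTime and addMark
val viaBuiltin = df.select(concat(col("day"), lit("T"), col("time")).as("dt"))
val marked = df.select(concat(col("day"), lit("!")).as("marked"))

viaUdf.show()
viaBuiltin.show()
```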
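Built-in functions such as `regexp_extract` cover many cases without a UDF. The following splits the request into method, URI and HTTP version (the `REQ_EX` definition below is an assumed one):
```scala
import org.apache.spark.sql.functions.regexp_extract

// Assumed definition: "GET /administrator/ HTTP/1.1" -> method, uri, http version
val REQ_EX = """^(\S+) (\S+) (\S+)$""".r

val extlogs = alllogs
  .withColumn("method", regexp_extract(alllogs("request"), REQ_EX.toString, 1))
  .withColumn("uri", regexp_extract(alllogs("request"), REQ_EX.toString, 2))
  .withColumn("http", regexp_extract(alllogs("request"), REQ_EX.toString, 3))

extlogs.createOrReplaceTempView("extlogs")
spark.sql("select method, count(*) as count from extlogs group by method having count > 1000").show()
```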
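A self-contained sketch of `regexp_extract` on hypothetical request strings, assuming a local SparkSession and an illustrative three-group pattern (method, URI, protocol). It also shows a behavior worth knowing: when the pattern does not match, `regexp_extract` returns an empty string rather than null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, regexp_extract}

val spark = SparkSession.builder().master("local[*]").appName("regexp-demo").getOrCreate()
import spark.implicits._

// Illustrative pattern: "<method> <uri> <protocol>"
val reqEx = """^(\S+) (\S+) (\S+)$"""

// Hypothetical requests; the last one does not fit the pattern
val requests = Seq("GET /administrator/ HTTP/1.1", "POST /index.php HTTP/1.0", "-").toDF("request")

val split = requests
  .withColumn("method", regexp_extract(col("request"), reqEx, 1))
  .withColumn("uri", regexp_extract(col("request"), reqEx, 2))
  .withColumn("http", regexp_extract(col("request"), reqEx, 3))

split.show(false)
```

Because a failed match yields `""`, a `group by method` puts unparsable requests in a bucket with an empty method instead of a null one.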
Further reading:
- https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
- https://legacy.gitbook.com/book/jaceklaskowski/mastering-spark-sql/details
- https://towardsdatascience.com/realtime-prediction-using-spark-structured-streaming-xgboost-and-scala-d4869a9a4c66