Example of launching jobs with a Dataset
You can use this sample program to validate your Spark installation and to explore how to run Spark jobs using spark-submit. The program collects crime statistics for different districts of Boston. The dataset used in the example can be downloaded from this dataset source.
Dataset
The dataset includes records from June 14, 2015 to September 3, 2018. Each row represents a crime report, including the type of crime, the date and time, and the location.
The example processes two files:
- crime.csv: the main file with crime records;
- offense_codes.csv: the data file that contains crime codes.
After processing, we get the following information:
- the most common types of crime;
- the distribution of crimes by district;
- the frequency of different types of crime.
Preparations
To create a Scala project with the Scala build tool (sbt), refer to the Scala documentation. You can build the project on your cluster or on any other machine.
NOTE
If you create the project locally, do not forget to copy the jar file to the Spark master host.
Using Spark, we aggregate the data by district (the district field) and compute the following metrics:
- crimes_total: the total number of crimes in the district.
- crimes_monthly: the median number of crimes per month in the district.
- frequent_crime_types: the three most frequent crime types in the entire history of observations in the district, separated by a comma and a single space (", "), in descending order of frequency.
  - crime_type: the first part of NAME from the offense_codes.csv table, split by the - separator (for example, if NAME is BURGLARY - COMMERICAL - ATTEMPT, then crime_type is BURGLARY).
- lat: the latitude of the district coordinates, averaged over all incidents.
- lng: the longitude of the district coordinates, averaged over all incidents.
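The metric definitions above can be illustrated without Spark. The following is a minimal plain-Scala sketch, not the job's implementation: the names MetricsSketch and Incident are hypothetical, and breaking ties by name in frequentCrimeTypes is an assumption made only to keep the result deterministic.

```scala
object MetricsSketch {
  // Hypothetical, simplified record: only the fields these metrics need.
  final case class Incident(district: String, month: Int, offenseName: String)

  // crime_type: the first part of NAME before the "-" separator, trimmed.
  def crimeType(name: String): String = name.split("-", 2).head.trim

  // Median of a non-empty sequence; for an even count,
  // the mean of the two middle values.
  def median(values: Seq[Long]): Double = {
    require(values.nonEmpty, "median of an empty sequence is undefined")
    val sorted = values.sorted
    val n = sorted.length
    if (n % 2 == 1) sorted(n / 2).toDouble
    else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }

  // frequent_crime_types: the three most frequent crime types,
  // in descending order of frequency, joined with ", ".
  def frequentCrimeTypes(incidents: Seq[Incident]): String =
    incidents
      .groupBy(i => crimeType(i.offenseName))
      .map { case (crime, group) => (crime, group.size) }
      .toSeq
      .sortBy { case (crime, count) => (-count, crime) } // assumption: ties broken by name
      .take(3)
      .map(_._1)
      .mkString(", ")
}
```

For example, MetricsSketch.crimeType("BURGLARY - COMMERICAL - ATTEMPT") yields BURGLARY, and MetricsSketch.median(Seq(1L, 2L, 3L, 4L)) yields 2.5, which is why crimes_monthly can be a fractional value.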
You can use the following code samples to build your own Scala project.
// Main application object
package com.example

import com.example.entity.{Crime, OffenseCode}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object BostonCrimesMap {

  // Spark recommends defining main() instead of extending scala.App
  def main(args: Array[String]): Unit = {
    // Positional arguments: the two input CSV files and the output directory
    val crimeFilePath = args(0)
    val offenseCodesFilePath = args(1)
    val resultFilePath = args(2)

    // NOTE: a master set in code takes precedence over the --master option of spark-submit
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("boston_crimes")
      .getOrCreate()

    import spark.implicits._

    val crimes = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(crimeFilePath)
      .as[Crime]

    // CRIME_TYPE is the first part of NAME before the "-" separator
    val offenseCodes = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(offenseCodesFilePath)
      .withColumn("CRIME_TYPE", trim(substring_index($"NAME", "-", 1)))
      .as[OffenseCode]

    // Incidents without a district cannot be attributed to any group
    val filteredCrimes = crimes
      .filter($"DISTRICT".isNotNull)
      .cache()

    // The code table is small, so hint Spark to broadcast it for the join
    val crimesWithOffenseCodes = filteredCrimes
      .join(broadcast(offenseCodes), filteredCrimes("OFFENSE_CODE") === offenseCodes("CODE"))
      .select("INCIDENT_NUMBER", "DISTRICT", "MONTH", "Lat", "Long", "CRIME_TYPE")
      .cache()

    // Total number of crimes and average coordinates per district
    val crimesDistrictAnalytics = filteredCrimes
      .groupBy($"DISTRICT")
      .agg(
        expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
        expr("AVG(Lat) as lat"),
        expr("AVG(Long) as lng")
      )

    // Crime counts per district and month. MONTH is the calendar month number,
    // so counts from different years are combined.
    filteredCrimes
      .groupBy($"DISTRICT", $"MONTH")
      .agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT"))
      .createOrReplaceTempView("crimesByDistrictByMonth")

    // Median of the monthly counts per district
    val crimesDistrictMedian = spark.sql(
      """select DISTRICT, percentile(CRIMES_CNT, 0.5) as crimes_monthly
        |from crimesByDistrictByMonth
        |group by DISTRICT""".stripMargin)

    // Three most frequent crime types per district, joined into one string
    val crimesByDistrictByCrimeTypes = crimesWithOffenseCodes
      .groupBy($"DISTRICT", $"CRIME_TYPE")
      .agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT"))
      .selectExpr("*", "row_number() over(partition by DISTRICT order by CRIMES_CNT desc) as rn")
      .filter($"rn" <= 3)
      .drop("rn", "CRIMES_CNT")
      .groupBy($"DISTRICT")
      .agg(concat_ws(", ", collect_list($"CRIME_TYPE")).alias("frequent_crime_types"))

    val finalResult = crimesDistrictAnalytics
      .join(crimesDistrictMedian, "DISTRICT")
      .join(crimesByDistrictByCrimeTypes, "DISTRICT")
      .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")

    // Write a single Parquet file, replacing any previous result
    finalResult.repartition(1).write.mode("overwrite").parquet(resultFilePath)

    spark.stop()
  }
}

// build.sbt
// The sparkVersion and sparkComponents keys are provided by the sbt-spark-package plugin,
// coverageHighlighting by sbt-scoverage, and the assembly task by sbt-assembly.
// Declare these plugins in project/plugins.sbt.
lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.8"
    )),
    name := "boston_crimes",
    version := "0.0.1",
    sparkVersion := "2.3.0",
    sparkComponents := Seq(),
    javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
    javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled"),
    scalacOptions ++= Seq("-deprecation", "-unchecked"),
    parallelExecution in Test := false,
    fork := true,
    coverageHighlighting := true,
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
      "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided",
      "org.scalatest" %% "scalatest" % "3.0.1" % "test",
      "org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
      "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % "test"
    ),
    // uses compile classpath for the run task, including "provided" jar (cf http://stackoverflow.com/a/21803413/3827)
    run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated,
    resolvers ++= Seq(
      "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
      "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
      Resolver.sonatypeRepo("public")
    ),
    pomIncludeRepository := { x => false },
    // publish settings
    publishTo := {
      val nexus = "https://oss.sonatype.org/"
      if (isSnapshot.value)
        Some("snapshots" at nexus + "content/repositories/snapshots")
      else
        Some("releases" at nexus + "service/local/staging/deploy/maven2")
    }
  )

// Entity for a row of crime.csv
package com.example.entity

case class Crime(
  INCIDENT_NUMBER: Option[String],
  OFFENSE_CODE: Option[Int],
  OFFENSE_CODE_GROUP: Option[String],
  OFFENSE_DESCRIPTION: Option[String],
  DISTRICT: Option[String],
  REPORTING_AREA: Option[String],
  SHOOTING: Option[String],
  OCCURRED_ON_DATE: Option[String],
  YEAR: Option[Int],
  MONTH: Option[Int],
  DAY_OF_WEEK: Option[String],
  HOUR: Option[Int],
  UCR_PART: Option[String],
  STREET: Option[String],
  Lat: Option[Double],
  Long: Option[Double],
  Location: Option[String]
)

// Entity for a row of offense_codes.csv, extended with the derived CRIME_TYPE column
package com.example.entity

case class OffenseCode(
  CODE: Option[Int],
  NAME: Option[String],
  CRIME_TYPE: Option[String]
)

Build the uber-jar in the project folder using the command:
$ sbt clean assembly
Launching calculation
After you build the uber-jar, make sure that the file is available on the Spark master host of your cluster. In the project directory, launch the calculation with the following command:
$ spark-submit --master local --class com.example.BostonCrimesMap path-to/boston_crimes.jar path-to/crime.csv path-to/offense_codes.csv path-to/result
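The three paths that follow the jar in the command above are passed to the program as positional arguments (args(0), args(1), and args(2) in BostonCrimesMap). The sample reads them without any checks; the following is a minimal sketch of how they could be validated before the job starts. JobArgs, Paths, and the usage message are hypothetical names that are not part of the sample project.

```scala
object JobArgs {
  // Hypothetical holder for the three positional arguments of the job.
  final case class Paths(crimes: String, offenseCodes: String, result: String)

  // Returns Right(paths) when exactly three arguments are given,
  // otherwise Left with a usage message.
  def parse(args: Array[String]): Either[String, Paths] = args match {
    case Array(crimes, offenseCodes, result) =>
      Right(Paths(crimes, offenseCodes, result))
    case _ =>
      Left("Usage: BostonCrimesMap <crime.csv> <offense_codes.csv> <result-dir>")
  }
}
```

A main method could match on the result of JobArgs.parse(args) and exit with the usage message on Left, instead of failing with an ArrayIndexOutOfBoundsException when an argument is missing.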
View results
All results are written in Parquet format to the directory that you specified in the spark-submit command.
You can view them with standard tools, for example by reading the directory with spark.read.parquet in spark-shell. Printed as JSON, the result looks as follows:
{"DISTRICT":"C6","crimes_total":23460,"crimes_monthly":1870,"frequent_crime_types":"SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON","lat":42.21212258445548,"lng":-70.85561011772236}
{"DISTRICT":"B2","crimes_total":49945,"crimes_monthly":3986,"frequent_crime_types":"VERBAL DISPUTE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.31600367732765,"lng":-71.0756993065433}
{"DISTRICT":"C11","crimes_total":42530,"crimes_monthly":3284,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.29263740900067,"lng":-71.05125995734362}
{"DISTRICT":"E13","crimes_total":17536,"crimes_monthly":1368.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, DRUGS","lat":42.309803655709956,"lng":-71.09800478878388}
{"DISTRICT":"B3","crimes_total":35442,"crimes_monthly":2764,"frequent_crime_types":"VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON","lat":42.283059445201054,"lng":-71.07894914185492}
{"DISTRICT":"E5","crimes_total":13239,"crimes_monthly":1043.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, PROPERTY","lat":42.19796999447013,"lng":-71.00440862434752}
{"DISTRICT":"A15","crimes_total":6505,"crimes_monthly":499,"frequent_crime_types":"INVESTIGATE PERSON, VANDALISM, SICK/INJURED/MEDICAL","lat":42.17915525091085,"lng":-70.74472508958515}
{"DISTRICT":"A7","crimes_total":13544,"crimes_monthly":1092,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VANDALISM","lat":42.36070260499386,"lng":-71.00394833039842}
{"DISTRICT":"D14","crimes_total":20127,"crimes_monthly":1607.5,"frequent_crime_types":"TOWED MOTOR VEHICLE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.343507245109336,"lng":-71.13125461726499}
{"DISTRICT":"D4","crimes_total":41915,"crimes_monthly":3297.5,"frequent_crime_types":"PROPERTY, INVESTIGATE PERSON, SICK/INJURED/MEDICAL","lat":42.34124251790852,"lng":-71.07725024946983}
{"DISTRICT":"E18","crimes_total":17348,"crimes_monthly":1313,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.26268061122604,"lng":-71.11891998757692}
{"DISTRICT":"A1","crimes_total":35717,"crimes_monthly":2775,"frequent_crime_types":"PROPERTY, SICK/INJURED/MEDICAL, WARRANT ARREST","lat":42.33123077259828,"lng":-71.01991881362002}