Пример запуска задач c Dataset
Пример программы, описанной в данной статье, позволяет проверить работоспособность Spark и показывает как запускать Spark задачи с помощью spark-submit
.
Программа собирает статистику по криминальной ситуации в разных районах Бостона.
Dataset, используемый в примере, можно получить по ссылке.
Dataset
Dataset содержит записи с 14 июня 2015 по 3 сентября 2018 года. Каждая строка представляет запись о преступлении, включая вид преступления, дату, время и место.
Программа принимает следующие файлы на входе:
-
crime.csv — основной файл с записями о преступлениях;
-
offense_codes.csv — файл, содержащий коды преступлений.
В результате выполнения программы мы получим следующую информацию:
-
наиболее частые виды преступлений;
-
распределение преступлений по районам;
-
частота совершения преступления по видам.
Данные файлы мы помещаем в директорию с проектом.
Подготовка
Для получения информации по созданию проекта Scala с использованием инструмента сборки Scala обратитесь к документации Scala.
Примечание
Если проект создается локально, не забудьте скопировать jar файл в мастер-хост Spark.
|
Используя Spark, мы агрегируем все районы (поле district
) по следующими метрикам:
-
crimes_total
— общее количество преступлений, совершенных в указанном районе. -
crimes_monthly
-- усредненное количество преступлений в месяц, совершенных в указанном районе. -
frequent_crime_types
— три наиболее частых вида преступлений в указанном районе за всю историю наблюдений. Значения разделены запятой и пробелом (,
) и упорядочены по нисходящей от наиболее частых к редким. -
crime_type
— первая часть поляNAME
из таблицы offense_codes.csv, отсеченная разделителем-
. Например, если полеNAME
содержитBURGLARY - COMMERICAL - ATTEMPT
, тоcrime_type
определяется какBURGLARY
. -
lat
— широта координат района, средняя по всем преступлениям. -
lng
— долгота координат района, средняя по всем преступлениям.
Вы можете использовать следующие примеры кода для сборки Scala проекта.
package com.example
import com.example.entity.{Crime, OffenseCode}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object BostonCrimesMap extends App {
val crimeFilepath = args(0)
val offenseCodesFilePath = args(1)
val resultFilePath = args(2)
val spark = SparkSession
.builder()
.master("local[*]")
.appName("boston_crimes")
.getOrCreate()
import spark.implicits._
val crimes = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv(crimeFilepath)
.as[Crime]
val offense_codes = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv(offenseCodesFilePath)
.withColumn("CRIME_TYPE", trim(substring_index($"NAME", "-", 1)))
.as[OffenseCode]
val offense_codes_br = spark.sparkContext.broadcast(offense_codes)
val filteredCrimes = crimes
.filter($"DISTRICT".isNotNull).cache()
val crimesWithOffenceCodes = filteredCrimes
.join(offense_codes_br.value, filteredCrimes("OFFENSE_CODE") === offense_codes_br.value("CODE"))
.select("INCIDENT_NUMBER", "DISTRICT", "MONTH", "Lat", "Long", "CRIME_TYPE").cache()
val crimesDistrictAnalytics = filteredCrimes
.groupBy($"DISTRICT")
.agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
expr("AVG(Lat) as lat"),
expr("AVG(Long) as lng")
)
val crimesByDistrictByMonth = filteredCrimes
.groupBy($"DISTRICT", $"MONTH")
.agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT")).createOrReplaceTempView("crimesByDistrictByMonth")
val crimesDistrictMedian = spark.sql(
"select " +
" DISTRICT" +
" ,percentile(CRIMES_CNT, 0.5) as crimes_monthly " +
" from crimesByDistrictByMonth" +
" group by DISTRICT")
val crimesByDistrictByCrimeTypes = crimesWithOffenceCodes
.groupBy($"DISTRICT", $"CRIME_TYPE")
.agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT"))
.selectExpr("*", "row_number() over(partition by DISTRICT order by CRIMES_CNT desc) as rn")
.filter($"rn" <= 3)
.drop($"rn")
.drop($"CRIMES_CNT")
.groupBy($"DISTRICT")
.agg(concat_ws(", ", collect_list($"CRIME_TYPE")).alias("frequent_crime_types"))
val finalResult =
crimesDistrictAnalytics
.join(crimesDistrictMedian, "DISTRICT")
.join(crimesByDistrictByCrimeTypes, "DISTRICT")
.select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
finalResult.repartition(1).write.mode("OVERWRITE").parquet(resultFilePath)
}
lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.11.8"
)),
name := "boston_crimes",
version := "0.0.1",
sparkVersion := "2.3.0",
sparkComponents := Seq(),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled"),
scalacOptions ++= Seq("-deprecation", "-unchecked"),
parallelExecution in Test := false,
fork := true,
coverageHighlighting := true,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.3.0" % "provided",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
"com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % "test"
),
// uses compile classpath for the run task, including "provided" jar (cf http://stackoverflow.com/a/21803413/3827)
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated,
scalacOptions ++= Seq("-deprecation", "-unchecked"),
pomIncludeRepository := { x => false },
resolvers ++= Seq(
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
Resolver.sonatypeRepo("public")
),
pomIncludeRepository := { x => false },
// publish settings
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}
)
package com.example.entity
case class Crime(
INCIDENT_NUMBER: Option[String],
OFFENSE_CODE: Option[Int],
OFFENSE_CODE_GROUP: Option[String],
OFFENSE_DESCRIPTION: Option[String],
DISTRICT: Option[String],
REPORTING_AREA: Option[String],
SHOOTING: Option[String],
OCCURRED_ON_DATE: Option[String],
YEAR: Option[Int],
MONTH: Option[Int],
DAY_OF_WEEK: Option[String],
HOUR: Option[Int],
UCR_PART: Option[String],
STREET: Option[String],
Lat: Option[Double],
Long: Option[Double],
Location: Option[String]
)
package com.example.entity
case class OffenseCode(
CODE: Option[Int],
NAME: Option[String],
CRIME_TYPE: Option[String]
)
Чтобы построить uber-jar, выполните следующую команду в директории проекта:
$ sbt clean assembly
Запуск приложения
После того, как uber-jar готов, убедитесь, что вы поместили jar в ваш кластер, используя Spark-сервер. В директории проекта запустите программу, используя следующую команду:
$ spark-submit --master local --class com.example.BostonCrimesMap path-to/boston_crimes.jar path-to/crime.csv path-to/offense_codes.csv path-to/result
Просмотр результатов
Все результаты сохраняются в директорию, указанную при вызове команды spark-submit
, и представлены в parquet-формате.
{"DISTRICT":"C6","crimes_total":23460,"crimes_monthly":1870,"frequent_crime_types":"SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON","lat":42.21212258445548,"lng":-70.85561011772236} {"DISTRICT":"B2","crimes_total":49945,"crimes_monthly":3986,"frequent_crime_types":"VERBAL DISPUTE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.31600367732765,"lng":-71.0756993065433} {"DISTRICT":"C11","crimes_total":42530,"crimes_monthly":3284,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.29263740900067,"lng":-71.05125995734362} {"DISTRICT":"E13","crimes_total":17536,"crimes_monthly":1368.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, DRUGS","lat":42.309803655709956,"lng":-71.09800478878388} {"DISTRICT":"B3","crimes_total":35442,"crimes_monthly":2764,"frequent_crime_types":"VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON","lat":42.283059445201054,"lng":-71.07894914185492} {"DISTRICT":"E5","crimes_total":13239,"crimes_monthly":1043.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, PROPERTY","lat":42.19796999447013,"lng":-71.00440862434752} {"DISTRICT":"A15","crimes_total":6505,"crimes_monthly":499,"frequent_crime_types":"INVESTIGATE PERSON, VANDALISM, SICK/INJURED/MEDICAL","lat":42.17915525091085,"lng":-70.74472508958515} {"DISTRICT":"A7","crimes_total":13544,"crimes_monthly":1092,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VANDALISM","lat":42.36070260499386,"lng":-71.00394833039842} {"DISTRICT":"D14","crimes_total":20127,"crimes_monthly":1607.5,"frequent_crime_types":"TOWED MOTOR VEHICLE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.343507245109336,"lng":-71.13125461726499} {"DISTRICT":"D4","crimes_total":41915,"crimes_monthly":3297.5,"frequent_crime_types":"PROPERTY, INVESTIGATE PERSON, SICK/INJURED/MEDICAL","lat":42.34124251790852,"lng":-71.07725024946983} {"DISTRICT":"E18","crimes_total":17348,"crimes_monthly":1313,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.26268061122604,"lng":-71.11891998757692} {"DISTRICT":"A1","crimes_total":35717,"crimes_monthly":2775,"frequent_crime_types":"PROPERTY, SICK/INJURED/MEDICAL, WARRANT ARREST","lat":42.33123077259828,"lng":-71.01991881362002}