Пример запуска задач c Dataset

Пример программы, описанной в данной статье, позволяет проверить работоспособность Spark и показывает как запускать Spark задачи с помощью spark-submit. Программа собирает статистику по криминальной ситуации в разных районах Бостона. Dataset, используемый в примере, можно получить по ссылке.

Dataset

Dataset содержит записи с 14 июня 2015 по 3 сентября 2018 года. Каждая строка представляет запись о преступлении, включая вид преступления, дату, время и место.

Программа принимает следующие файлы на входе:

  • crime.csv — основной файл с записями о преступлениях;

  • offense_codes.csv — файл, содержащий коды преступлений.

 

В результате выполнения программы мы получим следующую информацию:

  • наиболее частые виды преступлений;

  • распределение преступлений по районам;

  • частота совершения преступления по видам.

Данные файлы мы помещаем в директорию с проектом.

Подготовка

Для получения информации по созданию проекта Scala с использованием инструмента сборки Scala обратитесь к документации Scala.

Примечание
Если проект создается локально, не забудьте скопировать jar файл в мастер-хост Spark.

Используя Spark, мы агрегируем все районы (поле district) по следующими метрикам:

  • crimes_total — общее количество преступлений, совершенных в указанном районе.

  • crimes_monthly-- усредненное количество преступлений в месяц, совершенных в указанном районе.

  • frequent_crime_types — три наиболее частых вида преступлений в указанном районе за всю историю наблюдений. Значения разделены запятой и пробелом () и упорядочены по нисходящей от наиболее частых к редким.

  • crime_type — первая часть поля NAME из таблицы offense_codes.csv, отсеченная разделителем -. Например, если поле NAME содержит BURGLARY - COMMERICAL - ATTEMPT, то crime_type определяется как BURGLARY.

  • lat — широта координат района, средняя по всем преступлениям.

  • lng — долгота координат района, средняя по всем преступлениям.

Вы можете использовать следующие примеры кода для сборки Scala проекта.

Основной пример кода Scala
package com.example

import com.example.entity.{Crime, OffenseCode}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object BostonCrimesMap extends App {

    val crimeFilepath = args(0)
    val offenseCodesFilePath = args(1)
    val resultFilePath = args(2)

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("boston_crimes")
    .getOrCreate()

  import spark.implicits._

  val crimes = spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(crimeFilepath)
    .as[Crime]


  val offense_codes = spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(offenseCodesFilePath)
    .withColumn("CRIME_TYPE", trim(substring_index($"NAME", "-", 1)))
    .as[OffenseCode]


  val offense_codes_br = spark.sparkContext.broadcast(offense_codes)

  val filteredCrimes = crimes
    .filter($"DISTRICT".isNotNull).cache()

  val crimesWithOffenceCodes = filteredCrimes
    .join(offense_codes_br.value, filteredCrimes("OFFENSE_CODE") === offense_codes_br.value("CODE"))
    .select("INCIDENT_NUMBER", "DISTRICT", "MONTH", "Lat", "Long", "CRIME_TYPE").cache()

  val crimesDistrictAnalytics = filteredCrimes
    .groupBy($"DISTRICT")
    .agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
      expr("AVG(Lat) as lat"),
      expr("AVG(Long) as lng")
    )


  val crimesByDistrictByMonth = filteredCrimes
    .groupBy($"DISTRICT", $"MONTH")
    .agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT")).createOrReplaceTempView("crimesByDistrictByMonth")

  val crimesDistrictMedian = spark.sql(
    "select " +
      " DISTRICT" +
      " ,percentile(CRIMES_CNT, 0.5) as crimes_monthly " +
      " from crimesByDistrictByMonth" +
      " group by DISTRICT")


  val crimesByDistrictByCrimeTypes = crimesWithOffenceCodes
    .groupBy($"DISTRICT", $"CRIME_TYPE")
    .agg(expr("count(INCIDENT_NUMBER) as CRIMES_CNT"))
    .selectExpr("*", "row_number() over(partition by DISTRICT order by CRIMES_CNT desc) as rn")
    .filter($"rn" <= 3)
    .drop($"rn")
    .drop($"CRIMES_CNT")
    .groupBy($"DISTRICT")
    .agg(concat_ws(", ", collect_list($"CRIME_TYPE")).alias("frequent_crime_types"))


  val finalResult =
    crimesDistrictAnalytics
      .join(crimesDistrictMedian, "DISTRICT")
      .join(crimesByDistrictByCrimeTypes, "DISTRICT")
      .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")

  finalResult.repartition(1).write.mode("OVERWRITE").parquet(resultFilePath)


}
Пример build.sbt
lazy val root = (project in file(".")).

  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.8"
    )),
    name := "boston_crimes",
    version := "0.0.1",

    sparkVersion := "2.3.0",
    sparkComponents := Seq(),

    javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
    javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled"),
    scalacOptions ++= Seq("-deprecation", "-unchecked"),
    parallelExecution in Test := false,
    fork := true,

    coverageHighlighting := true,

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
      "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided",

      "org.scalatest" %% "scalatest" % "3.0.1" % "test",
      "org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
      "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % "test"
    ),

    // uses compile classpath for the run task, including "provided" jar (cf http://stackoverflow.com/a/21803413/3827)
    run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated,

    scalacOptions ++= Seq("-deprecation", "-unchecked"),
    pomIncludeRepository := { x => false },

   resolvers ++= Seq(
      "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
      "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
      Resolver.sonatypeRepo("public")
    ),

    pomIncludeRepository := { x => false },

    // publish settings
    publishTo := {
      val nexus = "https://oss.sonatype.org/"
      if (isSnapshot.value)
        Some("snapshots" at nexus + "content/repositories/snapshots")
      else
        Some("releases"  at nexus + "service/local/staging/deploy/maven2")
    }
  )
Пример класса Crime.scala
package com.example.entity

case class Crime(
                  INCIDENT_NUMBER: Option[String],
                  OFFENSE_CODE: Option[Int],
                  OFFENSE_CODE_GROUP: Option[String],
                  OFFENSE_DESCRIPTION: Option[String],
                  DISTRICT: Option[String],
                  REPORTING_AREA: Option[String],
                  SHOOTING: Option[String],
                  OCCURRED_ON_DATE: Option[String],
                  YEAR: Option[Int],
                  MONTH: Option[Int],
                  DAY_OF_WEEK: Option[String],
                  HOUR: Option[Int],
                  UCR_PART: Option[String],
                  STREET: Option[String],
                  Lat: Option[Double],
                  Long: Option[Double],
                  Location: Option[String]
                )
Пример класса OffenseCode.scala
package com.example.entity

case class OffenseCode(
                      CODE: Option[Int],
                      NAME: Option[String],
                      CRIME_TYPE: Option[String]
                      )

Чтобы построить uber-jar, выполните следующую команду в директории проекта:

$ sbt clean assembly

Запуск приложения

После того, как uber-jar готов, убедитесь, что вы поместили jar в ваш кластер, используя Spark-сервер. В директории проекта запустите программу, используя следующую команду:

$ spark-submit --master local --class com.example.BostonCrimesMap path-to/boston_crimes.jar path-to/crime.csv path-to/offense_codes.csv path-to/result

Просмотр результатов

Все результаты сохраняются в директорию, указанную при вызове команды spark-submit, и представлены в parquet-формате.

Пример результата выполнения
{"DISTRICT":"C6","crimes_total":23460,"crimes_monthly":1870,"frequent_crime_types":"SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON","lat":42.21212258445548,"lng":-70.85561011772236}
{"DISTRICT":"B2","crimes_total":49945,"crimes_monthly":3986,"frequent_crime_types":"VERBAL DISPUTE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.31600367732765,"lng":-71.0756993065433}
{"DISTRICT":"C11","crimes_total":42530,"crimes_monthly":3284,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.29263740900067,"lng":-71.05125995734362}
{"DISTRICT":"E13","crimes_total":17536,"crimes_monthly":1368.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, DRUGS","lat":42.309803655709956,"lng":-71.09800478878388}
{"DISTRICT":"B3","crimes_total":35442,"crimes_monthly":2764,"frequent_crime_types":"VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON","lat":42.283059445201054,"lng":-71.07894914185492}
{"DISTRICT":"E5","crimes_total":13239,"crimes_monthly":1043.5,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, PROPERTY","lat":42.19796999447013,"lng":-71.00440862434752}
{"DISTRICT":"A15","crimes_total":6505,"crimes_monthly":499,"frequent_crime_types":"INVESTIGATE PERSON, VANDALISM, SICK/INJURED/MEDICAL","lat":42.17915525091085,"lng":-70.74472508958515}
{"DISTRICT":"A7","crimes_total":13544,"crimes_monthly":1092,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VANDALISM","lat":42.36070260499386,"lng":-71.00394833039842}
{"DISTRICT":"D14","crimes_total":20127,"crimes_monthly":1607.5,"frequent_crime_types":"TOWED MOTOR VEHICLE, SICK/INJURED/MEDICAL, INVESTIGATE PERSON","lat":42.343507245109336,"lng":-71.13125461726499}
{"DISTRICT":"D4","crimes_total":41915,"crimes_monthly":3297.5,"frequent_crime_types":"PROPERTY, INVESTIGATE PERSON, SICK/INJURED/MEDICAL","lat":42.34124251790852,"lng":-71.07725024946983}
{"DISTRICT":"E18","crimes_total":17348,"crimes_monthly":1313,"frequent_crime_types":"SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VERBAL DISPUTE","lat":42.26268061122604,"lng":-71.11891998757692}
{"DISTRICT":"A1","crimes_total":35717,"crimes_monthly":2775,"frequent_crime_types":"PROPERTY, SICK/INJURED/MEDICAL, WARRANT ARREST","lat":42.33123077259828,"lng":-71.01991881362002}
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней