Пример использования

В этом примере мы используем статистику о преступлениях в разных районах Бостона. Вы можете загрузить набор данных, используя ссылку.

Перед тем как вы начнёте использовать Spark connector, необходимо выяснить следующую информацию:

  1. Имя хоста ADB master.

  2. Порт, на котором запущен ADB master.

  3. Имя базы данных, к которой нужно подсоединиться.

  4. Имя схемы и таблицы, к которым нужен доступ.

  5. Учётные данные пользователя для соединения.

Загрузка connector

Имя файла имеет следующую структуру:

adb-spark-connector_<scala_version>-<adb-connector-version>.tar.gz

Пример имени файла:

adb-spark-connector_2.11-1.0.tar.gz

Архив содержит текстовый файл лицензии и JAR-файл.

spark-shell

Вы можете запустить Spark, используя команду spark-shell с опцией --jars. Эта опция указывает путь к JAR-файлу connector:

user@spark-node$ export CONNECTOR_JAR=/path/to/adb-spark-connector_2.11-1.0.jar
user@spark-node$ spark-shell --jars $CONNECTOR_JAR
< ... spark-shell startup output messages ... >
scala>

Создание приложения

При разработке отдельного приложения Spark необходимо совместить JAR-файл для connector с другими зависимостями приложения и поместить результат в файл uber-jar.

Вы можете использовать команду spark-submit, чтобы запустить приложение Spark в сборке (assembled) с connector. Вы также можете использовать команду spark-submit с опцией --jar. Больше информации доступно в документации Spark.

Строка подключения для ADB

Для взаимодействия connector и ADB master используется соединение JDBC. Драйвер PostgreSQL JDBC поставляется вместе с connector. Вы также можете использовать сторонний драйвер для подключения к ADB.

Чтобы использовать connector, нужно указать строку подключения JDBC. Эта строка соединения должна включать мастер-хост ADB, его порт и название базы данных, к которой осуществляется подключение.

Использование драйвера PostgreSQL JDBC

Строка подключения JDBC для стандартного драйвера имеет следующую структуру:

jdbc:postgresql://<master>[:<port>]/<database_name>

Пример:

jdbc:postgresql://adb-master:5432/spark

Больше информации доступно в документации PostgreSQL.

Использование другого драйвера JDBC

Spark connector поддерживает использование сторонних драйверов JDBC. Чтобы использовать сторонний драйвер JDBC, сделайте следующее:

  1. Создайте строку для подключения ODBC к ADB master.

  2. Загрузите JAR-файл со сторонним драйвером JDBC одним из следующих способов:

    • Используйте переменную окружения --jars при вызове spark-shell или spark-submit.

    • Выполните билд файла uber-jar со всеми зависимостями.

    • Установите JAR-файл со сторонним драйвером на всех нодах Spark.

  3. Задайте полный путь к Java-классу драйвера, используя опции connector.

Примеры кода

Пример загрузки данных

Укажите настройки connector:

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes"
   |   )

options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes, spark.adb.user -> spark, "spark.adb.url" -> jdbc:postgresql://10.92.3.151:5432/spark)

Выберите dataframe:

val crimes  = spark
   .read
   .format("adb")
   .options(options)
   .load()
crimes: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]

Получите схему:

crimes.printSchema()
scala> crimes.printSchema()
root
|-- incident_number: string (nullable = false)
|-- offense_code: integer (nullable = false)
|-- offense_code_group: string (nullable = true)
|-- offense_description: string (nullable = true)
|-- district: string (nullable = true)
|-- reporting_area: string (nullable = true)
|-- shooting: string (nullable = true)
|-- occurred_on_date: timestamp (nullable = true)
|-- year: integer (nullable = false)
|-- month: integer (nullable = true)
|-- day_of_week: string (nullable = true)
|-- hour: integer (nullable = true)
|-- ucr_part: string (nullable = true)
|-- street: string (nullable = true)
|-- lat: decimal(38,18) (nullable = true)
|-- long: decimal(38,18) (nullable = true)
|-- location: string (nullable = true)

Вычислите количество рядов в наборе данных:

scala> crimes.count()
res2: Long = 319037

Выполните агрегированный запрос:

scala> :paste
crimes
   .groupBy($"DISTRICT")
   .agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
      expr("AVG(Lat) as lat"),
      expr("AVG(Long) as lng")
   ).show()

+--------+------------+--------------------+--------------------+
|DISTRICT|crimes_total|                 lat|                 lng|
+--------+------------+--------------------+--------------------+
|      C6|       23456|42.21209922281136...|-70.8555727710997...|
|    null|        1765|25.23950519369345...|-43.4487743870425...|
|      B2|       49939|42.31600291007640...|-71.0756976057897...|
|     C11|       42526|42.29263759987228...|-71.0512586281297...|
|     E13|       17533|42.30980222459151...|-71.0980043154060...|
|      B3|       35439|42.28305741964864...|-71.0789481958754...|
|      E5|       13238|42.19796309046079...|-71.0043988112886...|
|     A15|        6505|42.17915525091085...|-70.7447250895850...|
|      A7|       13544|42.36070260499385...|-71.0039483303984...|
|     D14|       20127|42.34350724510932...|-71.1312546172648...|
|      D4|       41910|42.34124257016540...|-71.0772502980893...|
|     E18|       17347|42.26267985201514...|-71.1189201704008...|
|      A1|       35708|42.33122331388176...|-71.0199061556178...|
+--------+------------+--------------------+--------------------+

Вы также можете выполнить любой запрос SQL через master. Для этого вам нужно дополнительно отредактировать настройки Spark-сессии и включить вспомогательные функции.

Загрузите данные:

spark.stop
val spark2 = SparkSession
      .builder()
      .master("local[*]")
      .appName("spark_example")
      .config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark")
      .config("spark.adb.driver","org.postgresql.Driver")
      .config("spark.adb.user","spark")
      .config("spark.adb.password","Orion123")
      .getOrCreate()
import io.arenadata.spark.adb.implicits._
val crimes2 = spark2.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
crimes2: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]
scala> crimes2.count()
res1: Long = 319037

Создайте произвольную таблицу:

spark2.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
val test = spark2.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
test: org.apache.spark.sql.DataFrame = [id: int]
test.show(10)
+---+
| id|
+---+
|  2|
|  3|
|  1|
+---+

Пример записи данных

Загрузите набор данных finalResult из Spark в ADB:

val finalResult =
   crimesDistrictAnalytics
     .join(crimesDistrictMedian, "DISTRICT")
     .join(crimesByDistrictByCrimeTypes, "DISTRICT")
     .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
     .repartition(4)
     .cache()

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes_final_result"
   |   )

   options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes, spark.adb.user -> spark, "spark.adb.url" -> jdbc:postgresql://10.92.3.151:5432/spark)


   finalResult
      .write
      .format("adb")
      .options(options)
      .mode(SaveMode.Overwrite)
      .save()

Проверьте доступность данных в ADB:

spark=#  \d+ test_data.crimes_final_result
                            Table "test_data.crimes_final_result"
      Column        |       Type       | Modifiers | Storage  | Stats target | Description
----------------------+------------------+-----------+----------+--------------+-------------
district             | text             |           | extended |              |
crimes_total         | bigint           | not null  | plain    |              |
crimes_monthly       | double precision |           | plain    |              |
frequent_crime_types | text             | not null  | extended |              |
lat                  | numeric(38,22)   |           | main     |              |
lng                  | numeric(38,22)   |           | main     |              |
Distributed by: (district)

spark=# select * from test_data.crimes_final_result;
district | crimes_total | crimes_monthly |                    frequent_crime_types                    |            lat            |            lng
----------+--------------+----------------+------------------------------------------------------------+---------------------------+----------------------------
B2       |        49939 |           3985 | M/V, M/V ACCIDENT, VERBAL DISPUTE                          | 42.3160029100764052730000 | -71.0756976057897015220000
D14      |        20127 |         1607.5 | TOWED MOTOR VEHICLE, M/V, SICK/INJURED/MEDICAL             | 42.3435072451093210590000 | -71.1312546172648812640000
A15      |         6505 |            499 | M/V ACCIDENT, INVESTIGATE PERSON, M/V                      | 42.1791552509108589950000 | -70.7447250895850891410000
E13      |        17533 |           1368 | SICK/INJURED/MEDICAL, DRUGS, M/V ACCIDENT                  | 42.3098022245915170500000 | -71.0980043154060730210000
A7       |        13544 |           1092 | SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON            | 42.3607026049938574940000 | -71.0039483303984950860000
E5       |        13238 |         1043.5 | SICK/INJURED/MEDICAL, INVESTIGATE PERSON, M/V ACCIDENT     | 42.1979630904607935020000 | -71.0043988112886597940000
B3       |        35439 |           2764 | VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON         | 42.2830574196486438360000 | -71.0789481958754228740000
E18      |        17347 |           1313 | SICK/INJURED/MEDICAL, M/V ACCIDENT, INVESTIGATE PERSON     | 42.2626798520151451410000 | -71.1189201704008654370000
C11      |        42526 |           3283 | M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON              | 42.2926375998722842040000 | -71.0512586281297709920000
A1       |        35708 |           2775 | PROPERTY, ASSAULT SIMPLE, DRUGS                            | 42.3312233138817663820000 | -71.0199061556178000740000
C6       |        23456 |           1870 | DRUGS, SICK/INJURED/MEDICAL, INVESTIGATE PERSON            | 42.2120992228113649950000 | -70.8555727710997195530000
D4       |        41910 |         3297.5 | LARCENY SHOPLIFTING, PROPERTY, LARCENY THEFT FROM BUILDING | 42.3412425701654033500000 | -71.0772502980893959520000
(12 rows)
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней