Usage example

In this example, we collect crime statistics from different Boston neighborhoods. You can download a dataset from the following source.

Before you start using the Spark connector, you need to find out the following information:

  1. ADB master host name.

  2. Port on which the ADB master is running.

  3. Name of the database to connect to.

  4. Name of the schema and table to be accessed.

  5. User credentials for the connection.

Load the connector

The file name has the following structure:

adb-spark-connector_<scala_version>-<adb_connector_version>.tar.gz

File name example:

adb-spark-connector_2.11-1.0.tar.gz

The package contains a license text file and a JAR file.

spark-shell

You can run Spark via the spark-shell command with the --jars option. That option specifies the path to the connector JAR file:

user@spark-node$ export CONNECTOR_JAR=/path/to/adb-spark-connector_2.11-1.0.jar
user@spark-node$ spark-shell --jars $CONNECTOR_JAR
< ... spark-shell startup output messages ... >
scala>

Create an application

When developing a standalone Spark application, you need to bundle the connector JAR file with the other application dependencies into an uber JAR file.

You can launch a Spark application assembled with the connector via the spark-submit command. Alternatively, pass the connector JAR file to spark-submit with the --jars option. More information is available in the Spark documentation.
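Below is a minimal sketch of such a standalone application. The object name, application name, host, and credentials are placeholders; the spark.adb.* options and the "adb" data source format are the same ones used in the code samples below.

import org.apache.spark.sql.SparkSession

// Hypothetical standalone application; adjust the connection settings to your environment
object CrimesApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("crimes_app")
      .getOrCreate()

    val options = Map(
      "spark.adb.url"      -> "jdbc:postgresql://adb-master:5432/spark",
      "spark.adb.user"     -> "spark",
      "spark.adb.password" -> "<password>",
      "spark.adb.dbschema" -> "test_data",
      "spark.adb.dbtable"  -> "crimes"
    )

    // Read the table through the connector and print the number of rows
    val crimes = spark.read.format("adb").options(options).load()
    println(crimes.count())

    spark.stop()
  }
}

Assemble this class together with the connector and the other dependencies into an uber JAR and pass the result to spark-submit.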

Connection string for ADB

The connector uses a JDBC connection to interact with the ADB master. The PostgreSQL JDBC driver is shipped together with the connector. You can also use a third-party driver to connect to ADB.

To use the connector, you need to specify a JDBC connection string. The connection string should include the ADB master host, its port, and the name of the database to connect to.

Use PostgreSQL JDBC driver

A JDBC connection string for the standard driver has the following structure:

jdbc:postgresql://<master>[:<port>]/<database_name>

Example:

jdbc:postgresql://adb-master:5432/spark

More information is available in the PostgreSQL documentation.

Use another JDBC driver

The Spark connector supports third-party JDBC drivers. To use a third-party JDBC driver, do the following:

  1. Create a JDBC connection string for the ADB master.

  2. Provide the JAR file with the third-party JDBC driver in one of the following ways:

    • Use the --jars option when calling spark-shell or spark-submit.

    • Build an uber JAR file with all dependencies.

    • Install a JAR file with a third-party driver on all Spark nodes.

  3. Set the fully qualified name of the driver Java class in the connector options (see the sketch below).
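For example, with a hypothetical third-party driver class com.example.jdbc.Driver, the connector options could look as follows (the spark.adb.driver option also appears with the standard PostgreSQL driver in the code samples below):

// A sketch assuming a hypothetical third-party JDBC driver and connection string;
// replace the class name and the URL with the ones from your driver documentation
val options = Map(
  "spark.adb.url"      -> "jdbc:exampledb://adb-master:5432/spark",
  "spark.adb.driver"   -> "com.example.jdbc.Driver",
  "spark.adb.user"     -> "spark",
  "spark.adb.password" -> "<password>",
  "spark.adb.dbschema" -> "test_data",
  "spark.adb.dbtable"  -> "crimes"
)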

Code samples

Example of data loading

Specify the connector settings:

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes"
   |   )

options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)

Register a dataframe:

val crimes = spark
   .read
   .format("adb")
   .options(options)
   .load()
crimes: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]

Get a schema:

scala> crimes.printSchema()
root
 |-- incident_number: string (nullable = false)
 |-- offense_code: integer (nullable = false)
 |-- offense_code_group: string (nullable = true)
 |-- offense_description: string (nullable = true)
 |-- district: string (nullable = true)
 |-- reporting_area: string (nullable = true)
 |-- shooting: string (nullable = true)
 |-- occurred_on_date: timestamp (nullable = true)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- ucr_part: string (nullable = true)
 |-- street: string (nullable = true)
 |-- lat: decimal(38,18) (nullable = true)
 |-- long: decimal(38,18) (nullable = true)
 |-- location: string (nullable = true)

Calculate the number of rows in a dataset:

scala> crimes.count()
res2: Long = 319037

Execute a query with aggregation:

scala> :paste
crimes
   .groupBy($"DISTRICT")
   .agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
      expr("AVG(Lat) as lat"),
      expr("AVG(Long) as lng")
   ).show()

+--------+------------+--------------------+--------------------+
|DISTRICT|crimes_total|                 lat|                 lng|
+--------+------------+--------------------+--------------------+
|      C6|       23456|42.21209922281136...|-70.8555727710997...|
|    null|        1765|25.23950519369345...|-43.4487743870425...|
|      B2|       49939|42.31600291007640...|-71.0756976057897...|
|     C11|       42526|42.29263759987228...|-71.0512586281297...|
|     E13|       17533|42.30980222459151...|-71.0980043154060...|
|      B3|       35439|42.28305741964864...|-71.0789481958754...|
|      E5|       13238|42.19796309046079...|-71.0043988112886...|
|     A15|        6505|42.17915525091085...|-70.7447250895850...|
|      A7|       13544|42.36070260499385...|-71.0039483303984...|
|     D14|       20127|42.34350724510932...|-71.1312546172648...|
|      D4|       41910|42.34124257016540...|-71.0772502980893...|
|     E18|       17347|42.26267985201514...|-71.1189201704008...|
|      A1|       35708|42.33122331388176...|-71.0199061556178...|
+--------+------------+--------------------+--------------------+

You can execute any SQL query through the master as well. To do this, you need to set additional Spark session settings and import the connector's auxiliary implicit functions.

Load data:

spark.stop()
import org.apache.spark.sql.SparkSession
val spark2 = SparkSession
      .builder()
      .master("local[*]")
      .appName("spark_example")
      .config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark")
      .config("spark.adb.driver","org.postgresql.Driver")
      .config("spark.adb.user","spark")
      .config("spark.adb.password","Orion123")
      .getOrCreate()
import io.arenadata.spark.adb.implicits._
val crimes2 = spark2.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
crimes2: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]
scala> crimes2.count()
res1: Long = 319037

Create an arbitrary table:

spark2.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
val test = spark2.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
test: org.apache.spark.sql.DataFrame = [id: int]
test.show(10)
+---+
| id|
+---+
|  2|
|  3|
|  1|
+---+

Example of data writing

Write the finalResult dataset from Spark to ADB:

val finalResult =
   crimesDistrictAnalytics
     .join(crimesDistrictMedian, "DISTRICT")
     .join(crimesByDistrictByCrimeTypes, "DISTRICT")
     .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
     .repartition(4)
     .cache()

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes_final_result"
   |   )

   options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes_final_result, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)


   import org.apache.spark.sql.SaveMode

   finalResult
      .write
      .format("adb")
      .options(options)
      .mode(SaveMode.Overwrite)
      .save()

Check the data availability in ADB:

spark=#  \d+ test_data.crimes_final_result
                            Table "test_data.crimes_final_result"
      Column        |       Type       | Modifiers | Storage  | Stats target | Description
----------------------+------------------+-----------+----------+--------------+-------------
district             | text             |           | extended |              |
crimes_total         | bigint           | not null  | plain    |              |
crimes_monthly       | double precision |           | plain    |              |
frequent_crime_types | text             | not null  | extended |              |
lat                  | numeric(38,22)   |           | main     |              |
lng                  | numeric(38,22)   |           | main     |              |
Distributed by: (district)

spark=# select * from test_data.crimes_final_result;
district | crimes_total | crimes_monthly |                    frequent_crime_types                    |            lat            |            lng
----------+--------------+----------------+------------------------------------------------------------+---------------------------+----------------------------
B2       |        49939 |           3985 | M/V, M/V ACCIDENT, VERBAL DISPUTE                          | 42.3160029100764052730000 | -71.0756976057897015220000
D14      |        20127 |         1607.5 | TOWED MOTOR VEHICLE, M/V, SICK/INJURED/MEDICAL             | 42.3435072451093210590000 | -71.1312546172648812640000
A15      |         6505 |            499 | M/V ACCIDENT, INVESTIGATE PERSON, M/V                      | 42.1791552509108589950000 | -70.7447250895850891410000
E13      |        17533 |           1368 | SICK/INJURED/MEDICAL, DRUGS, M/V ACCIDENT                  | 42.3098022245915170500000 | -71.0980043154060730210000
A7       |        13544 |           1092 | SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON            | 42.3607026049938574940000 | -71.0039483303984950860000
E5       |        13238 |         1043.5 | SICK/INJURED/MEDICAL, INVESTIGATE PERSON, M/V ACCIDENT     | 42.1979630904607935020000 | -71.0043988112886597940000
B3       |        35439 |           2764 | VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON         | 42.2830574196486438360000 | -71.0789481958754228740000
E18      |        17347 |           1313 | SICK/INJURED/MEDICAL, M/V ACCIDENT, INVESTIGATE PERSON     | 42.2626798520151451410000 | -71.1189201704008654370000
C11      |        42526 |           3283 | M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON              | 42.2926375998722842040000 | -71.0512586281297709920000
A1       |        35708 |           2775 | PROPERTY, ASSAULT SIMPLE, DRUGS                            | 42.3312233138817663820000 | -71.0199061556178000740000
C6       |        23456 |           1870 | DRUGS, SICK/INJURED/MEDICAL, INVESTIGATE PERSON            | 42.2120992228113649950000 | -70.8555727710997195530000
D4       |        41910 |         3297.5 | LARCENY SHOPLIFTING, PROPERTY, LARCENY THEFT FROM BUILDING | 42.3412425701654033500000 | -71.0772502980893959520000
(12 rows)