Usage example

In this sample program, we collect statistics on the criminal situation in different areas of Boston. You can download a fresh dataset from this dataset source.

Before you start using the Spark connector, you need to clarify the following information:

  1. The host name of the ADB master.

  2. The port on which the ADB master is running.

  3. Name of the database to connect to.

  4. Name of the schema and table to be accessed.

  5. User credentials for connection.
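Once you have collected these five items, they map directly onto the connector options used throughout the examples below. The following sketch assembles them into an options Map; the host, port, database, schema, table, and credential values are placeholders:

```scala
// Sketch: turning the five prerequisite items into connector options.
// All values below are placeholders -- substitute your own.
val master   = "adb-master" // 1. host name of the ADB master
val port     = 5432         // 2. port of the ADB master
val database = "spark"      // 3. database name

val options = Map(
  "spark.adb.url"      -> s"jdbc:postgresql://$master:$port/$database",
  "spark.adb.dbschema" -> "test_data", // 4. schema name
  "spark.adb.dbtable"  -> "crimes",    //    table name
  "spark.adb.user"     -> "spark",     // 5. user credentials
  "spark.adb.password" -> "Orion123"
)

println(options("spark.adb.url"))
```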

Connector loading

The connector archive file name has the following form:

adb-spark-connector_<scala_version>-<adb-connector-version>.tar.gz
Name example
adb-spark-connector_2.11-1.0.tar.gz

The package contains a license text file and a jar file.

Spark-shell

You can run Spark interactively using the spark-shell command with the --jars command-line option, which specifies the path to the connector jar file:

Running the connector
user@spark-node$ export CONNECTOR_JAR=/path/to/adb-spark-connector_2.11-1.0.jar
user@spark-node$ spark-shell --jars $CONNECTOR_JAR
< ... spark-shell startup output messages ... >
scala>

Creating an app

When developing a standalone Spark application, bundle the connector jar file with the other application dependencies into an uber-jar file.

You can use the spark-submit command to launch a Spark application assembled with the connector. Alternatively, pass the path to the connector jar file via the --jars command-line option of spark-submit. For details, see the official Spark documentation.

Connection string to ADB

The connector uses a JDBC connection to interact with the ADB master. The PostgreSQL JDBC driver version 42.18 is included with the connector. You can also use a third-party driver to connect to ADB.

To use the connector, you need to specify the JDBC connection string. The connection string should include the host of the ADB master, its port, and the name of the database to connect to.

Using the PostgreSQL JDBC driver

The JDBC connection string for the standard driver looks like this:

jdbc:postgresql://<master>[:<port>]/<database_name>
Example
jdbc:postgresql://adb-master:5432/spark
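The connection string above can be assembled from its parts. The helper below is a hypothetical illustration (buildAdbUrl is not part of the connector API); it follows the template shown above, where the port is optional:

```scala
// Sketch: building the JDBC connection string from its components.
// buildAdbUrl is a hypothetical helper, not part of the connector API.
def buildAdbUrl(master: String, database: String, port: Option[Int] = None): String = {
  // The port is optional in the connection string template
  val portPart = port.map(p => s":$p").getOrElse("")
  s"jdbc:postgresql://$master$portPart/$database"
}

println(buildAdbUrl("adb-master", "spark", Some(5432)))
// jdbc:postgresql://adb-master:5432/spark
```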

For more information about connecting to a database, see the official PostgreSQL documentation.

Using another JDBC driver

The Spark connector supports the use of a third-party JDBC driver. To use it, you should:

  1. Construct a JDBC connection string to the ADB master.

  2. Provide a jar file with a third-party JDBC driver in one of the following ways:

    • Add it using the --jars command-line option when calling spark-shell or spark-submit;

    • Build an uber-jar file with all dependencies;

    • Install the jar file with the third-party driver on all Spark nodes.

  3. Set the fully qualified Java class name of the driver in the connector options (see Spark connector options).
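The steps above boil down to an extra entry in the connector options. The sketch below assumes a placeholder driver class name (com.example.jdbc.Driver) and a placeholder connection string; the spark.adb.driver option itself also appears in the session-level example later in this page:

```scala
// Sketch: connector options for a third-party JDBC driver.
// The connection string format and the driver class name below are
// placeholders -- they depend on the driver you actually use.
val thirdPartyOptions = Map(
  "spark.adb.url"    -> "jdbc:postgresql://adb-master:5432/spark", // step 1
  "spark.adb.driver" -> "com.example.jdbc.Driver"                  // step 3
)

println(thirdPartyOptions("spark.adb.driver"))
```

The driver jar itself must still be made visible to Spark by one of the three methods listed in step 2.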

Code samples

Data loading sample

Specify the connector settings:

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes"
   |   )

options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)

Register a dataframe and get a schema:

val crimes = spark
   .read
   .format("adb")
   .options(options)
   .load()
crimes: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]

scala> crimes.printSchema()
root
|-- incident_number: string (nullable = false)
|-- offense_code: integer (nullable = false)
|-- offense_code_group: string (nullable = true)
|-- offense_description: string (nullable = true)
|-- district: string (nullable = true)
|-- reporting_area: string (nullable = true)
|-- shooting: string (nullable = true)
|-- occurred_on_date: timestamp (nullable = true)
|-- year: integer (nullable = false)
|-- month: integer (nullable = true)
|-- day_of_week: string (nullable = true)
|-- hour: integer (nullable = true)
|-- ucr_part: string (nullable = true)
|-- street: string (nullable = true)
|-- lat: decimal(38,18) (nullable = true)
|-- long: decimal(38,18) (nullable = true)
|-- location: string (nullable = true)

Calculating the number of rows in a dataset:

scala> crimes.count()
res2: Long = 319037

Query execution with aggregation:

scala> :paste
crimes
   .groupBy($"DISTRICT")
   .agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
      expr("AVG(Lat) as lat"),
      expr("AVG(Long) as lng")
   ).show()

+--------+------------+--------------------+--------------------+
|DISTRICT|crimes_total|                 lat|                 lng|
+--------+------------+--------------------+--------------------+
|      C6|       23456|42.21209922281136...|-70.8555727710997...|
|    null|        1765|25.23950519369345...|-43.4487743870425...|
|      B2|       49939|42.31600291007640...|-71.0756976057897...|
|     C11|       42526|42.29263759987228...|-71.0512586281297...|
|     E13|       17533|42.30980222459151...|-71.0980043154060...|
|      B3|       35439|42.28305741964864...|-71.0789481958754...|
|      E5|       13238|42.19796309046079...|-71.0043988112886...|
|     A15|        6505|42.17915525091085...|-70.7447250895850...|
|      A7|       13544|42.36070260499385...|-71.0039483303984...|
|     D14|       20127|42.34350724510932...|-71.1312546172648...|
|      D4|       41910|42.34124257016540...|-71.0772502980893...|
|     E18|       17347|42.26267985201514...|-71.1189201704008...|
|      A1|       35708|42.33122331388176...|-71.0199061556178...|
+--------+------------+--------------------+--------------------+

You can also execute any SQL query through the master. To do this, set additional Spark session settings and enable the implicit auxiliary functions.

Example of data loading:

spark.stop
val spark2 = SparkSession
      .builder()
      .master("local[*]")
      .appName("spark_example")
      .config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark")
      .config("spark.adb.driver","org.postgresql.Driver")
      .config("spark.adb.user","spark")
      .config("spark.adb.password","Orion123")
      .getOrCreate()
import io.arenadata.spark.adb.implicits._
val crimes2 = spark2.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
crimes2: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]
scala> crimes2.count()
res1: Long = 319037

Example of creating an arbitrary table:

spark2.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
val test = spark2.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
test: org.apache.spark.sql.DataFrame = [id: int]
test.show(10)
+---+
| id|
+---+
|  2|
|  3|
|  1|
+---+

Data writing sample

Loading the finalResult dataset to ADB from Spark:

val finalResult =
   crimesDistrictAnalytics
     .join(crimesDistrictMedian, "DISTRICT")
     .join(crimesByDistrictByCrimeTypes, "DISTRICT")
     .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
     .repartition(4)
     .cache()

scala>   val options = Map(
   |     "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
   |     "spark.adb.user" -> "spark",
   |     "spark.adb.password" -> "Orion123",
   |     "spark.adb.dbschema" -> "test_data",
   |     "spark.adb.dbtable" -> "crimes_final_result"
   |   )

   options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes_final_result, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)


   finalResult
      .write
      .format("adb")
      .options(options)
      .mode(SaveMode.Overwrite)
      .save()

Checking the availability of data in ADB:

spark=#  \d+ test_data.crimes_final_result
                            Table "test_data.crimes_final_result"
      Column        |       Type       | Modifiers | Storage  | Stats target | Description
----------------------+------------------+-----------+----------+--------------+-------------
district             | text             |           | extended |              |
crimes_total         | bigint           | not null  | plain    |              |
crimes_monthly       | double precision |           | plain    |              |
frequent_crime_types | text             | not null  | extended |              |
lat                  | numeric(38,22)   |           | main     |              |
lng                  | numeric(38,22)   |           | main     |              |
Distributed by: (district)

spark=# select * from test_data.crimes_final_result;
district | crimes_total | crimes_monthly |                    frequent_crime_types                    |            lat            |            lng
----------+--------------+----------------+------------------------------------------------------------+---------------------------+----------------------------
B2       |        49939 |           3985 | M/V, M/V ACCIDENT, VERBAL DISPUTE                          | 42.3160029100764052730000 | -71.0756976057897015220000
D14      |        20127 |         1607.5 | TOWED MOTOR VEHICLE, M/V, SICK/INJURED/MEDICAL             | 42.3435072451093210590000 | -71.1312546172648812640000
A15      |         6505 |            499 | M/V ACCIDENT, INVESTIGATE PERSON, M/V                      | 42.1791552509108589950000 | -70.7447250895850891410000
E13      |        17533 |           1368 | SICK/INJURED/MEDICAL, DRUGS, M/V ACCIDENT                  | 42.3098022245915170500000 | -71.0980043154060730210000
A7       |        13544 |           1092 | SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON            | 42.3607026049938574940000 | -71.0039483303984950860000
E5       |        13238 |         1043.5 | SICK/INJURED/MEDICAL, INVESTIGATE PERSON, M/V ACCIDENT     | 42.1979630904607935020000 | -71.0043988112886597940000
B3       |        35439 |           2764 | VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON         | 42.2830574196486438360000 | -71.0789481958754228740000
E18      |        17347 |           1313 | SICK/INJURED/MEDICAL, M/V ACCIDENT, INVESTIGATE PERSON     | 42.2626798520151451410000 | -71.1189201704008654370000
C11      |        42526 |           3283 | M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON              | 42.2926375998722842040000 | -71.0512586281297709920000
A1       |        35708 |           2775 | PROPERTY, ASSAULT SIMPLE, DRUGS                            | 42.3312233138817663820000 | -71.0199061556178000740000
C6       |        23456 |           1870 | DRUGS, SICK/INJURED/MEDICAL, INVESTIGATE PERSON            | 42.2120992228113649950000 | -70.8555727710997195530000
D4       |        41910 |         3297.5 | LARCENY SHOPLIFTING, PROPERTY, LARCENY THEFT FROM BUILDING | 42.3412425701654033500000 | -71.0772502980893959520000
(12 rows)