Usage example
In this example, we collect crime statistics from different Boston neighborhoods. You can download a dataset from the following source.
Before you start using the Spark connector, you need to find out the following information:
- ADB master host name.
- Port on which the ADB master is running.
- Name of the database to connect to.
- Names of the schema and table to be accessed.
- User credentials for the connection.
Load a connector
The file name has the following structure:
adb-spark-connector_<scala_version>-<adb-connector-version>.tar.gz
File name example:
adb-spark-connector_2.11-1.0.tar.gz
The package contains a license text file and a JAR file.
spark-shell
You can run Spark via the spark-shell command with the --jars option, which specifies the path to the connector JAR file:
user@spark-node$ export CONNECTOR_JAR=/path/to/adb-spark-connector_2.11-1.0.jar
user@spark-node$ spark-shell --jars $CONNECTOR_JAR
< ... spark-shell startup output messages ... >
scala>
Create an application
When developing a standalone Spark application, combine the connector JAR file with the other application dependencies and package the result into an uber-jar file.
You can use the spark-submit command to launch a Spark application assembled with the connector. You can also pass the connector JAR separately by running the spark-submit command with the --jars option. More information is available in the Spark documentation.
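If you build the uber-jar with sbt, the sbt-assembly plugin is a common choice. The sketch below assumes sbt 1.x with sbt-assembly enabled in project/plugins.sbt; the project name, Spark version, and lib/ path are illustrative, not taken from the product documentation:

```scala
// build.sbt sketch: bundle the ADB connector into an uber-jar with sbt-assembly.
// Versions and paths below are assumptions for illustration.
name := "adb-spark-app"
scalaVersion := "2.11.12"

// Spark itself is provided by the cluster, so it is excluded from the uber-jar
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8" % Provided

// The connector JAR is not in a public repository;
// add it as an unmanaged dependency from a local lib/ directory
Compile / unmanagedJars += file("lib/adb-spark-connector_2.11-1.0.jar")

// Typical merge strategy for conflicting META-INF entries
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _                        => MergeStrategy.first
}
```

With this in place, `sbt assembly` produces a single JAR that you can pass to spark-submit.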
Connection string for ADB
The connector uses a JDBC connection to interact with the ADB master. The PostgreSQL JDBC driver comes bundled with the connector. You can also use a third-party driver to connect to ADB.
To use the connector, you need to specify the JDBC connection string. The connection string should include the ADB master host, its port, and the name of the database to which the connection is being made.
Use PostgreSQL JDBC driver
A JDBC connection string for the standard driver has the following structure:
jdbc:postgresql://<master>[:<port>]/<database_name>
Example:
jdbc:postgresql://adb-master:5432/spark
More information is available in the PostgreSQL documentation.
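The structure above can be sketched as a small helper that assembles the URL from its parts. `buildAdbUrl` is a hypothetical function for illustration, not part of the connector API:

```scala
// Minimal sketch: assemble a PostgreSQL JDBC URL for the ADB master.
// buildAdbUrl is a hypothetical helper, not part of the connector API.
def buildAdbUrl(master: String, database: String, port: Option[Int] = None): String = {
  // the port is optional in the URL structure, so render it only when given
  val portPart = port.map(p => s":$p").getOrElse("")
  s"jdbc:postgresql://$master$portPart/$database"
}

println(buildAdbUrl("adb-master", "spark", Some(5432)))
// jdbc:postgresql://adb-master:5432/spark
println(buildAdbUrl("adb-master", "spark"))
// jdbc:postgresql://adb-master/spark
```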
Use another JDBC driver
The Spark connector supports the use of third-party JDBC drivers. In order to use a third-party JDBC driver, do the following:
- Create a JDBC connection string for the ADB master.
- Submit a JAR file with the third-party JDBC driver in one of the following ways:
  - Use the --jars option when calling spark-shell or spark-submit.
  - Build the uber-jar file with all dependencies.
  - Install the JAR file with the third-party driver on all Spark nodes.
- Set the full path to the driver Java class in the connector options.
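The last step can be sketched as an options Map that sets spark.adb.driver (the driver option used later in this document) alongside the connection settings. The driver class name below is a placeholder; substitute your driver's actual class:

```scala
// Sketch of connector options when a third-party JDBC driver is used.
// "com.example.jdbc.Driver" is a hypothetical class name for illustration.
val options = Map(
  "spark.adb.url"      -> "jdbc:postgresql://adb-master:5432/spark",
  "spark.adb.driver"   -> "com.example.jdbc.Driver", // full path to the driver Java class
  "spark.adb.user"     -> "spark",
  "spark.adb.password" -> "Orion123",
  "spark.adb.dbschema" -> "test_data",
  "spark.adb.dbtable"  -> "crimes"
)
```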
Code samples
Example of data loading
Specify the connector settings:
scala> val options = Map(
| "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
| "spark.adb.user" -> "spark",
| "spark.adb.password" -> "Orion123",
| "spark.adb.dbschema" -> "test_data",
| "spark.adb.dbtable" -> "crimes"
| )
options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)
Register a dataframe:
val crimes = spark
.read
.format("adb")
.options(options)
.load()
crimes: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]
Get a schema:
scala> crimes.printSchema()
root
|-- incident_number: string (nullable = false)
|-- offense_code: integer (nullable = false)
|-- offense_code_group: string (nullable = true)
|-- offense_description: string (nullable = true)
|-- district: string (nullable = true)
|-- reporting_area: string (nullable = true)
|-- shooting: string (nullable = true)
|-- occurred_on_date: timestamp (nullable = true)
|-- year: integer (nullable = false)
|-- month: integer (nullable = true)
|-- day_of_week: string (nullable = true)
|-- hour: integer (nullable = true)
|-- ucr_part: string (nullable = true)
|-- street: string (nullable = true)
|-- lat: decimal(38,18) (nullable = true)
|-- long: decimal(38,18) (nullable = true)
|-- location: string (nullable = true)
Calculate the number of rows in a dataset:
scala> crimes.count()
res2: Long = 319037
Execute a query with aggregation:
scala> :paste
crimes
.groupBy($"DISTRICT")
.agg(expr("COUNT(INCIDENT_NUMBER) as crimes_total"),
expr("AVG(Lat) as lat"),
expr("AVG(Long) as lng")
).show()
+--------+------------+--------------------+--------------------+
|DISTRICT|crimes_total| lat| lng|
+--------+------------+--------------------+--------------------+
| C6| 23456|42.21209922281136...|-70.8555727710997...|
| null| 1765|25.23950519369345...|-43.4487743870425...|
| B2| 49939|42.31600291007640...|-71.0756976057897...|
| C11| 42526|42.29263759987228...|-71.0512586281297...|
| E13| 17533|42.30980222459151...|-71.0980043154060...|
| B3| 35439|42.28305741964864...|-71.0789481958754...|
| E5| 13238|42.19796309046079...|-71.0043988112886...|
| A15| 6505|42.17915525091085...|-70.7447250895850...|
| A7| 13544|42.36070260499385...|-71.0039483303984...|
| D14| 20127|42.34350724510932...|-71.1312546172648...|
| D4| 41910|42.34124257016540...|-71.0772502980893...|
| E18| 17347|42.26267985201514...|-71.1189201704008...|
| A1| 35708|42.33122331388176...|-71.0199061556178...|
+--------+------------+--------------------+--------------------+
You can also execute any SQL query through the master. To do this, you need to set additional Spark session options and import the auxiliary functions.
Load data:
spark.stop
val spark2 = SparkSession
.builder()
.master("local[*]")
.appName("spark_example")
.config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark")
.config("spark.adb.driver","org.postgresql.Driver")
.config("spark.adb.user","spark")
.config("spark.adb.password","Orion123")
.getOrCreate()
import io.arenadata.spark.adb.implicits._
val crimes2 = spark2.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
crimes2: org.apache.spark.sql.DataFrame = [incident_number: string, offense_code: int ... 15 more fields]
scala> crimes2.count()
res1: Long = 319037
Create an arbitrary table:
spark2.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
spark2.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
val test = spark2.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
test: org.apache.spark.sql.DataFrame = [id: int]
test.show(10)
+---+
| id|
+---+
| 2|
| 3|
| 1|
+---+
Example of data writing
Load the finalResult dataset from Spark to ADB:
val finalResult =
crimesDistrictAnalytics
.join(crimesDistrictMedian, "DISTRICT")
.join(crimesByDistrictByCrimeTypes, "DISTRICT")
.select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
.repartition(4)
.cache()
scala> val options = Map(
| "spark.adb.url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
| "spark.adb.user" -> "spark",
| "spark.adb.password" -> "Orion123",
| "spark.adb.dbschema" -> "test_data",
| "spark.adb.dbtable" -> "crimes_final_result"
| )
options: scala.collection.immutable.Map[String,String] = Map(spark.adb.password -> Orion123, spark.adb.dbschema -> test_data, spark.adb.dbtable -> crimes_final_result, spark.adb.user -> spark, spark.adb.url -> jdbc:postgresql://10.92.3.151:5432/spark)
finalResult
.write
.format("adb")
.options(options)
.mode(SaveMode.Overwrite)
.save()
Check the data availability in ADB:
spark=# \d+ test_data.crimes_final_result
Table "test_data.crimes_final_result"
Column | Type | Modifiers | Storage | Stats target | Description
----------------------+------------------+-----------+----------+--------------+-------------
district | text | | extended | |
crimes_total | bigint | not null | plain | |
crimes_monthly | double precision | | plain | |
frequent_crime_types | text | not null | extended | |
lat | numeric(38,22) | | main | |
lng | numeric(38,22) | | main | |
Distributed by: (district)
spark=# select * from test_data.crimes_final_result;
district | crimes_total | crimes_monthly | frequent_crime_types | lat | lng
----------+--------------+----------------+------------------------------------------------------------+---------------------------+----------------------------
B2 | 49939 | 3985 | M/V, M/V ACCIDENT, VERBAL DISPUTE | 42.3160029100764052730000 | -71.0756976057897015220000
D14 | 20127 | 1607.5 | TOWED MOTOR VEHICLE, M/V, SICK/INJURED/MEDICAL | 42.3435072451093210590000 | -71.1312546172648812640000
A15 | 6505 | 499 | M/V ACCIDENT, INVESTIGATE PERSON, M/V | 42.1791552509108589950000 | -70.7447250895850891410000
E13 | 17533 | 1368 | SICK/INJURED/MEDICAL, DRUGS, M/V ACCIDENT | 42.3098022245915170500000 | -71.0980043154060730210000
A7 | 13544 | 1092 | SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON | 42.3607026049938574940000 | -71.0039483303984950860000
E5 | 13238 | 1043.5 | SICK/INJURED/MEDICAL, INVESTIGATE PERSON, M/V ACCIDENT | 42.1979630904607935020000 | -71.0043988112886597940000
B3 | 35439 | 2764 | VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON | 42.2830574196486438360000 | -71.0789481958754228740000
E18 | 17347 | 1313 | SICK/INJURED/MEDICAL, M/V ACCIDENT, INVESTIGATE PERSON | 42.2626798520151451410000 | -71.1189201704008654370000
C11 | 42526 | 3283 | M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON | 42.2926375998722842040000 | -71.0512586281297709920000
A1 | 35708 | 2775 | PROPERTY, ASSAULT SIMPLE, DRUGS | 42.3312233138817663820000 | -71.0199061556178000740000
C6 | 23456 | 1870 | DRUGS, SICK/INJURED/MEDICAL, INVESTIGATE PERSON | 42.2120992228113649950000 | -70.8555727710997195530000
D4 | 41910 | 3297.5 | LARCENY SHOPLIFTING, PROPERTY, LARCENY THEFT FROM BUILDING | 42.3412425701654033500000 | -71.0772502980893959520000
(12 rows)