Пример запуска задач c DataFrame

Spark DataFrame (DF) — это структура данных, организованная в именованные столбцы и распределенная по кластеру. Концептуально DF похож на таблицу в реляционной базе данных, но с определенными оптимизациями. Технически Spark DataFrame представляет собой DataSet, хранящий данные типа Row.

В данной статье показаны базовые примеры использования DF на языке Scala, которые можно запускать в spark-shell. Также Spark поддерживает другие языки, например Java, Python и R.

Операции с DataFrame

Создание

Объект DataFrame может быть создан из различных источников, наиболее распространенные способы представлены ниже.

  • Из структурированных данных

  • Из RDD

  • Из внешней базы данных, используя JDBC

Ниже показано создание DF из JSON-файла, хранящегося в HDFS.

val df = spark
    .read
    .json("hdfs://adh/user/admin/testload/people.json")

// read multiline JSON
val multiline_df = spark
    .read
    .option("multiline","true")
    .json("hdfs://adh/user/admin/testload/people_multiline.json")

В данном примере схема для DF определяется автоматически на основе структуры JSON-файла. Для получения дополнительной информации о создании DF из других источников, таких как Avro, Parquet, ORC и многих других, смотрите доступные источники данных.

Вы можете преобразовать RDD в DF с помощью метода rdd.toDF(). По умолчанию Spark генерирует схему автоматически на основе структуры RDD, однако вы также можете указать схему явно.

val rdd = sc.textFile("hdfs://adh/user/admin/testload/test_data_rdd.txt")
val df = rdd.toDF()

Пример ниже демонстрирует получение DF из таблицы MySQL. Чтобы код примера заработал, в classpath должен быть доступен JAR MySQL-драйвера.

val df_mysql = spark.read.format("jdbc")
   .option("url", "jdbc:mysql://<host>:<port>/db_name")
   .option("driver", "com.mysql.jdbc.Driver")
   .option("dbtable", "<table_name>")
   .option("user", "<username>")
   .option("password", "<password>")
   .load()

Получение DF из других СУБД выглядит аналогично.

Помимо вышеупомянутых, существует множество других способов получения объекта DataFrame, например, с помощью методов SparkSession.sql()/SparkSession.createDataFrame() или запроса к таблице Hive/HBase и так далее. Более подробная информация об этих методах доступна в документации Spark.

Модификация

Все операции DataFrame делятся на два типа:

  • Трансформации (transformations), которые возвращают новый DF. Например, map(), filter(), agg(), col(), join() и так далее.

  • Действия (actions), которые возвращают некое значение программе-драйверу после выполнения вычислений над DF. Например, collect(), count() и так далее.

Все операции с DF являются "ленивыми" (lazy), то есть они не выполняются сразу. Вычисления производятся только тогда, когда какой-то результат должен быть возвращен программе-драйверу, например, вызов df.show().

Полная справочная информация о поддерживаемых операциях доступна на странице Spark DataSet API.

Просмотр содержимого

Для просмотра содержимого DF используйте метод df.show(x), который возвращает первые x строк DF.

ПРИМЕЧАНИЕ
Используйте с осторожностью конструкции типа df.collect().foreach(println), так как это может привести к нехватке памяти на машине-драйвере Spark, поскольку collect() загружает данные DF со всех узлов кластера на одну машину.

Для получения информации о DF-схеме, используйте df.printSchema() как показано ниже.

root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

Выполнение SQL-запросов

Модуль Spark SQL позволяет выполнять традиционные SQL-запросы к DF. Для этого необходимо создать временное представление (View) из DataFrame с помощью метода createOrReplaceTempView(). Затем вы можете отправлять SQL-запросы к этому представлению, используя метод SparkSession.sql(). Пример приведен ниже.

val df = spark
    .read
    .csv("hdfs://adh/user/admin/testload/employees.json")

df.createOrReplaceTempView("employees")

val df_sql = spark.sql("SELECT name, lastname, email FROM employees")
df_sql.show()

После создания временное представление ведет себя как обычная таблица SQL, однако оно нигде не сохраняется в виде файла. Представление привязано к сессии SparkSession, в которой оно было создано, и остается доступным до тех пор, пока существует объект SparkSession. Больше информации о выполнении SQL-запросов доступно в разделе SparkSession.sql() API.

Пример

Следующий пример демонстрирует использование основных операций с DF. Чтобы пример отработал без ошибок, соответствующий файл с данными (employees.json) должен быть доступен в HDFS. Создание тестового файла показано ниже.

  1. В локальной файловой системе создайте файл employees.json:

    echo -e '{"id":1,"name":"Sarah","lastname":"Connor","dept":"hr","age":34}\n{"id":2,"name":"Michael","lastname":"Scott","dept":"sales","age":47}\n{"id":3,"name":"Luke","lastname":"Skywalker","dept":"it","age":45}\n{"id":4,"name":"James","lastname":"Hetfield","dept":"sales","age":50}' > employees.json
  2. Создайте директорию в HDFS, установите права доступа и скопируйте тестовый файл в HDFS:

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
    $ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/employees.json /user/admin/employees.json

    Необходимо убедиться, что пользователь hdfs имеет доступ к файлу employees.json.

Когда тестовые данные загружены в HDFS, вы можете выполнить следующий Scala-код непосредственно в spark-shell (/bin/spark-shell).

val df_employees = spark.read.json("hdfs://adh/user/admin/employees.json") (1)

println("DataFrame created:")
df_employees.show()

println("DataFrame schema:")
df_employees.printSchema()

println("Show name-age selection from all employees:")
df_employees.select("name","age").show()

println("Get all employees older than 40")
df_employees.filter("age > 40").show()


val depts_data = Seq((1,"it"),(2,"sales"),(3,"hr"))
val rdd = sc.parallelize(depts_data)
val df_department = rdd.toDF("id","department") (2)

println("DataFrame created:")
df_department.show()

val joined = df_employees.filter("age > 30").join(df_department, df_employees("dept") === df_department("department")) (3)

println("Get number of employees per department")
joined.groupBy("department").count().show()

println("Get average age per department")
joined.groupBy("department").mean("age").show() (4)
1 Создание DF из JSON-файла в HDFS.
2 Создание DF из RDD.
3 Фильтрация по столбцу age и выполнение JOIN для двух DF.
4 Использование GROUP BY с агрегацией по столбцу.

 

В результате выполнения примера в консоль выводится следующий результат:

DataFrame created:
+---+-----+---+---------+-------+
|age| dept| id| lastname|   name|
+---+-----+---+---------+-------+
| 34|   hr|  1|   Connor|  Sarah|
| 47|sales|  2|    Scott|Michael|
| 45|   it|  3|Skywalker|   Luke|
| 50|sales|  4| Hetfield|  James|
+---+-----+---+---------+-------+

DataFrame schema:
root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

Show name-age selection from all employees:
+-------+---+
|   name|age|
+-------+---+
|  Sarah| 34|
|Michael| 47|
|   Luke| 45|
|  James| 50|
+-------+---+

Get all employees older than 40:
+---+-----+---+---------+-------+
|age| dept| id| lastname|   name|
+---+-----+---+---------+-------+
| 47|sales|  2|    Scott|Michael|
| 45|   it|  3|Skywalker|   Luke|
| 50|sales|  4| Hetfield|  James|
+---+-----+---+---------+-------+

DataFrame created:
+---+----------+
| id|department|
+---+----------+
|  1|        it|
|  2|     sales|
|  3|        hr|
+---+----------+

Get number of employees per department:
+----------+-----+
|department|count|
+----------+-----+
|     sales|    2|
|        hr|    1|
|        it|    1|
+----------+-----+

Get average age per department:
+----------+--------+
|department|avg(age)|
+----------+--------+
|     sales|    48.5|
|        hr|    34.0|
|        it|    45.0|
+----------+--------+
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней