Spark JDBC
JDBC is one of the most commonly used data sources in Spark. In this section, we cover how to integrate Spark with ClickHouse using the official ClickHouse JDBC connector.
Read data
Java:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class ReadData {
    public static void main(String[] args) {
        // Initialize the Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";

        //---------------------------------------------------------------------------------------------------
        // Read a table from ClickHouse using the jdbc method
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);
        df1.show();

        //---------------------------------------------------------------------------------------------------
        // Read a table from ClickHouse using the load method
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();
        df2.show();

        // Stop the Spark session
        spark.stop();
    }
}
```
Scala:

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import java.util.Properties

object ReadData extends App {
  // Initialize the Spark session
  val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate

  val jdbcURL = "jdbc:ch://localhost:8123/default"
  val query: String = "select * from example_table where id > 2"

  //---------------------------------------------------------------------------------------------------
  // Read a table from ClickHouse using the jdbc method
  //---------------------------------------------------------------------------------------------------
  val connectionProperties = new Properties()
  connectionProperties.put("user", "default")
  connectionProperties.put("password", "123456")

  val df1: Dataset[Row] = spark.read
    .jdbc(jdbcURL, s"($query)", connectionProperties)
  df1.show()

  //---------------------------------------------------------------------------------------------------
  // Read a table from ClickHouse using the load method
  //---------------------------------------------------------------------------------------------------
  val df2: Dataset[Row] = spark.read
    .format("jdbc")
    .option("url", jdbcURL)
    .option("user", "default")
    .option("password", "123456")
    .option("query", query)
    .load()
  df2.show()

  // Stop the Spark session
  spark.stop()
}
```
Python:

```python
from pyspark.sql import SparkSession

jar_files = [
    "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]

# Initialize the Spark session with the JAR files
spark = SparkSession.builder \
    .appName("example") \
    .master("local") \
    .config("spark.jars", ",".join(jar_files)) \
    .getOrCreate()

url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"

df = (spark.read
      .format("jdbc")
      .option("driver", driver)
      .option("url", url)
      .option("user", user)
      .option("password", password)
      .option("query", query)
      .load())

df.show()
```
Spark SQL:

```sql
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
    url "jdbc:ch://localhost:8123/default",
    dbtable "schema.tablename",
    user "username",
    password "password",
    driver "com.clickhouse.jdbc.ClickHouseDriver"
);

SELECT * FROM jdbcTable;
```
Write data
Java:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class WriteData {
    public static void main(String[] args) {
        // Initialize the Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        //---------------------------------------------------------------------------------------------------
        // Write the DataFrame to ClickHouse using the jdbc method
        //---------------------------------------------------------------------------------------------------
        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "example_table", jdbcProperties);

        //---------------------------------------------------------------------------------------------------
        // Write the DataFrame to ClickHouse using the save method
        //---------------------------------------------------------------------------------------------------
        df.write()
                .format("jdbc")
                .mode("append")
                .option("url", jdbcUrl)
                .option("dbtable", "example_table")
                .option("user", "default")
                .option("password", "123456")
                .save();

        // Stop the Spark session
        spark.stop();
    }
}
```
Scala:

```scala
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import java.util.Properties

object WriteData extends App {
  // Initialize the Spark session
  val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate

  // JDBC connection details
  val jdbcUrl: String = "jdbc:ch://localhost:8123/default"
  val jdbcProperties: Properties = new Properties
  jdbcProperties.put("user", "default")
  jdbcProperties.put("password", "123456")

  // Create a sample DataFrame
  val rows = Seq(Row(1, "John"), Row(2, "Doe"))
  val schema = List(
    StructField("id", DataTypes.IntegerType, nullable = false),
    StructField("name", StringType, nullable = true)
  )
  val df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(rows),
    StructType(schema)
  )

  //---------------------------------------------------------------------------------------------------
  // Write the DataFrame to ClickHouse using the jdbc method
  //---------------------------------------------------------------------------------------------------
  df.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, "example_table", jdbcProperties)

  //---------------------------------------------------------------------------------------------------
  // Write the DataFrame to ClickHouse using the save method
  //---------------------------------------------------------------------------------------------------
  df.write
    .format("jdbc")
    .mode("append")
    .option("url", jdbcUrl)
    .option("dbtable", "example_table")
    .option("user", "default")
    .option("password", "123456")
    .save()

  // Stop the Spark session
  spark.stop()
}
```
Python:

```python
from pyspark.sql import SparkSession
from pyspark.sql import Row

jar_files = [
    "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]

# Initialize the Spark session with the JAR files
spark = SparkSession.builder \
    .appName("example") \
    .master("local") \
    .config("spark.jars", ",".join(jar_files)) \
    .getOrCreate()

# Create a DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)

url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"

# Write the DataFrame to ClickHouse
df.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "example_table") \
    .mode("append") \
    .save()
```
Spark SQL:

```sql
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
    url "jdbc:ch://localhost:8123/default",
    dbtable "schema.tablename",
    user "username",
    password "password",
    driver "com.clickhouse.jdbc.ClickHouseDriver"
);

-- resultTable could be created with df.createTempView or with Spark SQL
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable;
```
Parallelism
When using Spark JDBC, Spark reads the data with a single partition by default. To achieve higher concurrency, you must specify partitionColumn, lowerBound, upperBound, and numPartitions, which describe how the table is partitioned when it is read in parallel by multiple workers. For more information, see the JDBC configurations in the official Apache Spark documentation.
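For illustration, here is a minimal PySpark sketch of a partitioned read. It reuses the connection details from the examples above; the choice of id as the partition column and the 1..1000000 bounds are assumptions about the table, and the bounds only shape the generated partition ranges, they do not filter rows. Note that these partition options work with dbtable and cannot be combined with the query option.

```python
# A minimal sketch of a partitioned JDBC read. Assumes the Spark session
# and JAR setup from the Python example above, and that example_table has
# a numeric id column spanning roughly 1..1000000 (an assumption; tune the
# bounds to your data, since they shape partition ranges rather than filter rows).
df = (spark.read
      .format("jdbc")
      .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
      .option("url", "jdbc:ch://localhost:8123/default")
      .option("user", "default")
      .option("password", "123456")
      .option("dbtable", "example_table")  # partition options require dbtable, not query
      .option("partitionColumn", "id")     # must be a numeric, date, or timestamp column
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")        # 8 concurrent JDBC reads
      .load())
```

Each partition issues its own query with a range predicate on the partition column, so a column with a reasonably uniform value distribution gives the most even split.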
JDBC limitations
- As of today, JDBC can only insert data into existing tables (there is currently no way to auto-create the table when a DataFrame is inserted, as Spark can do with other connectors), so the target table must be created ahead of time; see the sketch below.
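One way to handle this is to create the target table before running the Spark write. The sketch below uses the clickhouse-connect Python client purely for illustration (an assumption; any ClickHouse client, including clickhouse-client, works just as well), with a schema and MergeTree engine chosen to match the example_table used in the write examples above.

```python
# A minimal sketch, assuming the clickhouse-connect package is installed
# (pip install clickhouse-connect). Creates the target table so that the
# Spark JDBC append in the examples above has somewhere to write.
import clickhouse_connect

client = clickhouse_connect.get_client(
    host="localhost", port=8123, username="default", password="123456"
)
client.command("""
    CREATE TABLE IF NOT EXISTS default.example_table
    (
        id   Int32,
        name String
    )
    ENGINE = MergeTree
    ORDER BY id
""")
```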