ClickHouse2HBase组件_JavaSqlResultSet转化为SparkSqlDataFrame

ClickHouse2HBase组件_JavaSqlResultSet转化为SparkSqlDataFrame

1.背景

当前预计算任务主要基于Spark集群进行数据计算再推到HBase等kv数据库,Spark离线计算集群按资源使用率进行计费,费用高昂;且在分区数据量级1亿场景下,Spark引擎计算效率不如ClickHouse引擎,考虑通过ClickHouse凌晨生产时段进行预计算加工既可以节约成本,又可以提高时效。

但是直接在ClickHouse客户端提交一个对所有数据进行Cube预计算的命令很难成功,主要会面临两个问题:1)计算超时,当数据量和计算量较大时,单条命令执行时间较长,一般平台设置即席查询超限时间20s,沟通后最多提升到600s,对大批量有聚合计算的查询仍有瓶颈;2)本地节点内存超限,当数据量较大时,从远程节点返回给本地节点进行聚合计算的数据量也会很大,易导致内存超限。

为了解决上述业务场景问题,想到了一个拆分方案:选取聚合维度中维度枚举值最多的那个维度,查询出其所有维值列表,然后将这些维值分批切片,按批次来执行预计算,每批只筛选其中部分维值对应数据进行预计算,依次执行完所有维值数据。那么要获取并存储高基维值,并适当使用并发,Spark引擎是不二之选,在开发过程中遇到一个问题,我们需要把ClickHouse分片分批执行的ResultSet结果合并起来,并转化为DataFrame方便后续继续进行各种并发的转换和执行操作。

2.java.sql.ResultSet转化为spark.sql.DataFrame解析

2.1 从java.sql.ResultSet中获取查询结果schema

在Java里,可以使用ResultSet.getMetaData()方法获得ResultSetMetaData对象,ResultSetMetaData对象中保存了ResultSet中数据的schema信息。

ResultSetMetaData类提供了一系列的方法来获取关于列的详细信息,比如列名、列类型、列的数量等,以下是一些常用方法:

getColumnCount():返回ResultSet中列的数量。

getColumnName(int index):根据索引获取指定列的名称。

getColumnTypeName(int index):根据索引获取指定列的数据库特定的类型名称。

getColumnType(int index):根据索引返回指定列的SQL类型。

isNullable(int index):根据索引表示指定列的值是否可以为 null。

getPrecision(int index):根据索引返回指定列的精度。

getScale(int index):根据索引获取指定列的小数点右边的位数。

2.2 手动转换步骤

要将一个JDBC查询结果集并入Spark数据集中,需要将ResultSet中的数据逐行转化为Row对象,还需要获取到ResultSet中数据的Schema,然后使用spark.createDataFrame()创建DataFrame对象。如下代码是一个简单的手动转换案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ConvertResultSetToSparkRow {

public List<Row> resultSetToSparkRowList(ResultSet rs) throws Exception {
List<Row> rowsList = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();

// 把结果集中的每一行转换为一个 Spark Row 对象
while (rs.next()) {
Object[] rowFields = new Object[columnCount];
for (int i = 0; i < columnCount; i++) {
rowFields[i] = rs.getObject(i + 1);
}
Row row = RowFactory.create(rowFields);
rowsList.add(row);
}

return rowsList;
}

public StructType createSchemaFromResultSet(ResultSetMetaData metaData) throws Exception {
List<StructField> fields = new ArrayList<>();
int columnCount = metaData.getColumnCount();

// 从结果集元数据中创建 Spark DataFrame 的 schema
for (int i = 1; i <= columnCount; i++) {
// 这里假设所有列都是 StringType,您需要根据实际列的类型来创建对应的数据类型
StructField field = DataTypes.createStructField(metaData.getColumnName(i), DataTypes.StringType, true);
fields.add(field);
}

return DataTypes.createStructType(fields);
}

public static void main(String[] args) {

Connection connection = null;
// ... 设置数据库连接参数 ...

try {
// 建立数据库连接
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/yourdatabase", "username", "password");

// 创建 statement 对象并执行查询
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT * FROM yourtable");

// 转换为 Spark Rows
ConvertResultSetToSparkRow converter = new ConvertResultSetToSparkRow();
List<Row> sparkRows = converter.resultSetToSparkRowList(rs);

// 创建 schema
StructType schema = converter.createSchemaFromResultSet(rs.getMetaData());

// 在这一点上,你现在可以使用 sparkRows 列表和 schema 创建 Spark DataFrame
// 例如: spark.createDataFrame(sparkRows, schema);

} catch (Exception e) {
e.printStackTrace();
}
}
}

2.3 Spark快捷转换方法

Spark提供的DataFrameReader.jdbc()方法对上述转换步骤进行了封装,自动处理了ResultSet到Spark的Row对象的转换,同时根据数据查询结果自动推断出schema,无须我们再手动实现上述代码,可以直接从JDBC源读取数据的schema和dataframe。使用案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class JdbcToDataFrameExample {
public static void main(String[] args) {
// 初始化 SparkSession
SparkSession spark = SparkSession.builder()
.appName("JdbcToDataFrameExample")
.master("local[*]") // 这里使用本地模式运行,适合演示和测试
.getOrCreate();

// JDBC 连接参数
String jdbcUrl = "jdbc:mysql://localhost:3306/yourdatabase";
String tableName = "yourtable";

// 数据库认证信息
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");

// 使用 DataFrameReader 读取 JDBC 数据源
Dataset<Row> dataframe = spark.read()
.jdbc(jdbcUrl, tableName, connectionProperties);

// 打印 schema 信息
dataframe.printSchema();

// 显示前 10 行数据
dataframe.show(10);

// 停止 SparkSession
spark.stop();
}
}

Spark.read.load()方法本质同样也是借助DataFrameReader类进行自动处理,使用案例如下:

1
2
3
4
5
6
7
8
9
10
//通过标准维度枚举值的第一行获取最终结果的schema
val firstRow = dimensionListDF.first()
val firstDimensionValue = firstRow.get(firstRow.fieldIndex(dimensionName)).toString
val firstDimensionSql = ckSql.split(ckDatabase + "." + ckTable)(0) + ckDatabase + "." + ckTable +
ckSql.split(ckDatabase + "." + ckTable)(1).
replaceFirst("WHERE", " WHERE " + dimensionName + " in ('" + firstDimensionValue + "') AND")
println("用于获取最终结果schema查询sql如下:")
println(firstDimensionSql)
var firstDimensionResultDF = spark.read.jdbc(s"jdbc:clickhouse://$ckHost:$ckPort/$ckDatabase", s"($firstDimensionSql) temp", connectionProperties)
val schema = firstDimensionResultDF.schema

3.手动将java.sql.ResultSet转化为spark.sql.DataFrame实战

在第一节背景下,在分布式场景中,直接使用Spark的DataFrameReader对ClickHouse预计算结果数据进行获取和处理会有超时和超内存问题,现在需要手动将分批分片执行结果转化为DataFrame,方便后续并发转换和写入其他引擎操作。

1.首先还是借助DataFrameReader执行一行预计算数据获取到ResultSet的schema,后续多处都需要用到:

1
2
3
4
5
6
7
8
9
10
//通过标准维度枚举值的第一行获取最终结果的schema
val firstRow = dimensionListDF.first()
val firstDimensionValue = firstRow.get(firstRow.fieldIndex(dimensionName)).toString
val firstDimensionSql = ckSql.split(ckDatabase + "." + ckTable)(0) + ckDatabase + "." + ckTable +
ckSql.split(ckDatabase + "." + ckTable)(1).
replaceFirst("WHERE", " WHERE " + dimensionName + " in ('" + firstDimensionValue + "') AND")
println("用于获取最终结果schema查询sql如下:")
println(firstDimensionSql)
var firstDimensionResultDF = spark.read.jdbc(s"jdbc:clickhouse://$ckHost:$ckPort/$ckDatabase", s"($firstDimensionSql) temp", connectionProperties)
val schema = firstDimensionResultDF.schema

2.在分布式partition内部将每行数据转化为Row对象并添加到ListBuffer[Row]对象中,最后将所有分区输出结果Iterator合并为一个数据集RDD:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//分布式获取最终结果
val resultRDD = dimensionListDF.rdd.mapPartitionsWithIndex(
(partitionKey,rows) => {
var key:Int = partitionKey
val hostAddress:String = clusterInfoMap.get(partitionKey).map(_._1).getOrElse("")
val port:Int =clusterInfoMap.get(partitionKey).map(_._2).getOrElse(0)

val conn = clickhouseClient.getConnection(hostAddress, port, ckUser, ckPassword, ckDatabase)
val statement = conn.createStatement()
//批次条数计数器
var counter = 0
val dimensionValueList = ListBuffer[String]()
val resultRows = ListBuffer[Row]()

//依次处理每条记录
for (row <- rows) {
counter += 1
dimensionValueList.append(row.get(row.fieldIndex(dimensionName)).toString)
if (counter >= ckFetchsize) {
val executeSql = ckSql.split(ckDatabase + "." + ckTable)(0) + ckDatabase + "." + ckTable +
ckSql.split(ckDatabase + "." + ckTable)(1).
replaceFirst("WHERE", " WHERE " + dimensionName + " in (" + dimensionValueList.map("'" + _ + "'").mkString(",") + ") AND")
println(executeSql)
val resultSet = statement.executeQuery(executeSql)
while (resultSet.next()) {
val rowData = (1 to schema.length).map(i => resultSet.getObject(i))
resultRows += new GenericRowWithSchema(rowData.toArray, schema)
}
counter = 0
dimensionValueList.clear()
}
}

if (counter > 0) {
val executeSql = ckSql.split(ckDatabase + "." + ckTable)(0) + ckDatabase + "." + ckTable +
ckSql.split(ckDatabase + "." + ckTable)(1).
replaceFirst("WHERE", " WHERE " + dimensionName + " in (" + dimensionValueList.map("'" + _ + "'").mkString(",") + ") AND")
println(executeSql)
val resultSet = statement.executeQuery(executeSql)
while (resultSet.next()) {
val rowData = (1 to schema.length).map(i => resultSet.getObject(i))
resultRows += new GenericRowWithSchema(rowData.toArray, schema)
}
}

conn.close()
resultRows.toIterator
}
)