ClickHouse2HBase组件_JavaSqlResultSet转化为SparkSqlDataFrame
1.背景
当前预计算任务主要基于Spark集群进行数据计算再推到HBase等kv数据库,Spark离线计算集群按资源使用率进行计费,费用高昂;且在分区数据量级1亿场景下,Spark引擎计算效率不如ClickHouse引擎,考虑通过ClickHouse凌晨生产时段进行预计算加工既可以节约成本,又可以提高时效。
但是直接在ClickHouse客户端提交一个对所有数据进行Cube预计算的命令很难成功,主要会面临两个问题:1)计算超时,当数据量和计算量较大时,单条命令执行时间较长,一般平台设置即席查询超限时间20s,沟通后最多提升到600s,对大批量有聚合计算的查询仍有瓶颈;2)本地节点内存超限,当数据量较大时,从远程节点返回给本地节点进行聚合计算的数据量也会很大,易导致内存超限。
为了解决上述业务场景问题,想到了一个拆分方案:选取聚合维度中维度枚举值最多的那个维度,查询出其所有维值列表,然后将这些维值分批切片,按批次来执行预计算,每批只筛选其中部分维值对应数据进行预计算,依次执行完所有维值数据。那么要获取并存储高基维值,并适当使用并发,Spark引擎是不二之选,在开发过程中遇到一个问题,我们需要把ClickHouse分片分批执行的ResultSet结果合并起来,并转化为DataFrame方便后续继续进行各种并发的转换和执行操作。
2.java.sql.ResultSet转化为spark.sql.DataFrame解析
2.1 从java.sql.ResultSet中获取查询结果schema
在Java里,可以使用ResultSet.getMetaData()方法获得ResultSetMetaData对象,ResultSetMetaData对象中保存了ResultSet中数据的schema信息。
ResultSetMetaData类提供了一系列的方法来获取关于列的详细信息,比如列名、列类型、列的数量等,以下是一些常用方法:
getColumnCount():返回ResultSet中列的数量。
getColumnName(int index):根据索引获取指定列的名称。
getColumnTypeName(int index):根据索引获取指定列的数据库特定的类型名称。
getColumnType(int index):根据索引返回指定列的SQL类型。
isNullable(int index):根据索引表示指定列的值是否可以为 null。
getPrecision(int index):根据索引返回指定列的精度。
getScale(int index):根据索引获取指定列的小数点右边的位数。
2.2 手动转换步骤
要将一个JDBC查询结果集并入Spark数据集中,需要将ResultSet中的数据逐行转化为Row对象,还需要获取到ResultSet中数据的Schema,然后使用spark.createDataFrame()创建DataFrame对象。如下代码是一个简单的手动转换案例:
1 | public class ConvertResultSetToSparkRow { |
2.3 Spark快捷转换方法
Spark提供的DataFrameReader.jdbc()方法对上述转换步骤进行了封装,自动处理了ResultSet到Spark的Row对象的转换,同时根据数据查询结果自动推断出schema,无须我们再手动实现上述代码,可以直接从JDBC源读取数据的schema和dataframe。使用案例如下:
1 | public class JdbcToDataFrameExample { |
Spark.read.load()方法本质同样也是借助DataFrameReader类进行自动处理,使用案例如下:
1 | //通过标准维度枚举值的第一行获取最终结果的schema |
3.手动将java.sql.ResultSet转化为spark.sql.DataFrame实战
在第一节背景下,在分布式场景中,直接使用Spark的DataFrameReader对ClickHouse预计算结果数据进行获取和处理会有超时和超内存问题,现在需要手动将分批分片执行结果转化为DataFrame,方便后续并发转换和写入其他引擎操作。
1.首先还是借助DataFrameReader执行一行预计算数据获取到ResultSet的schema,后续多处都需要用到:
1 | //通过标准维度枚举值的第一行获取最终结果的schema |
2.在分布式partition内部将每行数据转化为Row对象并添加到ListBuffer[Row]对象中,最后将所有分区输出结果Iterator合并为一个数据集RDD:
1 | //分布式获取最终结果 |