ClickHouse2HBase组件_JavaSql中的PreparedStatement原理与应用实战

ClickHouse2HBase组件_JavaSql中的PreparedStatement原理与应用实战

1.简介

PreparedStatement是statement的子类,它的实例对象可以通过调用Connection.preparedStatement()方法来获得。PreparedStatement相比于statement主要有两个优点:PreparedStatement更加安全的的,使用了带?占位符的SQL语句,防止SQL注入,减少语法错误;PreparedStatement可以对SQL进行预编译,支持批量更新和插入,需要执行大量相似的SQL语句时,它提供更好的性能,另外Statement会使得数据库频繁编译SQL,有可能会造成数据库缓冲区溢出。

PreparedStatement的常见sql如下:

1
2
3
String sql1 = "select * from  student where StudentNo=?and LoginPwd =?";

String sql2 = "insert into student (StudentNo,LoginPwd,StudentName,Sex,GradeId,Phone,Address,BornDate,Email,IdentityCard)values(?,?,?,?,?,?,?,?,?,?)";

2.PreparedStatement的主要方法

2.1 设置?占位符对应参数值

setInt(index, value):设置sql语句中指定索引位置的参数值为一个int类型。

setString(index, value):设置sql语句中指定索引位置的参数值为一个String类型。

setDouble(index, value):设置sql语句中指定索引位置的参数值为一个double类型。

setObject(index, value):它是个多态方法,允许设置任何Java对象到sql语句中指定索引位置的参数值,该对象的类型必须与数据库中的字段类型兼容,如果数据库不理解对象类型,驱动程序将使用Object.toString()方法将其序列化为一个字符串。

2.2 执行方法

executeQuery():执行SELECT类型的SQL语句,返回ResultSet对象,该对象包含了查询的结果。

executeUpdate():执行更新数据库的SQL语句(如 INSERT、UPDATE或DELETE),返回一个int值,表示影响的行数。

execute():在不确定 SQL 语句类型时使用(即可能是 SELECT、INSERT、UPDATE、DELETE 等类型),如果返回true,说明执行的是查询操作,并且可以通过getResultSet()获取数据;如果返回false,说明执行的是更新操作。

2.3 批量处理方法

批处理方法addBatch()和executeBatch()主要用于执行大量更新操作,比起单条更新可以大幅度提高性能。

addBatch():将一组参数添加到PreparedStatement的批处理命令中,注意在执行addBatch()之前,必须先用setXXX方法来设置参数。

executeBatch():向数据库发送一批命令以进行批处理执行,注意在执行executeBatch()之前,必须先用addBatch() 方法将一些拼装完成的命令添加到批处理中。

2.4 其他方法

clearParameters():清除当前所有已设置的参数值。

getMetaData():如果PreparedStatement对象是执行查询操作,则通过此方法可以获得关于返回ResultSet对象列的信息。注意需要在调用executeQuery()等执行SELECT方法之后才能调用getMetaData()方法获取元数据,否则可能会得到SQLException,或者返回null,或者返回不完整的元数据信息,具体取决于 JDBC 驱动程序的具体实现。

3.ClickHouse推数组件中PreparedStatement的应用实战

1.先使用schema和?组装SQL语句:

1
2
3
4
def generateInsertStatement(schema: StructType, dbName: String, tableName: String): String = {
val columns = schema.map(f => f.name).toList
val values = (1 to columns.length) map (_ => "?")
s"INSERT INTO $dbName.$tableName (${columns.mkString(",")}) VALUES (${values.mkString(",")})"

2.用带占位符的SQL语句作为入参,初始化构造PreparedStatement实例对象:

1
val currentPreparedStatement = connection.prepareStatement(insertSql)

3.轮询从Hive表中读取到的数据DataFrame,根据查询结果的schema、数据类型、数据值,使用setObject()依次将数据和数据类型填入InsertSQL命令中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
schema.foreach { f =>
val fieldName = f.name
val fieldType = f.dataType
val fieldIdx = row.fieldIndex(fieldName)
var fieldVal = row.get(fieldIdx)

if (fieldVal != null) {
try{
if(fieldVal.isInstanceOf[mutable.WrappedArray.ofRef[_]]){
val dataArray = fieldVal.asInstanceOf[mutable.WrappedArray.ofRef[_]].toArray
currentPreparedStatement.setObject(fieldIdx + 1, dataArray)
}else if (fieldType.isInstanceOf[MapType]) {
val data_map=fieldVal.asInstanceOf[scala.collection.immutable.Map[_,_]].toMap
currentPreparedStatement.setObject(fieldIdx + 1, scala.collection.JavaConversions.mapAsJavaMap(data_map))
}else{
currentPreparedStatement.setObject(fieldIdx + 1, fieldVal)
}
}catch {
case e: Exception => {
e.printStackTrace()
throw new Exception("Hive字段:" + fieldName + ":" + fieldVal + ":" + f.dataType + "写入异常")
}
}
} else {
val defVal = client.defaultNullValue(f.dataType, fieldVal)
currentPreparedStatement.setObject(fieldIdx + 1, defVal)
}
}

4.在每行insert命令添加完参数之后,使用addBatch()将命令提交到批处理中:

1
currentPreparedStatement.addBatch()

5.按设定提交批次,执行executeBatch()批量提交处理方法:

1
2
3
4
5
if (counter >= batchSize) {
val r = currentPreparedStatement.executeBatch()
totalInsert += r.sum
counter = 0
}

6.注意在最后还要将批处理中没有达到batchSize的剩余insert命令,再执行一次executeBatch()批量处理。