ClickHouse_Industry Module: Rolling Up ClickHouse Detail Data in Practice

I. Preliminaries

1. How the functions work

uniqState()

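uniqState(element): converts the input values into an intermediate aggregation state for distinct counting (an AggregateFunction state). The state keeps the distinct values of the metric column in a compact, mergeable form so that it can be aggregated again later. Note that uniq is approximate: its state is a sample of value hashes (an adaptive sampling algorithm), not a bitmap; use uniqExact when an exact count is required. The argument can be of any type; here it is String, wrapped as Nullable(String) by the if(x = '', null, x) expressions in step 7. The output is of type AggregateFunction(uniq, <argument type>).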

uniqMergeState()

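uniqMergeState(states): merges several intermediate states produced by uniqState into a single intermediate state, i.e. it deduplicates again across all inputs and keeps the result as a state. Both the input and the output are AggregateFunction states.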

uniqMerge()

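uniqMerge(state): merges the aggregation states produced by uniqState() and computes the final distinct count. The input is an AggregateFunction state and the output is UInt64. The same -State / -MergeState / -Merge combinators apply to sum, which the rollup uses for additive metrics (sumState when writing, sumMerge when reading).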
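
To see why the rollup stores states instead of finished counts, here is a minimal Python model of the -State / -MergeState / -Merge contract. It is only a sketch: an exact Python set stands in for the approximate sketch that uniq really keeps, and uniq_state, uniq_merge_state and uniq_merge are hypothetical names that mirror the SQL functions.

```python
# Hypothetical model of the uniq State / MergeState / Merge contract.
# An exact Python set stands in for ClickHouse's approximate uniq sketch.


def uniq_state(values):
    """Like uniqState: fold raw values into a mergeable state (None plays NULL and is skipped)."""
    return frozenset(v for v in values if v is not None)


def uniq_merge_state(states):
    """Like uniqMergeState: combine several states into a single state."""
    merged = set()
    for state in states:
        merged |= state
    return frozenset(merged)


def uniq_merge(states):
    """Like uniqMerge: combine the states and return the final distinct count."""
    return len(uniq_merge_state(states))


# Two partial aggregates for the same SKU and day, e.g. two terminals.
app_state = uniq_state(["u1", "u2", None])
pc_state = uniq_state(["u2", "u3"])

print(len(app_state) + len(pc_state))      # 4: adding finished counts double-counts u2
print(uniq_merge([app_state, pc_state]))   # 3: merging states counts u2 once
```

Distinct counts are not additive, so a rollup that stored plain numbers could not be re-aggregated across terminals or days; keeping states lets any coarser grouping be computed correctly at query time.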

2. Create the tables

DDL of the detail (base) table

CREATE TABLE bc_online.ck_zh_industry_product_union_all
(
`dateTime` String,
`secondIndId` String,
`thirdIndId` String,
`skuId` String,
`skuName` String,
`sonBrandId` String,
`sonBrandName` String,
`priceRangeId` String,
`secondPriceRangeId` String,
`priceRangeName` String,
`secondPriceRangeName` String,
`shopId` String,
`shopName` String,
`terminalId` String,
`shopType` String,
`pv` Int64,
`browseAcct` String,
`dealAmt` Float64,
`dealProNum` Int64,
`dealAcct` String,
`cartAcct` String,
`followAcct` String,
`searchClickNum` Int64,
`browseSku` String,
`dealSku` String
)
ENGINE = ReplicatedMergeTree('/ckpub18.olap.jd.com/tables/bc_online/ck_zh_industry_product_union_all/{shard}', '{replica}')
PARTITION BY dateTime
ORDER BY (dateTime, secondIndId, thirdIndId, shopType, terminalId, priceRangeId, secondPriceRangeId)
SETTINGS storage_policy = 'jdob_ha', index_granularity = 8192

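DDL of the rollup table. Additive metrics are stored as AggregateFunction(sum, ...) states and distinct-count metrics as AggregateFunction(uniq, Nullable(String)) states. Because the engine is a plain ReplicatedMergeTree rather than an AggregatingMergeTree, rows with the same sorting key are not combined during background merges; each row is aggregated once, when it is inserted.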

CREATE TABLE bc_online.ck_zh_industry_product_union_all_agg
(
`dateTime` String,
`skuId` String,
`skuName` String,
`shopId` String,
`shopName` String,
`sonBrandId` String,
`sonBrandName` String,
`terminalId` String,
`shopType` String,
`secondIndId` String,
`thirdIndId` String,
`priceRangeId` String,
`secondPriceRangeId` String,
`priceRangeName` String,
`secondPriceRangeName` String,
`pv` AggregateFunction(sum, Int64),
`browseAcct` AggregateFunction(uniq, Nullable(String)),
`dealAmt` AggregateFunction(sum, Float64),
`dealProNum` AggregateFunction(sum, Int64),
`dealAcct` AggregateFunction(uniq, Nullable(String)),
`cartAcct` AggregateFunction(uniq, Nullable(String)),
`followAcct` AggregateFunction(uniq, Nullable(String)),
`searchClickNum` AggregateFunction(sum, Int64),
`browseSku` AggregateFunction(uniq, Nullable(String)),
`dealSku` AggregateFunction(uniq, Nullable(String))
)
ENGINE = ReplicatedMergeTree('/ckpub18.olap.jd.com/tables/bc_online/ck_zh_industry_product_union_all_agg/{shard}', '{replica}')
PARTITION BY dateTime
ORDER BY (dateTime, secondIndId, thirdIndId, shopType, terminalId, priceRangeId, secondPriceRangeId)
SETTINGS index_granularity = 8192
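
The same 25-column list is repeated in this DDL, in the per-node CREATE statements and in the Python script, and the copies have already drifted (the script adds spuId). A hypothetical helper that generates the column list from one spec is sketched below; DIMENSIONS, METRICS, rollup_columns and rollup_ddl are illustrative names, not part of the original script.

```python
# Hypothetical helper: build the rollup table's column list from one spec so the
# local DDL, the Distributed DDL and the INSERT cannot drift apart.

# Dimension columns, all String in the rollup table (order matches the DDL above).
DIMENSIONS = [
    "dateTime", "skuId", "skuName", "shopId", "shopName", "sonBrandId",
    "sonBrandName", "terminalId", "shopType", "secondIndId", "thirdIndId",
    "priceRangeId", "secondPriceRangeId", "priceRangeName", "secondPriceRangeName",
]

# Metric column -> (aggregate function, argument type), as in the DDL above.
METRICS = {
    "pv": ("sum", "Int64"),
    "browseAcct": ("uniq", "Nullable(String)"),
    "dealAmt": ("sum", "Float64"),
    "dealProNum": ("sum", "Int64"),
    "dealAcct": ("uniq", "Nullable(String)"),
    "cartAcct": ("uniq", "Nullable(String)"),
    "followAcct": ("uniq", "Nullable(String)"),
    "searchClickNum": ("sum", "Int64"),
    "browseSku": ("uniq", "Nullable(String)"),
    "dealSku": ("uniq", "Nullable(String)"),
}


def rollup_columns(dimensions, metrics):
    """Column definitions: plain String dimensions, then AggregateFunction states."""
    cols = [f"`{name}` String" for name in dimensions]
    cols += [f"`{name}` AggregateFunction({func}, {arg})"
             for name, (func, arg) in metrics.items()]
    return cols


def rollup_ddl(db, table, dimensions, metrics, engine_clause):
    """CREATE TABLE statement; engine_clause carries ENGINE/PARTITION BY/ORDER BY/SETTINGS."""
    body = ",\n    ".join(rollup_columns(dimensions, metrics))
    return f"CREATE TABLE IF NOT EXISTS {db}.{table}\n(\n    {body}\n)\n{engine_clause}"


if __name__ == "__main__":
    print(rollup_ddl(
        "bc_online", "ck_zh_industry_product_union_all_agg", DIMENSIONS, METRICS,
        "ENGINE = ReplicatedMergeTree('/ckpub18.olap.jd.com/tables/bc_online/"
        "ck_zh_industry_product_union_all_agg/{shard}', '{replica}')\n"
        "PARTITION BY dateTime\n"
        "ORDER BY (dateTime, secondIndId, thirdIndId, shopType, terminalId, "
        "priceRangeId, secondPriceRangeId)\n"
        "SETTINGS index_granularity = 8192",
    ))
```

Adding a column such as spuId then means editing DIMENSIONS once instead of four hand-written statements.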

II. Steps of the Detail Rollup Script

1. Get the list of all nodes in the cluster

curl -u ads:1qaz^RFV 'http://ckpub18.olap.jd.com:2000/?' -d "select host_name from (select shard_num,concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end)) as host_name from system.clusters where cluster='LF0_CK_Pub_18' group by shard_num,host_name,port order by shard_num)"
select
host_name
from
(
select
shard_num,
concat(host_name,
':',
(case
when port = 9600 then '8623'
when port = 9700 then '8723'
when port = 9800 then '8823'
when port = 9900 then '8923'
else '8123'
end)) as host_name
from
system.clusters
where
cluster = 'LF0_CK_Pub_18'
group by
shard_num,
host_name,
port
order by
shard_num);

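The two statements below show that the cluster actually consists of only 30 physical servers, but by running instances on different ports it forms a logical cluster of 120 nodes: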

SELECT * FROM system.clusters;

SELECT host_name FROM system.clusters group by host_name;

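Note: this query returns every node, one entry per distinct shard/replica instance, and the list is used for creating tables. The CASE expression maps each instance's native TCP port (the port column of system.clusters) to the HTTP port that curl talks to.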
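
For clarity, the same computation can be sketched client-side in Python. This assumes the CASE expression above is the complete port mapping for this cluster; the sample rows are made up, and http_endpoint, all_nodes and physical_hosts are hypothetical names.

```python
# Sketch of what the node-list query computes, done client-side in Python.
# Mirrors the CASE expression: instance TCP port -> HTTP port used by curl.
TCP_TO_HTTP = {9600: 8623, 9700: 8723, 9800: 8823, 9900: 8923}
DEFAULT_HTTP_PORT = 8123


def http_endpoint(host_name, tcp_port):
    """host:http_port for one ClickHouse instance."""
    return f"{host_name}:{TCP_TO_HTTP.get(tcp_port, DEFAULT_HTTP_PORT)}"


def all_nodes(rows):
    """rows: (shard_num, host_name, port) tuples as in system.clusters.
    Like GROUP BY shard_num, host_name, port ORDER BY shard_num: one endpoint per
    distinct shard/replica instance, ordered by shard."""
    return [http_endpoint(host, port) for _shard, host, port in sorted(set(rows))]


def physical_hosts(rows):
    """Distinct machines, like SELECT host_name ... GROUP BY host_name."""
    return sorted({host for _shard, host, _port in rows})


# Made-up sample: 2 machines, each running 2 instances on different ports.
sample = [
    (1, "10.0.0.1", 9600), (1, "10.0.0.2", 9600),
    (2, "10.0.0.1", 9700), (2, "10.0.0.2", 9700),
    (1, "10.0.0.1", 9600),  # duplicate row, removed by set()
]
print(all_nodes(sample))       # 4 logical nodes
print(physical_hosts(sample))  # 2 physical machines
```

The sample mirrors the 30-server / 120-node situation at toy scale: the number of logical nodes is the number of distinct (host, port) instances, not the number of machines.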

2. Create the local aggregate table on every node

curl -u ads:1qaz^RFV 'http://10.203.23.163:8623/?' -d "CREATE TABLE IF NOT EXISTS bc_online.ck_zh_industry_product_union_all_agg (dateTime String, skuId String, skuName String, shopId String, shopName String, sonBrandId String, sonBrandName String, terminalId String, shopType String,secondIndId String, thirdIndId String, priceRangeId String,secondPriceRangeId String, priceRangeName String,secondPriceRangeName String, pv AggregateFunction(sum, Int64), browseAcct AggregateFunction(uniq, Nullable(String)),dealAmt AggregateFunction(sum, Float64), dealProNum AggregateFunction(sum, Int64), dealAcct AggregateFunction(uniq, Nullable(String)), cartAcct AggregateFunction(uniq, Nullable(String)), followAcct AggregateFunction(uniq, Nullable(String)), searchClickNum AggregateFunction(sum, Int64), browseSku AggregateFunction(uniq, Nullable(String)), dealSku AggregateFunction(uniq, Nullable(String))) ENGINE = ReplicatedMergeTree('/ckpub18.olap.jd.com/tables/bc_online/ck_zh_industry_product_union_all_agg/{shard}', '{replica}') PARTITION BY dateTime ORDER BY (dateTime,secondIndId,thirdIndId,shopType,terminalId,priceRangeId,secondPriceRangeId) SETTINGS index_granularity = 8192"
CREATE TABLE IF NOT EXISTS bc_online.ck_zh_industry_product_union_all_agg 
(
dateTime String,
skuId String,
skuName String,
shopId String,
shopName String,
sonBrandId String,
sonBrandName String,
terminalId String,
shopType String,
secondIndId String,
thirdIndId String,
priceRangeId String,
secondPriceRangeId String,
priceRangeName String,
secondPriceRangeName String,
pv AggregateFunction(sum,Int64),
browseAcct AggregateFunction(uniq,Nullable(String)),
dealAmt AggregateFunction(sum,Float64),
dealProNum AggregateFunction(sum,Int64),
dealAcct AggregateFunction(uniq,Nullable(String)),
cartAcct AggregateFunction(uniq,Nullable(String)),
followAcct AggregateFunction(uniq,Nullable(String)),
searchClickNum AggregateFunction(sum,Int64),
browseSku AggregateFunction(uniq,Nullable(String)),
dealSku AggregateFunction(uniq,Nullable(String))
)
ENGINE = ReplicatedMergeTree('/ckpub18.olap.jd.com/tables/bc_online/ck_zh_industry_product_union_all_agg/{shard}','{replica}')
PARTITION BY dateTime
ORDER BY (dateTime,secondIndId,thirdIndId,shopType,terminalId,priceRangeId,secondPriceRangeId) SETTINGS index_granularity = 8192
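
Every step here shells out to curl, which puts the SQL inside a double-quoted shell string and, without --fail, exits with status 0 even when ClickHouse answers with an HTTP error. A minimal standard-library alternative is sketched below; clickhouse_request and execute are hypothetical helper names, and the endpoint and credentials must come from your own configuration.

```python
import base64
import urllib.request


def clickhouse_request(endpoint, user, password, sql):
    """Build (without sending) the HTTP POST that
    `curl -u user:password 'http://endpoint/?' -d "sql"` sends: basic auth + SQL body."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"http://{endpoint}/",
        data=sql.encode("utf-8"),
        headers={"Authorization": f"Basic {token}"},
        method="POST",
    )


def execute(endpoint, user, password, sql, timeout=600):
    """Send the statement and return the response body.
    urlopen raises urllib.error.HTTPError on a non-2xx status, so a failed
    CREATE/INSERT surfaces as an exception instead of a zero exit code."""
    req = clickhouse_request(endpoint, user, password, sql)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```

For example, execute("10.203.23.163:8623", user, password, ddl) would run the CREATE TABLE statement above on one node; looping over the node list from step 1 reproduces this step.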

3. Create the Distributed aggregate table on every node

curl -u ads:1qaz^RFV 'http://10.203.23.163:8623/?' -d "CREATE TABLE IF NOT EXISTS bc_online.ck_zh_industry_product_union_all_agg_d (dateTime String, skuId String, skuName String, shopId String, shopName String, sonBrandId String, sonBrandName String, terminalId String, shopType String,secondIndId String, thirdIndId String, priceRangeId String,secondPriceRangeId String, priceRangeName String,secondPriceRangeName String, pv AggregateFunction(sum, Int64), browseAcct AggregateFunction(uniq, Nullable(String)),dealAmt AggregateFunction(sum, Float64), dealProNum AggregateFunction(sum, Int64), dealAcct AggregateFunction(uniq, Nullable(String)), cartAcct AggregateFunction(uniq, Nullable(String)), followAcct AggregateFunction(uniq, Nullable(String)), searchClickNum AggregateFunction(sum, Int64), browseSku AggregateFunction(uniq, Nullable(String)), dealSku AggregateFunction(uniq, Nullable(String))) ENGINE = Distributed(LF0_CK_Pub_18, bc_online, ck_zh_industry_product_union_all_agg, rand())"
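
The third argument of the Distributed engine must name the local table that each shard stores, not the Distributed table itself. A small sketch that builds the statement and copies the column list with CREATE TABLE ... AS instead of repeating it; it assumes the local table from step 2 already exists on the node, and distributed_ddl is a hypothetical name.

```python
def distributed_ddl(cluster, db, local_table, sharding_key="rand()"):
    """CREATE statement for `<local_table>_d`.

    CREATE TABLE ... AS db.local_table copies the local table's columns, so the
    column list is not repeated; the Distributed engine's third argument names the
    *local* table that each shard stores, never the Distributed table itself."""
    dist_table = f"{local_table}_d"
    return (
        f"CREATE TABLE IF NOT EXISTS {db}.{dist_table} AS {db}.{local_table} "
        f"ENGINE = Distributed({cluster}, {db}, {local_table}, {sharding_key})"
    )


print(distributed_ddl("LF0_CK_Pub_18", "bc_online", "ck_zh_industry_product_union_all_agg"))
```

The rand() sharding key only matters for inserts sent through the Distributed table; this workflow inserts into the local tables directly and uses the _d table for reading.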

4. Get the cluster node list again

curl -u ads:1qaz^RFV 'http://ckpub18.olap.jd.com:2000/?' -d "select host_name from (select shard_num,concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end)) as host_name from system.clusters where cluster='LF0_CK_Pub_18' group by shard_num,host_name,port order by shard_num)"
select
host_name
from
(
select
shard_num,
concat(host_name,
':',
(case
when port = 9600 then '8623'
when port = 9700 then '8723'
when port = 9800 then '8823'
when port = 9900 then '8923'
else '8123'
end)) as host_name
from
system.clusters
where
cluster = 'LF0_CK_Pub_18'
group by
shard_num,
host_name,
port
order by
shard_num);

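Note: this again returns every node (all shards and replicas, deduplicated), and the list is used for dropping partitions. Strictly speaking, for ReplicatedMergeTree tables, partition operations such as DROP, MOVE (to another table), REPLACE and ATTACH PARTITION only need to be executed on one replica of each shard: the operation is recorded in ZooKeeper, and the other replicas watch ZooKeeper and apply the same operation, which keeps all replicas consistent. Executing such an operation on several replicas is redundant and can even cause errors and data problems. DROP PARTITION is harmless in this respect (at worst the partition is dropped several times), but data-adding or data-changing operations such as MOVE, REPLACE and ATTACH may cause data problems. CREATE TABLE and DROP TABLE, on the other hand, are not replicated: without ON CLUSTER they only affect the server on which they are run, which is why steps 2 and 3 create both tables on every node.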
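
The rule in this note can be written down as a small dispatcher. This is a sketch: the operation names, target_nodes and one_replica_per_shard are hypothetical, the classification covers only the statements discussed here, and it assumes tables are created without ON CLUSTER.

```python
# Hypothetical dispatcher: which nodes must run a statement.
# Not replicated -> every replica. Replicated via ZooKeeper -> one replica per shard.
EVERY_REPLICA = {"CREATE TABLE", "DROP TABLE"}
ONE_PER_SHARD = {"INSERT", "DROP PARTITION", "ATTACH PARTITION",
                 "REPLACE PARTITION", "MOVE PARTITION TO TABLE"}


def one_replica_per_shard(nodes):
    """nodes: (shard_num, endpoint) pairs -> one endpoint per shard,
    like any(...) ... GROUP BY shard_num."""
    chosen = {}
    for shard, endpoint in sorted(nodes):
        chosen.setdefault(shard, endpoint)  # keep the first replica seen for each shard
    return [chosen[shard] for shard in sorted(chosen)]


def target_nodes(operation, nodes):
    """Endpoints on which `operation` should be executed."""
    if operation in EVERY_REPLICA:
        return sorted({endpoint for _shard, endpoint in nodes})
    if operation in ONE_PER_SHARD:
        return one_replica_per_shard(nodes)
    raise ValueError(f"unclassified operation: {operation}")
```

The article's script still sends DROP PARTITION to every node; as the note says, that is redundant but harmless for drops, whereas the INSERT in step 7 is deliberately sent to one replica per shard.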

5. First drop the day's partition from the aggregate table

curl -u ads:1qaz^RFV 'http://10.203.23.163:8623/?' -d "alter table bc_online.ck_zh_industry_product_union_all_agg drop partition '2022-12-05' ;" &
alter table bc_online.ck_zh_industry_product_union_all_agg drop partition '2022-12-05';
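
The Python script in section III drops only the startDate partition, although its INSERT covers startDate through endDate; in a multi-day run, previously loaded days other than startDate would keep their old rows and receive a second copy. A sketch that drops every partition in the range, assuming dateTime holds 'YYYY-MM-DD' strings as in the examples (partitions_between and drop_partition_statements are hypothetical names):

```python
from datetime import date, timedelta


def partitions_between(start, end):
    """Every 'YYYY-MM-DD' partition value from start to end, inclusive."""
    day, last = date.fromisoformat(start), date.fromisoformat(end)
    if day > last:
        raise ValueError(f"start {start} is after end {end}")
    days = []
    while day <= last:
        days.append(day.isoformat())
        day += timedelta(days=1)
    return days


def drop_partition_statements(db, table, start, end):
    """One ALTER TABLE ... DROP PARTITION per day in the range."""
    return [f"ALTER TABLE {db}.{table} DROP PARTITION '{p}'"
            for p in partitions_between(start, end)]


for stmt in drop_partition_statements(
        "bc_online", "ck_zh_industry_product_union_all_agg", "2022-12-05", "2022-12-06"):
    print(stmt)
```

Parsing the dates with date.fromisoformat also rejects malformed input before it is pasted into SQL.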

6. Get the cluster node list once more

curl -u ads:1qaz^RFV 'http://ckpub18.olap.jd.com:2000/?' -d "select host_name from (select shard_num,any(concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end))) as host_name from system.clusters where cluster='LF0_CK_Pub_18' group by shard_num order by shard_num)"
select
host_name
from
(
select
shard_num,
any(concat(host_name,
':',
(case
when port = 9600 then '8623'
when port = 9700 then '8723'
when port = 9800 then '8823'
when port = 9900 then '8923'
else '8123'
end))) as host_name
from
system.clusters
where
cluster = 'LF0_CK_Pub_18'
group by
shard_num
order by
shard_num);

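Note: this query returns one node per shard (any() picks a single replica of each shard), and the list is used for inserting data. The other replicas of each shard receive the inserted data automatically through ReplicatedMergeTree replication.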

7. Query the base table and insert the rolled-up data into the aggregate table

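Local-to-local insert: each node chosen in step 6 reads its own shard of the base table and writes into its own local aggregate table. Rows are grouped by (dateTime, skuId, terminalId, shopType); the remaining dimension columns are taken with any(), which assumes they have a single value per group (for example, the shop and brand of a SKU). Empty strings are turned into NULL with if(x = '', null, x) so that uniq ignores them, which is also why the uniq columns are declared as AggregateFunction(uniq, Nullable(String)). Because no data moves between shards, this yields one row per key only if all base rows of a key live on the same shard; otherwise the same key appears on several shards and the row-count check in step 8 will not match.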

curl -u ads:1qaz^RFV 'http://10.203.23.163:8623/?' -d "insert into bc_online.ck_zh_industry_product_union_all_agg select dateTime,skuId,any(skuName) as skuName,any(shopId) as shopId,any(shopName) as shopName,any(sonBrandId) as sonBrandId,any(sonBrandName) as sonBrandName,terminalId,shopType,any(secondIndId) as secondIndId,any(thirdIndId) as thirdIndId,any(priceRangeId) as priceRangeId,any(secondPriceRangeId) as secondPriceRangeId,any(priceRangeName) as priceRangeName,any(secondPriceRangeName) as secondPriceRangeName,sumState(pv) as pv,uniqState(if(browseAcct = '',null,browseAcct)) as browseAcct,sumState(dealAmt) as dealAmt,sumState(dealProNum) as dealProNum,uniqState(if(dealAcct = '',null,dealAcct)) as dealAcct,uniqState(if(cartAcct = '',null,cartAcct)) as cartAcct,uniqState(if(followAcct = '',null,followAcct)) as followAcct,sumState(searchClickNum) as searchClickNum,uniqState(if(browseSku = '',null,browseSku)) as browseSku,uniqState(if(dealSku = '',null,dealSku)) as dealSku from bc_online.ck_zh_industry_product_union_all where dateTime>='2022-12-05' and dateTime<='2022-12-05' group by dateTime,skuId,terminalId,shopType;" &
insert
into
bc_online.ck_zh_industry_product_union_all_agg
select
dateTime,
skuId,
any(skuName) as skuName,
any(shopId) as shopId,
any(shopName) as shopName,
any(sonBrandId) as sonBrandId,
any(sonBrandName) as sonBrandName,
terminalId,
shopType,
any(secondIndId) as secondIndId,
any(thirdIndId) as thirdIndId,
any(priceRangeId) as priceRangeId,
any(secondPriceRangeId) as secondPriceRangeId,
any(priceRangeName) as priceRangeName,
any(secondPriceRangeName) as secondPriceRangeName,
sumState(pv) as pv,
uniqState(if(browseAcct = '',null,browseAcct)) as browseAcct,
sumState(dealAmt) as dealAmt,
sumState(dealProNum) as dealProNum,
uniqState(if(dealAcct = '',null,dealAcct)) as dealAcct,
uniqState(if(cartAcct = '',null,cartAcct)) as cartAcct,
uniqState(if(followAcct = '',null,followAcct)) as followAcct,
sumState(searchClickNum) as searchClickNum,
uniqState(if(browseSku = '',null,browseSku)) as browseSku,
uniqState(if(dealSku = '',null,dealSku)) as dealSku
from
bc_online.ck_zh_industry_product_union_all
where
dateTime >= '2022-12-05'
and dateTime <= '2022-12-05'
group by
dateTime,
skuId,
terminalId,
shopType;
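
The INSERT above follows a regular pattern: group keys pass through, additive metrics become sumState, distinct-count metrics become uniqState over a NULL-for-empty value, and every other column is taken with any(). A hypothetical generator for that pattern is sketched below. Because INSERT INTO ... SELECT maps columns by position, the SELECT list is emitted in the rollup table's column order; the dates are interpolated verbatim, so validate them before use.

```python
# Hypothetical generator for the rollup INSERT. Columns are mapped by position,
# so the SELECT list follows the rollup table's column order.
TABLE_ORDER = [
    "dateTime", "skuId", "skuName", "shopId", "shopName", "sonBrandId",
    "sonBrandName", "terminalId", "shopType", "secondIndId", "thirdIndId",
    "priceRangeId", "secondPriceRangeId", "priceRangeName", "secondPriceRangeName",
    "pv", "browseAcct", "dealAmt", "dealProNum", "dealAcct", "cartAcct",
    "followAcct", "searchClickNum", "browseSku", "dealSku",
]
GROUP_KEYS = ["dateTime", "skuId", "terminalId", "shopType"]
SUM_METRICS = {"pv", "dealAmt", "dealProNum", "searchClickNum"}
UNIQ_METRICS = {"browseAcct", "dealAcct", "cartAcct", "followAcct", "browseSku", "dealSku"}


def select_expr(col):
    """SELECT expression for one target column."""
    if col in GROUP_KEYS:
        return col
    if col in SUM_METRICS:
        return f"sumState({col}) AS {col}"
    if col in UNIQ_METRICS:
        # Empty string -> NULL so uniq skips it; the argument becomes Nullable(String).
        return f"uniqState(if({col} = '', NULL, {col})) AS {col}"
    return f"any({col}) AS {col}"  # attribute assumed constant within a group


def rollup_insert_sql(db, source, target, start, end):
    """Full INSERT ... SELECT ... GROUP BY statement for the date range."""
    select_list = ",\n    ".join(select_expr(c) for c in TABLE_ORDER)
    return (
        f"INSERT INTO {db}.{target}\nSELECT\n    {select_list}\n"
        f"FROM {db}.{source}\n"
        f"WHERE dateTime >= '{start}' AND dateTime <= '{end}'\n"
        f"GROUP BY {', '.join(GROUP_KEYS)}"
    )


print(rollup_insert_sql("bc_online", "ck_zh_industry_product_union_all",
                        "ck_zh_industry_product_union_all_agg", "2022-12-05", "2022-12-05"))
```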

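8. Query the row counts of the Distributed aggregate table and of the Hive table, and check that the total row count of the Distributed table matches the Hive table. The Hive query counts distinct (dt, item_sku_id, terminal_type, trade_type) combinations, which is meant to correspond to the (dateTime, skuId, terminalId, shopType) grain of the INSERT in step 7.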

curl -u ads:1qaz^RFV 'http://ckpub18.olap.jd.com:2000/?' -d "select count(1) from bc_online.ck_zh_industry_product_union_all_agg_d where dateTime>='2022-12-05' and dateTime<='2022-12-05';   "
select count(1) from bc_online.ck_zh_industry_product_union_all_agg_d where dateTime>='2022-12-05' and dateTime<='2022-12-05';
hive -e "select count(1) from (select dt,item_sku_id,terminal_type,trade_type from app.app_zh_industry_sku_union_index_d  where dt>='2022-12-05' and dt <= '2022-12-05' group by dt,item_sku_id,terminal_type,trade_type) tmp;"
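
The Python script in section III compares the two counts as raw strings. A slightly stricter check parses them as integers and raises an error when no count is found, so that empty output cannot pass as a match. This sketch assumes, as the script does, that each command prints its count on the last non-empty stdout line; last_count and counts_match are hypothetical names.

```python
def last_count(output):
    """Parse the integer on the last non-empty line of a command's stdout."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    if not lines:
        raise ValueError("no row count found in output")
    return int(lines[-1])


def counts_match(clickhouse_output, hive_output):
    """Compare the two row counts as integers rather than as raw strings."""
    return last_count(clickhouse_output) == last_count(hive_output)
```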

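III. Original Python Script

Note: this version of the script also carries an spuId column (in both CREATE TABLE statements, in the INSERT and its GROUP BY, and in the Hive count), which does not appear in the DDL and SQL of sections I and II. It therefore presumably targets a later schema in which the base table also contains spuId.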

import subprocess
import sys
import time

# Excerpt: user, password, ip, port, systemTable, clusterName, clickhouse_dataBase,
# clickhouse_table, startDate, endDate and APP are defined earlier in the original
# script, which is not shown here.
xml="false"

# Step 1: list every shard/replica node, then create the local and Distributed tables on each.
cmd = """curl -u %s:%s 'http://%s:%s/?' -d "select host_name from (select shard_num,concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end)) as host_name from system.%s where cluster='%s' group by shard_num,host_name,port order by shard_num)" """ % (user,password,ip,port,systemTable,clusterName)

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
buff = p.stdout.readlines()

for ipele in buff:
    ip1 = str(ipele, encoding='utf-8').strip()
    print("ip1=============>"+ip1)
    # Local ReplicatedMergeTree table (CREATE TABLE is not replicated, so it runs on every node).
    cmd="""curl -u %s:%s 'http://%s/?' -d "CREATE TABLE IF NOT EXISTS %s.%s (dateTime String, skuId String, skuName String, shopId String, shopName String, sonBrandId String, sonBrandName String, terminalId String, shopType String,secondIndId String, thirdIndId String, priceRangeId String,secondPriceRangeId String, priceRangeName String,secondPriceRangeName String, pv AggregateFunction(sum, Int64), browseAcct AggregateFunction(uniq, Nullable(String)),dealAmt AggregateFunction(sum, Float64), dealProNum AggregateFunction(sum, Int64), dealAcct AggregateFunction(uniq, Nullable(String)), cartAcct AggregateFunction(uniq, Nullable(String)), followAcct AggregateFunction(uniq, Nullable(String)), searchClickNum AggregateFunction(sum, Int64), browseSku AggregateFunction(uniq, Nullable(String)), dealSku AggregateFunction(uniq, Nullable(String)), spuId String) ENGINE = ReplicatedMergeTree('/%s/tables/bc_online/ck_zh_industry_product_union_all_agg/{shard}', '{replica}') PARTITION BY dateTime ORDER BY (dateTime,secondIndId,thirdIndId,shopType,terminalId,priceRangeId,secondPriceRangeId) SETTINGS index_granularity = 8192" """% (user,password,ip1,clickhouse_dataBase,clickhouse_table,ip)
    print(cmd)
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    returncode = p.wait()
    if returncode != 0:
        print("create local table Failed! try again...")
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        if p.wait()!=0:
            print(ip1)
            print(cmd)
            print("try failed! progress exit!")
            sys.exit(1)

    # Distributed table <table>_d over the local table.
    cmd_d="""curl -u %s:%s 'http://%s/?' -d "CREATE TABLE IF NOT EXISTS %s.%s_d (dateTime String, skuId String, skuName String, shopId String, shopName String, sonBrandId String, sonBrandName String, terminalId String, shopType String,secondIndId String, thirdIndId String, priceRangeId String,secondPriceRangeId String, priceRangeName String,secondPriceRangeName String, pv AggregateFunction(sum, Int64), browseAcct AggregateFunction(uniq, Nullable(String)),dealAmt AggregateFunction(sum, Float64), dealProNum AggregateFunction(sum, Int64), dealAcct AggregateFunction(uniq, Nullable(String)), cartAcct AggregateFunction(uniq, Nullable(String)), followAcct AggregateFunction(uniq, Nullable(String)), searchClickNum AggregateFunction(sum, Int64), browseSku AggregateFunction(uniq, Nullable(String)), dealSku AggregateFunction(uniq, Nullable(String)), spuId String) ENGINE = Distributed(%s, %s, %s, rand())" """% (user,password,ip1,clickhouse_dataBase,clickhouse_table,clusterName,clickhouse_dataBase,clickhouse_table)
    print("cmd_d==============>"+cmd_d)
    p = subprocess.Popen(cmd_d, shell=True, stdout=subprocess.PIPE)
    returncode = p.wait()
    if returncode != 0:
        print("create distribute table Failed! try again...")
        p = subprocess.Popen(cmd_d, shell=True, stdout=subprocess.PIPE)
        if p.wait()!=0:
            print(ip1)
            print(cmd_d)
            print("try failed! progress exit!")
            sys.exit(1)

# Step 2: drop the startDate partition on every node.
# Only startDate is dropped although the INSERT below covers startDate..endDate,
# so a multi-day run leaves the other days' previous rows in place.
# The trailing '&' runs curl in the background: p.wait() only waits for the shell,
# the retry branch cannot see a failed curl, and time.sleep() is what actually waits.
cmd2 = """curl -u %s:%s 'http://%s:%s/?' -d "select host_name from (select shard_num,concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end)) as host_name from system.%s where cluster='%s' group by shard_num,host_name,port order by shard_num)" """ % (user,password,ip,port,systemTable,clusterName)
p = subprocess.Popen(cmd2, shell=True, stdout=subprocess.PIPE)
buff = p.stdout.readlines()
for ipele in buff:
    ip1 = str(ipele, encoding='utf-8').strip()
    dropcmd="""curl -u %s:%s 'http://%s/?' -d "alter table %s.%s drop partition '%s' ;" & """% (user,password,ip1,clickhouse_dataBase,clickhouse_table,startDate)
    p = subprocess.Popen(dropcmd, shell=True, stdout=subprocess.PIPE)
    returncode = p.wait()
    if returncode != 0:
        print("drop partition Failed! try again...")
        p = subprocess.Popen(dropcmd, shell=True, stdout=subprocess.PIPE)
        if p.wait()!=0:
            print(ip1)
            print(dropcmd)
            print("try failed! progress exit!")
            sys.exit(1)

time.sleep(20)

# Step 3: one replica per shard (any() ... group by shard_num) runs the rollup INSERT;
# ReplicatedMergeTree copies the data to the other replicas. Also backgrounded with '&'.
cmd3 = """curl -u %s:%s 'http://%s:%s/?' -d "select host_name from (select shard_num,any(concat(host_name,':',(case when port=9600 then '8623' when port=9700 then '8723' when port=9800 then '8823' when port=9900 then '8923' else '8123' end))) as host_name from system.%s where cluster='%s' group by shard_num order by shard_num)" """% (user,password,ip,port,systemTable,clusterName)
p = subprocess.Popen(cmd3, shell=True, stdout=subprocess.PIPE)
buff = p.stdout.readlines()
for ipele in buff:
    ip1 = str(ipele, encoding='utf-8').strip()
    insert_sql="""curl -u %s:%s 'http://%s/?' -d "insert into %s.%s select dateTime,skuId,any(skuName) as skuName,any(shopId) as shopId,any(shopName) as shopName,any(sonBrandId) as sonBrandId,any(sonBrandName) as sonBrandName,terminalId,shopType,any(secondIndId) as secondIndId,any(thirdIndId) as thirdIndId,any(priceRangeId) as priceRangeId,any(secondPriceRangeId) as secondPriceRangeId,any(priceRangeName) as priceRangeName,any(secondPriceRangeName) as secondPriceRangeName,sumState(pv) as pv,uniqState(if(browseAcct = '',null,browseAcct)) as browseAcct,sumState(dealAmt) as dealAmt,sumState(dealProNum) as dealProNum,uniqState(if(dealAcct = '',null,dealAcct)) as dealAcct,uniqState(if(cartAcct = '',null,cartAcct)) as cartAcct,uniqState(if(followAcct = '',null,followAcct)) as followAcct,sumState(searchClickNum) as searchClickNum,uniqState(if(browseSku = '',null,browseSku)) as browseSku,uniqState(if(dealSku = '',null,dealSku)) as dealSku, spuId from bc_online.ck_zh_industry_product_union_all where dateTime>='%s' and dateTime<='%s' group by dateTime,skuId,terminalId,shopType,spuId;" & """% (user,password,ip1,clickhouse_dataBase,clickhouse_table,startDate,endDate)
    print(insert_sql)
    p = subprocess.Popen(insert_sql, shell=True, stdout=subprocess.PIPE)
    returncode = p.wait()
    if returncode != 0:
        print("insert table Failed! try again...")
        p = subprocess.Popen(insert_sql, shell=True, stdout=subprocess.PIPE)
        if p.wait()!=0:
            print(ip1)
            print(insert_sql)
            print("try failed! progress exit!")
            sys.exit(1)

time.sleep(120)

# Step 4: verify row counts (Distributed aggregate table vs. Hive source table).
cmdAggCnt = """curl -u %s:%s 'http://%s:%s/?' -d "select count(1) from %s.%s_d where dateTime>='%s' and dateTime<='%s'; " """% (user,password,ip,port,clickhouse_dataBase,clickhouse_table,startDate,endDate)
print("cmdAggCnt==============>"+cmdAggCnt)
pAgg = subprocess.Popen(cmdAggCnt, shell=True, stdout=subprocess.PIPE)
if pAgg.wait() != 0:
    print("get agg table count Failed! try again...")
    pAgg = subprocess.Popen(cmdAggCnt, shell=True, stdout=subprocess.PIPE)
    if pAgg.wait()!=0:
        print("try get agg table count failed! progress exit!")
        sys.exit(1)
aggCntBuff = pAgg.stdout.readlines()

aggCnt = ''
for aggCnts in aggCntBuff:
    aggCnt = str(aggCnts, encoding='utf-8').strip()
print("aggCnt= " + aggCnt)

cmdOldCnt = """hive -e "select count(1) from (select dt,item_sku_id,terminal_type,trade_type,spu_id from %s.app_zh_industry_sku_union_index_d where dt>='%s' and dt <= '%s' group by dt,item_sku_id,terminal_type,trade_type,spu_id) tmp;" """% (APP, startDate, endDate)

print("cmdOldCnt==============>"+cmdOldCnt)
pOld = subprocess.Popen(cmdOldCnt, shell=True, stdout=subprocess.PIPE)
if pOld.wait() != 0:
    print("get old table count Failed! try again...")
    pOld = subprocess.Popen(cmdOldCnt, shell=True, stdout=subprocess.PIPE)
    if pOld.wait()!=0:
        print("try get old table count failed! progress exit!")
        sys.exit(1)
oldCntBuff = pOld.stdout.readlines()
oldCnt = ''
for oldCnts in oldCntBuff:
    oldCnt = str(oldCnts, encoding='utf-8').strip()

print("oldCnt= " + oldCnt)
if aggCnt != oldCnt:
    print("data check failed! progress exit!")
    sys.exit(1)
else:
    print('data check Success!')
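
The script repeats the same run, check, retry once, exit pattern six times, and two details weaken it: the trailing & on the DROP and INSERT commands backgrounds curl, so the shell exits with status 0 immediately, and curl without --fail exits with 0 even when ClickHouse returns an HTTP error. Below is a sketch of a shared helper (run_with_retry is a hypothetical name) that runs an argument list without a shell and raises once the last attempt has failed.

```python
import subprocess
import sys
import time


def run_with_retry(args, attempts=2, delay_seconds=5):
    """Run `args` (an argument list, no shell) up to `attempts` times.

    Returns stdout of the first run that exits with status 0; raises
    RuntimeError with the last stderr once every attempt has failed."""
    last_error = ""
    for attempt in range(1, attempts + 1):
        proc = subprocess.run(args, capture_output=True, text=True)
        if proc.returncode == 0:
            return proc.stdout
        last_error = proc.stderr.strip()
        if attempt < attempts:
            time.sleep(delay_seconds)
    raise RuntimeError(f"{args[0]} failed after {attempts} attempts: {last_error}")


# Hypothetical usage with curl (user, password, node and sql are placeholders):
#   run_with_retry(["curl", "--fail", "-sS", "-u", f"{user}:{password}",
#                   f"http://{node}/", "--data-binary", sql])
if __name__ == "__main__":
    print(run_with_retry([sys.executable, "-c", "print('ok')"]).strip())
```

Passing an argument list avoids shell quoting of the SQL. If the per-node statements should still run in parallel, that has to be done explicitly (for example with concurrent.futures.ThreadPoolExecutor) rather than with &, so that each node's exit status is actually checked.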