ClickHouse (hereafter "CK") is a high-performance columnar database designed for online analytical processing, open-sourced by the Russian company Yandex in 2016; the official ClickHouse site is a good place to learn more. A project of mine needs to sustain CK writes under heavy traffic, so over the past few days I benchmarked CK write performance from Go clients. The mainstream options are:
- clickhouse-go
- ch-go
- database/sql
- sqlx
- gorm
ClickHouse officially recommends two clients: clickhouse-go and ch-go. clickhouse-go is the higher-level API, but which one to choose depends on your usage pattern and performance requirements. For inserts on the order of millions of rows per second, ch-go is recommended; for lower-throughput writes and analytics-focused query workloads, clickhouse-go is the better fit. Here is the relevant passage from the official ClickHouse documentation:
> Selecting a client library depends on your usage patterns and need for optimal performance. For insert heavy use cases, where millions of inserts are required per second, we recommend using the low level client ch-go. This client avoids the associated overhead of pivoting the data from a row-orientated format to columns, as the ClickHouse native format requires. Furthermore, it avoids any reflection or use of the `interface{}` (`any`) type to simplify usage. For query workloads focused on aggregations or lower throughput insert workloads, the clickhouse-go provides a familiar `database/sql` interface and more straightforward row semantics. Users can also optionally use HTTP for the transport protocol and take advantage of helper functions to marshal rows to and from structs.
Documentation alone should not drive the decision, though. You should benchmark against your actual workload and pick the best option based on the results. For these tests I used Go 1.19.6, since clickhouse-go has Go version requirements:
| Client Version | Golang Versions |
|---|---|
| >= 2.0, <= 2.2 | 1.17, 1.18 |
| >= 2.3 | 1.18.4+, 1.19 |
Performance Testing
All tests use the same data sample: a single row is 1460 bytes (about 1.4KB), the table has 689 columns of which 32 are written, and each run writes at least 80,000 rows, because ClickHouse's own guidance is that, to keep writes performant, rows of around 1KB should go in at roughly 50k-200k rows/s. Write performance depends on many factors, such as the number of columns, the connection mode, and so on. The test machine runs Linux with 48 cores, and results are monitored with Grafana and Prometheus. The connection, batch size, goroutine count, and so on are controlled via command-line flags:
```go
var (
	host  = flag.String("host", "localhost:9000", "clickhouse host")
	user  = flag.String("user", "root", "clickhouse user")
	pass  = flag.String("password", "123456", "clickhouse password")
	db    = flag.String("db", "test", "clickhouse database")
	g     = flag.Int("g", 1, "number of writer goroutines")
	batch = flag.Int("batch", 100000, "rows per batch")
	tb    = flag.String("tb", "event", "clickhouse table")
	debug = flag.Bool("debug", false, "clickhouse debug")
)

flag.Parse()
```
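For example, a run with four writer goroutines and 100k-row batches might be launched like this (`ckbench` is just a placeholder binary name):

```
./ckbench -host localhost:9000 -user root -password 123456 -db test -g 4 -batch 100000 -tb event
```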
Clickhouse-go
clickhouse-go has two major versions, v1 and v2. The official benchmarks show v2 outperforming v1, so in the spirit of preferring the newer release 😄 we test v2 directly. It offers two batch-write styles: the native batch API, and column-mapped batch writes. Let's look at the first one.
Write by ClickHouse API
First we establish the connection, then start the writer goroutines according to the command-line flags:
```go
conn, err := clickhouse.Open(&clickhouse.Options{
	Addr: []string{*host},
	Auth: clickhouse.Auth{
		Database: *db,
		Username: *user,
		Password: *pass,
	},
	Settings: clickhouse.Settings{
		"max_execution_time": 60,
	},
	MaxOpenConns: 50,
	MaxIdleConns: 10,
	DialTimeout:  5 * time.Second,
	Debug:        *debug,
})
if err != nil {
	panic(any(err))
}
if err := conn.Ping(context.Background()); err != nil {
	panic(any(err))
}
// Each writer goroutine inserts batches in an endless loop; the main
// goroutine must block elsewhere (e.g. select {}) to keep them running.
for i := 0; i < *g; i++ {
	go func() {
		for {
			insert(conn, *batch)
		}
	}()
}
```
The write path is straightforward: parse the insert statement into a batch with PrepareBatch, append each row of data to the batch, and finally push it all to CK with Send. The actual write happens inside Send; everything before that is buffered in memory. The rate is then computed from the total elapsed time and the row count.
```go
func insert(conn driver.Conn, batch int) {
	start := time.Now().UnixNano()
	ctx := context.Background()
	bt, err := conn.PrepareBatch(ctx, data.InsertSql)
	if err != nil {
		log.Fatal(err)
	}
	s := time.Now().UnixNano()
	for i := 0; i < batch; i++ {
		// Append buffers one row in memory; nothing is sent yet.
		err := bt.Append(data.InsertData.Ccollectorip, data.InsertData.Cdevip, data.InsertData.Cdevtype, data.InsertData.Cdstip, data.InsertData.Ceventmsg,
			data.InsertData.Ceventname, data.InsertData.Ceventtype, data.InsertData.Cflowdir, data.InsertData.Cobject, data.InsertData.Coperation,
			data.InsertData.Csrcip, data.InsertData.Csrclogtype, data.InsertData.LogypeTag, data.InsertData.Manufacturer, data.InsertData.Reason,
			data.InsertData.Iappprotocol, data.InsertData.Icollecttype, data.InsertData.Idstport, data.InsertData.Ieventlevel, data.InsertData.Imergecount,
			data.InsertData.Iprotocol, data.InsertData.Isrcport, data.InsertData.Lduration, data.InsertData.Lrecivepack, data.InsertData.Lsendpack,
			data.InsertData.RequestBodyLen, data.InsertData.ResponseBodyLen, data.InsertData.Lendtime, data.InsertData.Lid, time.Now().UnixNano()/1e9, // current time in seconds (loccurtime)
			data.InsertData.Lrecepttime, data.InsertData.Lstartime)
		if err != nil {
			log.Fatal(err)
		}
	}
	e := time.Now().UnixNano()
	// Send performs the actual network write.
	if err := bt.Send(); err != nil {
		log.Fatal(err)
	}
	end := time.Now().UnixNano()
	fmt.Printf("rows: %d, total: %d ms, append loop: %d ms, rate: %d rows/s\n",
		batch, (end-start)/1e6, (e-s)/1e6, int64(batch)*1e9/(end-start))
}
```
Test results:
The printed output shows a rate of roughly 60k-70k rows/s, and Grafana agrees, averaging around 65k rows/s.
Final result: with a single goroutine, clickhouse-go writes at about 60k rows/s.
Write by column
The connection is the same as before; the write path differs slightly. We declare a slice of the correct type for each inserted column and fill them with the data up front.
```go
var (
	ccollectorip, cdevip, cdevtype, cdstip, ceventmsg, ceventname, ceventtype, cflowdir, cobject, coperation, csrcip, csrclogtype,
	logtype_tag, manufacturer, reason []string
	iappprotocol, icollecttype, idstport, ieventlevel, imergecount, iprotocol, isrcport []int32
	lduration, lrecivepack, lsendpack, request_body_len, response_body_len, lendtime, lid, loccurtime, lrecepttime, lstartime []int64
)

for i := 0; i < *batch; i++ {
	ccollectorip = append(ccollectorip, data.InsertData.Ccollectorip)
	cdevip = append(cdevip, data.InsertData.Cdevip)
	cdevtype = append(cdevtype, data.InsertData.Cdevtype)
	cdstip = append(cdstip, data.InsertData.Cdstip)
	ceventmsg = append(ceventmsg, data.InsertData.Ceventmsg)
	ceventname = append(ceventname, data.InsertData.Ceventname)
	ceventtype = append(ceventtype, data.InsertData.Ceventtype)
	cflowdir = append(cflowdir, data.InsertData.Cflowdir)
	cobject = append(cobject, data.InsertData.Cobject)
	coperation = append(coperation, data.InsertData.Coperation)
	csrcip = append(csrcip, data.InsertData.Csrcip)
	csrclogtype = append(csrclogtype, data.InsertData.Csrclogtype)
	logtype_tag = append(logtype_tag, data.InsertData.LogypeTag)
	manufacturer = append(manufacturer, data.InsertData.Manufacturer)
	reason = append(reason, data.InsertData.Reason)
	iappprotocol = append(iappprotocol, data.InsertData.Iappprotocol)
	icollecttype = append(icollecttype, data.InsertData.Icollecttype)
	idstport = append(idstport, data.InsertData.Idstport)
	ieventlevel = append(ieventlevel, data.InsertData.Ieventlevel)
	imergecount = append(imergecount, data.InsertData.Imergecount)
	iprotocol = append(iprotocol, data.InsertData.Iprotocol)
	isrcport = append(isrcport, data.InsertData.Isrcport)
	lduration = append(lduration, data.InsertData.Lduration)
	lrecivepack = append(lrecivepack, data.InsertData.Lrecivepack)
	lsendpack = append(lsendpack, data.InsertData.Lsendpack)
	request_body_len = append(request_body_len, data.InsertData.RequestBodyLen)
	response_body_len = append(response_body_len, data.InsertData.ResponseBodyLen)
	lendtime = append(lendtime, data.InsertData.Lendtime)
	lid = append(lid, data.InsertData.Lid)
	loccurtime = append(loccurtime, data.InsertData.Loccurtime)
	lrecepttime = append(lrecepttime, data.InsertData.Lrecepttime)
	lstartime = append(lstartime, data.InsertData.Lstartime)
}
```
At write time we again parse the statement into a batch, then map each pre-built slice onto the corresponding batch column, and finally write to CK with Send.
```go
func insert(conn driver.Conn, batch int) {
	start := time.Now().UnixNano()
	ctx := context.Background()
	bt, err := conn.PrepareBatch(ctx, data.InsertSql)
	if err != nil {
		log.Fatal(err)
	}
	s := time.Now().UnixNano()

	// Map each pre-built slice onto its batch column by index.
	// NOTE: Column(i).Append returns an error that production code should check.
	bt.Column(0).Append(ccollectorip)
	bt.Column(1).Append(cdevip)
	bt.Column(2).Append(cdevtype)
	bt.Column(3).Append(cdstip)
	bt.Column(4).Append(ceventmsg)
	bt.Column(5).Append(ceventname)
	bt.Column(6).Append(ceventtype)
	bt.Column(7).Append(cflowdir)
	bt.Column(8).Append(cobject)
	bt.Column(9).Append(coperation)
	bt.Column(10).Append(csrcip)
	bt.Column(11).Append(csrclogtype)
	bt.Column(12).Append(logtype_tag)
	bt.Column(13).Append(manufacturer)
	bt.Column(14).Append(reason)
	bt.Column(15).Append(iappprotocol)
	bt.Column(16).Append(icollecttype)
	bt.Column(17).Append(idstport)
	bt.Column(18).Append(ieventlevel)
	bt.Column(19).Append(imergecount)
	bt.Column(20).Append(iprotocol)
	bt.Column(21).Append(isrcport)
	bt.Column(22).Append(lduration)
	bt.Column(23).Append(lrecivepack)
	bt.Column(24).Append(lsendpack)
	bt.Column(25).Append(request_body_len)
	bt.Column(26).Append(response_body_len)
	bt.Column(27).Append(lendtime)
	bt.Column(28).Append(lid)
	bt.Column(29).Append(loccurtime)
	bt.Column(30).Append(lrecepttime)
	bt.Column(31).Append(lstartime)
	e := time.Now().UnixNano()
	if err := bt.Send(); err != nil {
		log.Fatal(err)
	}
	end := time.Now().UnixNano()
	fmt.Printf("rows: %d, total: %d ms, column mapping: %d ms, rate: %d rows/s\n",
		batch, (end-start)/1e6, (e-s)/1e6, int64(batch)*1e9/(end-start))
}
```
Test results:
The measured write rate is again roughly 60k-70k rows/s, averaging 65k rows/s, a slight improvement over the native-API test. Why? After digging through some docs and the underlying code: writing by column avoids the row-to-column pivot entirely. CK is, after all, a column-oriented store, and because the column types are declared up front, the driver also skips the per-value type checks and reflection during data handling.
ch-go
Establishing the connection is much the same as before, so we won't examine it in detail (a minimal sketch follows); then we go straight to the write phase.
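For completeness, a minimal connection sketch with ch-go, reusing the same flags as before (ch.Dial and ch.Options are ch-go's actual entry points, but treat the exact option set here as illustrative):

```go
import (
	"context"

	"github.com/ClickHouse/ch-go"
)

// dial opens a native-protocol connection; *ch.Client is what insert() below receives.
func dial(ctx context.Context) (*ch.Client, error) {
	return ch.Dial(ctx, ch.Options{
		Address:  *host, // e.g. "localhost:9000"
		Database: *db,
		User:     *user,
		Password: *pass,
	})
}
```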
```go
func insert(conn *ch.Client, batch int) {
	start := time.Now().UnixNano()
	var (
		ccollectorip, cdevip, cdevtype, cdstip, ceventmsg, ceventname, ceventtype, cflowdir, cobject, coperation, csrcip, csrclogtype,
		logtype_tag, manufacturer, reason proto.ColStr
		iappprotocol, icollecttype, idstport, ieventlevel, imergecount, iprotocol, isrcport proto.ColInt32
		lduration, lrecivepack, lsendpack, request_body_len, response_body_len, lendtime, lid, loccurtime, lrecepttime, lstartime proto.ColInt64
	)
	for i := 0; i < batch; i++ {
		ccollectorip.Append(data.InsertData.Ccollectorip)
		cdevip.Append(data.InsertData.Cdevip)
		cdevtype.Append(data.InsertData.Cdevtype)
		cdstip.Append(data.InsertData.Cdstip)
		ceventmsg.Append(data.InsertData.Ceventmsg)
		ceventname.Append(data.InsertData.Ceventname)
		ceventtype.Append(data.InsertData.Ceventtype)
		cflowdir.Append(data.InsertData.Cflowdir)
		cobject.Append(data.InsertData.Cobject)
		coperation.Append(data.InsertData.Coperation)
		csrcip.Append(data.InsertData.Csrcip)
		csrclogtype.Append(data.InsertData.Csrclogtype)
		logtype_tag.Append(data.InsertData.LogypeTag)
		manufacturer.Append(data.InsertData.Manufacturer)
		reason.Append(data.InsertData.Reason)
		iappprotocol.Append(data.InsertData.Iappprotocol)
		icollecttype.Append(data.InsertData.Icollecttype)
		idstport.Append(data.InsertData.Idstport)
		ieventlevel.Append(data.InsertData.Ieventlevel)
		imergecount.Append(data.InsertData.Imergecount)
		iprotocol.Append(data.InsertData.Iprotocol)
		isrcport.Append(data.InsertData.Isrcport)
		lduration.Append(data.InsertData.Lduration)
		lrecivepack.Append(data.InsertData.Lrecivepack)
		lsendpack.Append(data.InsertData.Lsendpack)
		request_body_len.Append(data.InsertData.RequestBodyLen)
		response_body_len.Append(data.InsertData.ResponseBodyLen)
		lendtime.Append(data.InsertData.Lendtime)
		lid.Append(data.InsertData.Lid)
		loccurtime.Append(data.InsertData.Loccurtime)
		lrecepttime.Append(data.InsertData.Lrecepttime)
		lstartime.Append(data.InsertData.Lstartime)
	}
	// Bind each typed column to its column name in the target table.
	input := proto.Input{
		{Name: "ccollectorip", Data: &ccollectorip},
		{Name: "cdevip", Data: &cdevip},
		{Name: "cdevtype", Data: &cdevtype},
		{Name: "cdstip", Data: &cdstip},
		{Name: "ceventmsg", Data: &ceventmsg},
		{Name: "ceventname", Data: &ceventname},
		{Name: "ceventtype", Data: &ceventtype},
		{Name: "cflowdir", Data: &cflowdir},
		{Name: "cobject", Data: &cobject},
		{Name: "coperation", Data: &coperation},
		{Name: "csrcip", Data: &csrcip},
		{Name: "csrclogtype", Data: &csrclogtype},
		{Name: "logtype_tag", Data: &logtype_tag},
		{Name: "manufacturer", Data: &manufacturer},
		{Name: "reason", Data: &reason},
		{Name: "iappprotocol", Data: &iappprotocol},
		{Name: "icollecttype", Data: &icollecttype},
		{Name: "idstport", Data: &idstport},
		{Name: "ieventlevel", Data: &ieventlevel},
		{Name: "imergecount", Data: &imergecount},
		{Name: "iprotocol", Data: &iprotocol},
		{Name: "isrcport", Data: &isrcport},
		{Name: "lduration", Data: &lduration},
		{Name: "lrecivepack", Data: &lrecivepack},
		{Name: "lsendpack", Data: &lsendpack},
		{Name: "request_body_len", Data: &request_body_len},
		{Name: "response_body_len", Data: &response_body_len},
		{Name: "lendtime", Data: &lendtime},
		{Name: "lid", Data: &lid},
		{Name: "loccurtime", Data: &loccurtime},
		{Name: "lrecepttime", Data: &lrecepttime},
		{Name: "lstartime", Data: &lstartime},
	}
	s := time.Now().UnixNano()
	// Do performs the actual insert over the native protocol.
	if err := conn.Do(context.Background(), ch.Query{
		Body:  insertSql,
		Input: input,
	}); err != nil {
		log.Fatal(err)
	}
	end := time.Now().UnixNano()
	fmt.Printf("rows: %d, total: %d ms, prep: %d ms, rate: %d rows/s\n",
		batch, (end-start)/1e6, (s-start)/1e6, int64(batch)*1e9/(end-start))
}
```
The write path looks much like clickhouse-go's column writes: define the typed columns first, map them onto column names, and finally write to CK via the Do method.
Test results:
ch-go's write performance is very close to clickhouse-go's, also averaging 67k-68k rows/s, but its write rate is remarkably steady; by comparison, ch-go is the more stable choice under high-volume writes.
database/sql
The sql standard library needs a ClickHouse engine to connect, and here too it is clickhouse-go. I originally wanted to test against v2, but connecting kept failing with this error:
code: 81, message: Database default doesn't exist
The error appears even when the database exists, so for this test the v1 clickhouse-go engine was chosen instead. You can tell from the connection code that the clickhouse-go engine is in play, and the writes go through a transaction.
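For reference, the v1 driver is conventionally registered under database/sql and opened with a DSN, roughly like this (a sketch; the DSN parameters mirror the flags above):

```go
import (
	"database/sql"

	_ "github.com/ClickHouse/clickhouse-go" // v1 driver, registers itself as "clickhouse"
)

func open() (*sql.DB, error) {
	return sql.Open("clickhouse",
		"tcp://localhost:9000?username=root&password=123456&database=test")
}
```

The benchmark opens its connection like so: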
```go
conn := clickhouse.OpenDB(&clickhouse.Options{
	Addr: []string{*host},
	Auth: clickhouse.Auth{
		Database: *db,
		Username: *user,
		Password: *pass,
	},
	Settings: clickhouse.Settings{
		"max_execution_time": 60,
	},
	DialTimeout: 5 * time.Second,
	Debug:       true,
})
```
```go
func insert(conn *sql.DB, batch int) {
	start := time.Now().UnixNano()
	tx, err := conn.Begin()
	if err != nil {
		log.Fatal(err)
	}
	stmt, err := tx.Prepare(data.InsertSql)
	if err != nil {
		log.Fatal(err)
	}
	s := time.Now().UnixNano()
	for i := 0; i < batch; i++ {
		_, err := stmt.Exec(data.InsertData.Ccollectorip, data.InsertData.Cdevip, data.InsertData.Cdevtype, data.InsertData.Cdstip, data.InsertData.Ceventmsg,
			data.InsertData.Ceventname, data.InsertData.Ceventtype, data.InsertData.Cflowdir, data.InsertData.Cobject, data.InsertData.Coperation,
			data.InsertData.Csrcip, data.InsertData.Csrclogtype, data.InsertData.LogypeTag, data.InsertData.Manufacturer, data.InsertData.Reason,
			data.InsertData.Iappprotocol, data.InsertData.Icollecttype, data.InsertData.Idstport, data.InsertData.Ieventlevel, data.InsertData.Imergecount,
			data.InsertData.Iprotocol, data.InsertData.Isrcport, data.InsertData.Lduration, data.InsertData.Lrecivepack, data.InsertData.Lsendpack,
			data.InsertData.RequestBodyLen, data.InsertData.ResponseBodyLen, data.InsertData.Lendtime, data.InsertData.Lid, data.InsertData.Loccurtime,
			data.InsertData.Lrecepttime, data.InsertData.Lstartime)
		if err != nil {
			log.Fatal(err)
		}
	}
	e := time.Now().UnixNano()
	// The batch is flushed to ClickHouse on Commit.
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
	stmt.Close()
	end := time.Now().UnixNano()
	fmt.Printf("rows: %d, total: %d ms, exec loop: %d ms, rate: %d rows/s\n",
		batch, (end-start)/1e6, (e-s)/1e6, int64(batch)*1e9/(end-start))
}
```
Test results:
The average write rate comes out to about 55k rows/s, with most of the time spent in the Exec calls, i.e. in data handling.
sqlx
sqlx extends the standard database/sql package, adding its own query and transaction helpers; connecting is also very simple.
```go
conn, err := sqlx.Connect("clickhouse", fmt.Sprintf("tcp://%s?username=%s&password=%s&database=%s&debug=false", *host, *user, *pass, *db))
if err != nil {
	panic(any(err))
}
```
Writing data works exactly the same way as with database/sql, so performance is very similar; beyond the short sketch below we won't belabor it here, and interested readers can explore further on their own.
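A minimal write sketch using sqlx's transaction helpers (Beginx and Preparex are sqlx's typed wrappers over Begin/Prepare; the shortened argument list stands in for the full 32 fields used above):

```go
import (
	"log"

	"github.com/jmoiron/sqlx"
)

// insert mirrors the database/sql version, using sqlx's typed wrappers.
func insert(conn *sqlx.DB, batch int) {
	tx, err := conn.Beginx() // like sql.DB.Begin, but returns *sqlx.Tx
	if err != nil {
		log.Fatal(err)
	}
	stmt, err := tx.Preparex(data.InsertSql)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < batch; i++ {
		// Same per-row Exec as before; the full field list is elided here.
		if _, err := stmt.Exec(data.InsertData.Ccollectorip /* , ...remaining fields */); err != nil {
			log.Fatal(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
	stmt.Close()
}
```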
Gorm
gorm is a widely used Go ORM framework, well liked for its rich feature set, raw-SQL support, and thorough documentation. Let's see how it fares at CK writes. Connecting and writing with gorm is simple and blunt: a single Create call does the job, and developers tend to love this kind of concise, direct style.
```go
dsn := fmt.Sprintf("clickhouse://%s:%s@%s/%s", *user, *pass, *host, *db)
db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
if err != nil {
	panic(any(err))
}
```
```go
func insert(db *gorm.DB, batch int) {
	start := time.Now().UnixNano()
	result := make([]data.Data, 0, batch)
	for i := 0; i < batch; i++ {
		result = append(result, data.InsertData)
	}
	db.Table("event").Create(&result)
	end := time.Now().UnixNano()
	fmt.Printf("rows: %d, total: %d ms, rate: %d rows/s\n",
		batch, (end-start)/1e6, int64(batch)*1e9/(end-start))
}
```
Test results:
gorm's CK write performance turns out to be disappointing, managing only about 40k rows/s. That said, expectations weren't high: gorm's strength is its convenient API. We also use gorm against MySQL in this project for reads and writes, where it is perfectly smooth and performance is not an issue. The weak CK numbers may simply reflect that gorm's ClickHouse driver community is not very active and performance hasn't been its focus.
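If you do stay on gorm, one knob worth trying is its built-in CreateInBatches, which splits a large slice into several smaller INSERTs. Whether it helps against ClickHouse is untested here, so treat this as a sketch:

```go
// CreateInBatches issues one INSERT per chunk of batchSize rows instead of
// a single statement over the whole slice. 10000 is an arbitrary example size.
db.Table("event").CreateInBatches(&result, 10000)
```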
Summary of Results
| clickhouse-go | ch-go | database/sql | sqlx | gorm |
|---|---|---|---|---|
| 65k rows/s (column) | 67k rows/s | 55k rows/s | 60k rows/s | 40k rows/s |
Conclusion
Judging from the results above, the officially recommended clickhouse-go and ch-go have the edge, and either is a sound choice. With a small number of columns, though, I believe ch-go has the advantage for writes; clickhouse-go, after all, is geared more toward lower-throughput writes and analytical queries. Which of the two to use ultimately depends on your own workload.
These tests do not represent CK write performance under every condition; test your own workload before deciding.