主流的golang clickhouse客户端写入性能测试

clickhouse 性能对比

Posted by jarvis on Fri, Mar 10, 2023

Clickhouse(以下简称ck)是战斗民族Yandex公司在2016年开源的专为在线数据分析而设计的高性能列式存储数据库,感兴趣的同学可以去ck官网学习一下。 由于在项目中的使用,需要支撑大流量下的ck写入,所以最近几天测试了一下使用go客户端的ck写入性能,主流的ck客户端有以下几种:

  • clickhouse-go
  • ch-go
  • database/sql
  • sqlx
  • gorm


Selecting a client library depends on your usage patterns and need for optimal performance. For insert heavy use cases, where millions of inserts are required per second, we recommend using the low level client ch-go. This client avoids the associated overhead of pivoting the data from a row-orientated format to columns, as the ClickHouse native format requires. Furthermore, it avoids any reflection or use of the interface{} (any) type to simplify usage.

For query workloads focused on aggregations or lower throughput insert workloads, the clickhouse-go provides a familiar database/sql interface and more straightforward row semantics. Users can also optionally use HTTP for the transport protocol and take advantage of helper functions to marshal rows to and from structs.


Client Version Golang Versions
=> 2.0 <= 2.2 1.17, 1.18
>= 2.3 1.18.4+, 1.19



1	host = flag.String("host", "locakhouse:9000", "clickhouse host")
2	user = flag.String("user", "root", "clichouse user")
3	pass = flag.String("password", "123456", "clickhouse password")
4	db = flag.String("db", "test", "clickhouse database")
5	g = flag.Int("g", 1, "write goroutine")
6	batch = flag.Int("batch", 100000, "clickhouse batch")
7	tb = flag.String("tb", "event", "clickhouse table")
8	debug = flag.Bool("debug", false, "clickhouse debug")
9	flag.Parse()



Write by clickhouse api


 1	conn, err := clickhouse.Open(&clickhouse.Options{
 2		Addr: []string{*host},
 3		Auth: clickhouse.Auth{
 4			Database: *db,
 5			Username: *user,
 6			Password: *pass,
 7		},
 8		Settings: clickhouse.Settings{
 9			"max_execution_time": 60,
10		},
11		MaxOpenConns: 50,
12		MaxIdleConns: 10,
13		DialTimeout:  5 * time.Second,
14		Debug:        *debug,
15	})
16	if err != nil {
17		panic(any(err))
18	}
19	if err := conn.Ping(context.Background()); err != nil {
20		panic(any(err))
21	}
22	for i:=0; i<*g; i++ {
23		go func() {
24			for {
25				insert(conn, *batch)
26			}
27		}()
28	}


 1func insert(conn driver.Conn, batch int) {
 2	start := time.Now().UnixNano()
 3	ctx := context.Background()
 4	bt, err := conn.PrepareBatch(ctx, data.InsertSql)
 5	if err != nil {
 6		log.Fatal(err)
 7	}
 8	s := time.Now().UnixNano()
 9	for i := 0; i < batch; i++ {
10		err := bt.Append(data.InsertData.Ccollectorip, data.InsertData.Cdevip, data.InsertData.Cdevtype, data.InsertData.Cdstip, data.InsertData.Ceventmsg,
11			data.InsertData.Ceventname, data.InsertData.Ceventtype, data.InsertData.Cflowdir, data.InsertData.Cobject, data.InsertData.Coperation,
12			data.InsertData.Csrcip, data.InsertData.Csrclogtype, data.InsertData.LogypeTag, data.InsertData.Manufacturer, data.InsertData.Reason,
13			data.InsertData.Iappprotocol, data.InsertData.Icollecttype, data.InsertData.Idstport, data.InsertData.Ieventlevel, data.InsertData.Imergecount,
14			data.InsertData.Iprotocol, data.InsertData.Isrcport, data.InsertData.Lduration, data.InsertData.Lrecivepack, data.InsertData.Lsendpack,
15			data.InsertData.RequestBodyLen, data.InsertData.ResponseBodyLen, data.InsertData.Lendtime, data.InsertData.Lid, time.Now().UnixNano()/1e9,
16			data.InsertData.Lrecepttime, data.InsertData.Lstartime)
17		if err != nil {
18			log.Fatal(err)
19		}
20	}
21	e := time.Now().UnixNano()
22	bt.Send()
23	end := time.Now().UnixNano()
24	fmt.Printf("记录数: %d, 耗时: %d, 循环准备耗时: %d, 速率: %d(条/秒) \n", batch, (end-start)/1e6, (e-s)/1e6, int64(batch*1000000*1000)/(end-start))




Write by column


 1var (
 2	ccollectorip, cdevip, cdevtype, cdstip, ceventmsg, ceventname, ceventtype, cflowdir, cobject, coperation, csrcip, csrclogtype,
 3	logtype_tag, manufacturer, reason []string
 4	iappprotocol, icollecttype, idstport, ieventlevel, imergecount, iprotocol, isrcport                                       []int32
 5	lduration, lrecivepack, lsendpack, request_body_len, response_body_len, lendtime, lid, loccurtime, lrecepttime, lstartime []int64
 8for i := 0; i < *batch; i++ {
 9		ccollectorip = append(ccollectorip, data.InsertData.Ccollectorip)
10		cdevip = append(cdevip, data.InsertData.Cdevip)
11		cdevtype = append(cdevtype, data.InsertData.Cdevtype)
12		cdstip = append(cdstip, data.InsertData.Cdstip)
13		ceventmsg = append(ceventmsg, data.InsertData.Ceventmsg)
14		ceventname = append(ceventname, data.InsertData.Ceventname)
15		ceventtype = append(ceventtype, data.InsertData.Ceventtype)
16		cflowdir = append(cflowdir, data.InsertData.Cflowdir)
17		cobject = append(cobject, data.InsertData.Cobject)
18		coperation = append(coperation, data.InsertData.Coperation)
19		csrcip = append(csrcip, data.InsertData.Csrcip)
20		csrclogtype = append(csrclogtype, data.InsertData.Csrclogtype)
21		logtype_tag = append(logtype_tag, data.InsertData.LogypeTag)
22		manufacturer = append(manufacturer, data.InsertData.Manufacturer)
23		reason = append(reason, data.InsertData.Reason)
24		iappprotocol = append(iappprotocol, data.InsertData.Iappprotocol)
25		icollecttype = append(icollecttype, data.InsertData.Icollecttype)
26		idstport = append(idstport, data.InsertData.Idstport)
27		ieventlevel = append(ieventlevel, data.InsertData.Ieventlevel)
28		imergecount = append(imergecount, data.InsertData.Imergecount)
29		iprotocol = append(iprotocol, data.InsertData.Iprotocol)
30		isrcport = append(isrcport, data.InsertData.Iprotocol)
31		lduration = append(lduration, data.InsertData.Lduration)
32		lrecivepack = append(lrecivepack, data.InsertData.Lrecivepack)
33		lsendpack = append(lsendpack, data.InsertData.Lsendpack)
34		request_body_len = append(request_body_len, data.InsertData.RequestBodyLen)
35		response_body_len = append(response_body_len, data.InsertData.ResponseBodyLen)
36		lendtime = append(lendtime, data.InsertData.Lendtime)
37		lid = append(lid, data.InsertData.Lid)
38		loccurtime = append(loccurtime, data.InsertData.Loccurtime)
39		lrecepttime = append(lrecepttime, data.InsertData.Lrecepttime)
40		lstartime = append(lstartime, data.InsertData.Lstartime)
41	}


 1func insert(conn driver.Conn, batch int) {
 2	start := time.Now().UnixNano()
 3	ctx := context.Background()
 4	bt, err := conn.PrepareBatch(ctx, data.InsertSql)
 5	if err != nil {
 6		log.Fatal(err)
 7	}
 8	s := time.Now().UnixNano()
10	bt.Column(0).Append(ccollectorip)
11	bt.Column(1).Append(cdevip)
12	bt.Column(2).Append(cdevtype)
13	bt.Column(3).Append(cdstip)
14	bt.Column(4).Append(ceventmsg)
15	bt.Column(5).Append(ceventname)
16	bt.Column(6).Append(ceventtype)
17	bt.Column(7).Append(cflowdir)
18	bt.Column(8).Append(cobject)
19	bt.Column(9).Append(coperation)
20	bt.Column(10).Append(csrcip)
21	bt.Column(11).Append(csrclogtype)
22	bt.Column(12).Append(logtype_tag)
23	bt.Column(13).Append(manufacturer)
24	bt.Column(14).Append(reason)
25	bt.Column(15).Append(iappprotocol)
26	bt.Column(16).Append(icollecttype)
27	bt.Column(17).Append(idstport)
28	bt.Column(18).Append(ieventlevel)
29	bt.Column(19).Append(imergecount)
30	bt.Column(20).Append(iprotocol)
31	bt.Column(21).Append(isrcport)
32	bt.Column(22).Append(lduration)
33	bt.Column(23).Append(lrecivepack)
34	bt.Column(24).Append(lsendpack)
35	bt.Column(25).Append(request_body_len)
36	bt.Column(26).Append(response_body_len)
37	bt.Column(27).Append(lendtime)
38	bt.Column(28).Append(lid)
39	bt.Column(29).Append(loccurtime)
40	bt.Column(30).Append(lrecepttime)
41	bt.Column(31).Append(lstartime)
42	e := time.Now().UnixNano()
43	bt.Send()
44	end := time.Now().UnixNano()
45	fmt.Printf("记录数: %d, 耗时: %d, 循环准备耗时: %d, 速率: %d(条/秒) \n", batch, (end-start)/1e6, (e-s)/1e6, int64(batch*1000000*1000)/(end-start))





 1func insert(conn *ch.Client, batch int) {
 2	start := time.Now().UnixNano()
 3	var (
 4		ccollectorip, cdevip, cdevtype, cdstip, ceventmsg, ceventname, ceventtype, cflowdir, cobject, coperation, csrcip, csrclogtype,
 5		logtype_tag, manufacturer, reason proto.ColStr
 6		iappprotocol, icollecttype, idstport, ieventlevel, imergecount, iprotocol, isrcport                                       proto.ColInt32
 7		lduration, lrecivepack, lsendpack, request_body_len, response_body_len, lendtime, lid, loccurtime, lrecepttime, lstartime proto.ColInt64
 8	)
 9	for i := 0; i < batch; i++ {
10		ccollectorip.Append(data.InsertData.Ccollectorip)
11		cdevip.Append(data.InsertData.Cdevip)
12		cdevtype.Append(data.InsertData.Cdevtype)
13		cdstip.Append(data.InsertData.Cdstip)
14		ceventmsg.Append(data.InsertData.Ceventmsg)
15		ceventname.Append(data.InsertData.Ceventname)
16		ceventtype.Append(data.InsertData.Ceventtype)
17		cflowdir.Append(data.InsertData.Cflowdir)
18		cobject.Append(data.InsertData.Cobject)
19		coperation.Append(data.InsertData.Coperation)
20		csrcip.Append(data.InsertData.Csrcip)
21		csrclogtype.Append(data.InsertData.Csrclogtype)
22		logtype_tag.Append(data.InsertData.LogypeTag)
23		manufacturer.Append(data.InsertData.Manufacturer)
24		reason.Append(data.InsertData.Reason)
25		iappprotocol.Append(data.InsertData.Iappprotocol)
26		icollecttype.Append(data.InsertData.Icollecttype)
27		idstport.Append(data.InsertData.Idstport)
28		ieventlevel.Append(data.InsertData.Ieventlevel)
29		imergecount.Append(data.InsertData.Imergecount)
30		iprotocol.Append(data.InsertData.Iprotocol)
31		isrcport.Append(data.InsertData.Isrcport)
32		lduration.Append(data.InsertData.Lduration)
33		lrecivepack.Append(data.InsertData.Lrecivepack)
34		lsendpack.Append(data.InsertData.Lsendpack)
35		request_body_len.Append(data.InsertData.RequestBodyLen)
36		response_body_len.Append(data.InsertData.ResponseBodyLen)
37		lendtime.Append(data.InsertData.Lendtime)
38		lid.Append(data.InsertData.Lid)
39		loccurtime.Append(data.InsertData.Loccurtime)
40		lrecepttime.Append(data.InsertData.Lrecepttime)
41		lstartime.Append(data.InsertData.Lstartime)
42	}
43	input := proto.Input{
44		{Name: "ccollectorip", Data: &ccollectorip},
45		{Name: "cdevip", Data: &cdevip},
46		{Name: "cdevtype", Data: &cdevtype},
47		{Name: "cdstip", Data: &cdstip},
48		{Name: "ceventmsg", Data: &ceventmsg},
49		{Name: "ceventname", Data: &ceventname},
50		{Name: "ceventtype", Data: &ceventtype},
51		{Name: "cflowdir", Data: &cflowdir},
52		{Name: "cobject", Data: &cobject},
53		{Name: "coperation", Data: &coperation},
54		{Name: "csrcip", Data: &csrcip},
55		{Name: "csrclogtype", Data: &csrclogtype},
56		{Name: "logtype_tag", Data: &logtype_tag},
57		{Name: "manufacturer", Data: &manufacturer},
58		{Name: "reason", Data: &reason},
59		{Name: "iappprotocol", Data: &iappprotocol},
60		{Name: "icollecttype", Data: &icollecttype},
61		{Name: "idstport", Data: &idstport},
62		{Name: "ieventlevel", Data: &ieventlevel},
63		{Name: "imergecount", Data: &imergecount},
64		{Name: "iprotocol", Data: &iprotocol},
65		{Name: "isrcport", Data: &isrcport},
66		{Name: "lduration", Data: &lduration},
67		{Name: "lrecivepack", Data: &lrecivepack},
68		{Name: "lsendpack", Data: &lsendpack},
69		{Name: "request_body_len", Data: &request_body_len},
70		{Name: "response_body_len", Data: &response_body_len},
71		{Name: "lendtime", Data: &lendtime},
72		{Name: "lid", Data: &lid},
73		{Name: "loccurtime", Data: &loccurtime},
74		{Name: "lrecepttime", Data: &lrecepttime},
75		{Name: "lstartime", Data: &lstartime},
76	}
77	s := time.Now().UnixNano()
78	if err := conn.Do(context.Background(), ch.Query{
79		Body:  insertSql,
80		Input: input,
81	}); err != nil {
82		log.Fatal(err)
83	}
84	end := time.Now().UnixNano()
85	fmt.Printf("记录数: %d, 耗时: %d, 循环准备耗时: %d, 速率: %d(条/秒) \n", batch, (end-start)/1e6, (s-start)/1e6, int64(batch*1000000*1000)/(end-start))






code: 81, message: Database default doesn’t exist


 1	conn := clickhouse.OpenDB(&clickhouse.Options{
 2		Addr: []string{*host},
 3		Auth: clickhouse.Auth{
 4			Database: *db,
 5			Username: *user,
 6			Password: *pass,
 7		},
 8		Settings: clickhouse.Settings{
 9			"max_execution_time": 60,
10		},
11		DialTimeout: 5 * time.Second,
12		Debug:       true,
13	})
 1func insert(conn *sql.DB, batch int) {
 2	start := time.Now().UnixNano()
 3	tx, err := conn.Begin()
 4	if err != nil {
 5		log.Fatal(err)
 6	}
 7	stmt, err := tx.Prepare(data.InsertSql)
 8	if err != nil {
 9		log.Fatal(err)
10	}
11	s := time.Now().UnixNano()
12	for i := 0; i < batch; i++ {
13		_, err := stmt.Exec(data.InsertData.Ccollectorip, data.InsertData.Cdevip, data.InsertData.Cdevtype, data.InsertData.Cdstip, data.InsertData.Ceventmsg,
14			data.InsertData.Ceventname, data.InsertData.Ceventtype, data.InsertData.Cflowdir, data.InsertData.Cobject, data.InsertData.Coperation,
15			data.InsertData.Csrcip, data.InsertData.Csrclogtype, data.InsertData.LogypeTag, data.InsertData.Manufacturer, data.InsertData.Reason,
16			data.InsertData.Iappprotocol, data.InsertData.Icollecttype, data.InsertData.Idstport, data.InsertData.Ieventlevel, data.InsertData.Imergecount,
17			data.InsertData.Iprotocol, data.InsertData.Isrcport, data.InsertData.Lduration, data.InsertData.Lrecivepack, data.InsertData.Lsendpack,
18			data.InsertData.RequestBodyLen, data.InsertData.ResponseBodyLen, data.InsertData.Lendtime, data.InsertData.Lid, data.InsertData.Loccurtime,
19			data.InsertData.Lrecepttime, data.InsertData.Lstartime)
20		if err != nil {
21			log.Fatal(err)
22		}
23	}
24	e := time.Now().UnixNano()
25	tx.Commit()
26	stmt.Close()
27	end := time.Now().UnixNano()
28	fmt.Printf("记录数: %d, 耗时: %d, 循环准备耗时: %d, 速率: %d(条/秒) \n", batch, (end-start)/1e6, (e-s)/1e6, int64(batch*1000000*1000)/(end-start))





1	conn, err := sqlx.Connect("clickhouse", fmt.Sprintf("tcp://%s?username=%s&password=%s&database=%s&debug=false", *host, *user, *pass, *db))
2	if err != nil {
3		panic(any(err))
4	}



gorm是一个应用广泛的golang orm框架,因其功能全,支持原生sql,文档健全而备受好评,接下来我们看一下gorm在ck写入方面的性能如何。gorm的连接和写入也非常简单和暴力,就一个Creat方法就搞定,对于开发人员来说,更加喜欢这种简洁只记得方式。

1	dsn := fmt.Sprintf("clickhouse://%s:%s@%s/%s", *user, *pass, *host, *db)
2	db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
3	if err != nil {
4		panic(any(err))
5	}
 1func insert(db *gorm.DB, batch int) {
 2	start := time.Now().UnixNano()
 3	result := make([]data.Data, 0, batch)
 4	for i := 0; i < 100000; i++ {
 5		result = append(result, data.InsertData)
 6	}
 7	db.Table("event").Create(&result)
 8	end := time.Now().UnixNano()
 9	fmt.Printf("记录数: %d, 耗时: %d, 速率: %d(条/秒) \n", 100000, (end-start)/1e6, int64(100000*1000000*1000)/(end-start))




clickhouse-go ch-go database/sql Sql grom
6.5w/s (column) 6.7w/s 5.5w/s 6w/s 4w/s


