Skip to content

Commit 399a1ed

Browse files
committed
First commit
1 parent fb1c84a commit 399a1ed

File tree

6 files changed

+358
-3
lines changed

6 files changed

+358
-3
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111
# Output of the go coverage tool, specifically when used with LiteIDE
1212
*.out
1313

14-
# Dependency directories (remove the comment below to include it)
15-
# vendor/
14+
# Dependency directories
15+
vendor/

README.md

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,87 @@
1-
# clickhouse-tickstore
1+
# clickhouse-tickstore
2+
3+
Go package to store real time [streaming websocket data](https://kite.trade/docs/connect/v3/websocket/) in [clickhouse](https://clickhouse.tech/) using queuing and bulk insert based on go-routine and channels.
4+
5+
## Installation
6+
7+
```
8+
go get -u github.com/ranjanrak/clickhouse-tickstore
9+
```
10+
11+
## Usage
12+
13+
```go
14+
package main
15+
16+
import (
17+
tickstore "github.com/ranjanrak/clickhouse-tickstore"
18+
)
19+
func main() {
20+
// Create new ticker instance
21+
tickClient := tickstore.New(tickstore.ClientParam{
22+
// Send DSN as per your clickhouse DB setup.
23+
// visit https://github.com/ClickHouse/clickhouse-go#dsn to know more
24+
DBSource: "",
25+
ApiKey: "your_api_key",
26+
AccessToken: "your_access_token",
27+
TokenList: []uint32{633601, 895745, 1723649, 3050241, 975873, 969473, 3721473, 738561, 969473},
28+
DumpSize: 5000,
29+
})
30+
// Start the ticker instance
31+
tickClient.StartTicker()
32+
}
33+
34+
```
35+
36+
## Example
37+
38+
```sql
39+
SELECT *
40+
FROM tickdata
41+
FINAL
42+
WHERE (instrument_token = 633601) AND
43+
(timestamp >= toDateTime('2022-04-22 13:23:00', 'Asia/Calcutta')) AND
44+
(timestamp <= toDateTime('2022-04-22 13:25:00', 'Asia/Calcutta'))
45+
ORDER BY timestamp ASC
46+
```
47+
48+
```sql
49+
50+
Query id: 8e356516-107c-4012-948b-df90e49e9906
51+
52+
┌─instrument_token─┬───────────timestamp─┬──price─┐
53+
6336012022-04-22 13:23:00174.65
54+
6336012022-04-22 13:23:01174.7
55+
6336012022-04-22 13:23:02174.65
56+
6336012022-04-22 13:23:04174.7
57+
6336012022-04-22 13:23:05174.65
58+
6336012022-04-22 13:23:06174.7
59+
6336012022-04-22 13:23:08174.7
60+
6336012022-04-22 13:23:09174.7
61+
6336012022-04-22 13:23:10174.7
62+
6336012022-04-22 13:23:13174.7
63+
6336012022-04-22 13:23:14174.65
64+
6336012022-04-22 13:23:15174.65
65+
6336012022-04-22 13:23:16174.7
66+
6336012022-04-22 13:23:17174.65
67+
6336012022-04-22 13:23:19174.7
68+
6336012022-04-22 13:23:21174.65
69+
6336012022-04-22 13:23:24174.65
70+
6336012022-04-22 13:23:25174.7
71+
6336012022-04-22 13:23:26174.7
72+
6336012022-04-22 13:23:27174.65
73+
6336012022-04-22 13:23:28174.65
74+
6336012022-04-22 13:23:29174.7
75+
6336012022-04-22 13:23:31174.7
76+
6336012022-04-22 13:23:32174.7
77+
6336012022-04-22 13:23:33174.7
78+
6336012022-04-22 13:23:35174.7
79+
6336012022-04-22 13:23:36174.7
80+
| ...... | .................. | ...... |
81+
82+
84 rows in set. Elapsed: 0.006 sec. Processed 8.19 thousand rows, 98.30 KB (1.28 million rows/s., 15.37 MB/s.)
83+
```
84+
85+
### To-do
86+
87+
Add multiple interval(minutes, hours, etc) candle formation query.

config.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tickstore
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"log"
7+
8+
"github.com/ClickHouse/clickhouse-go"
9+
)
10+
11+
// Client represents clickhouse DB client connection
12+
type Client struct {
13+
dbClient *sql.DB
14+
apiKey string
15+
accessToken string
16+
tokenList []uint32
17+
dumpSize int
18+
}
19+
20+
// ClientParam represents interface to connect clickhouse and kite ticker stream
21+
type ClientParam struct {
22+
DBSource string
23+
ApiKey string
24+
AccessToken string
25+
TokenList []uint32
26+
DumpSize int
27+
}
28+
29+
// Creates a new DB connection client
30+
func New(userParam ClientParam) *Client {
31+
if userParam.DBSource == "" {
32+
userParam.DBSource = "tcp://127.0.0.1:9000?debug=true"
33+
}
34+
connect, err := sql.Open("clickhouse", userParam.DBSource)
35+
if err = connect.Ping(); err != nil {
36+
if exception, ok := err.(*clickhouse.Exception); ok {
37+
fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
38+
} else {
39+
fmt.Println(err)
40+
}
41+
}
42+
// Set default dump size to 5 times of the token list length
43+
if userParam.DumpSize == 0 {
44+
userParam.DumpSize = len(userParam.TokenList) * 5
45+
}
46+
// Create tickdata table for fresh instance
47+
// Replacingmergetree engine removes all duplicate entries with the same timestamp and price
48+
// As those won't be useful for candle creation
49+
_, err = connect.Exec(`
50+
CREATE TABLE IF NOT EXISTS tickdata (
51+
instrument_token UInt32,
52+
timestamp DateTime('Asia/Calcutta'),
53+
price FLOAT()
54+
) engine=ReplacingMergeTree()
55+
ORDER BY (timestamp, instrument_token, price)
56+
`)
57+
if err != nil {
58+
log.Fatal(err)
59+
}
60+
61+
return &Client{
62+
dbClient: connect,
63+
apiKey: userParam.ApiKey,
64+
accessToken: userParam.AccessToken,
65+
tokenList: userParam.TokenList,
66+
dumpSize: userParam.DumpSize,
67+
}
68+
}

go.mod

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module github.com/ranjanrak/clickhouse-tickstore
2+
3+
go 1.16
4+
5+
require (
6+
github.com/ClickHouse/clickhouse-go v1.5.4
7+
github.com/zerodha/gokiteconnect/v4 v4.0.7
8+
)

go.sum

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0=
2+
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
3+
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
4+
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
5+
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
6+
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
7+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
8+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9+
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
10+
github.com/gocarina/gocsv v0.0.0-20180809181117-b8c38cb1ba36 h1:IlBbYij72r3CoD3fKTbP5jD0NJjrvemKsaxkW/QUdGE=
11+
github.com/gocarina/gocsv v0.0.0-20180809181117-b8c38cb1ba36/go.mod h1:/oj50ZdPq/cUjA02lMZhijk5kR31SEydKyqah1OgBuo=
12+
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
13+
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
14+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
15+
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
16+
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
17+
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
18+
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
19+
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
20+
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
21+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
22+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
23+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
24+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
25+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
26+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
27+
github.com/zerodha/gokiteconnect/v4 v4.0.7 h1:2mGOGQOUu6/l4jr++WuRni9uGHViBFMAQJD4W1DuPZA=
28+
github.com/zerodha/gokiteconnect/v4 v4.0.7/go.mod h1:ym/xXldKyPzkpN7JZpg6Cbjs+nGfqvMC5X9BsHEil9s=
29+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
30+
gopkg.in/jarcoal/httpmock.v1 v1.0.0-20180719183105-8007e27cdb32 h1:30DLrQoRqdUHslVMzxuKUnY4GKJGk1/FJtKy3yx4TKE=
31+
gopkg.in/jarcoal/httpmock.v1 v1.0.0-20180719183105-8007e27cdb32/go.mod h1:d3R+NllX3X5e0zlG1Rful3uLvsGC/Q3OHut5464DEQw=
32+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
33+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

tick_channel.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package tickstore
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"time"
9+
10+
kitemodels "github.com/zerodha/gokiteconnect/v4/models"
11+
kiteticker "github.com/zerodha/gokiteconnect/v4/ticker"
12+
)
13+
14+
// tickData is struct to store streaming tick data in clickhouse
15+
type tickData struct {
16+
Token uint32
17+
TimeStamp time.Time
18+
LastPrice float64
19+
}
20+
21+
var (
22+
dbConnect *sql.DB
23+
ticker *kiteticker.Ticker
24+
wg sync.WaitGroup
25+
isBulkReady sync.Mutex
26+
dumpSize int
27+
tokens []uint32
28+
pipeline chan tickData
29+
)
30+
31+
// Triggered when any error is raised
32+
func onError(err error) {
33+
fmt.Println("Error: ", err)
34+
}
35+
36+
// Triggered when websocket connection is closed
37+
func onClose(code int, reason string) {
38+
fmt.Println("Close: ", code, reason)
39+
}
40+
41+
// Triggered when connection is established and ready to send and accept data
42+
func onConnect() {
43+
fmt.Println("Connected")
44+
err := ticker.Subscribe(tokens)
45+
if err != nil {
46+
fmt.Println("err: ", err)
47+
}
48+
// Set subscription mode for given list of tokens
49+
err = ticker.SetMode(kiteticker.ModeFull, tokens)
50+
if err != nil {
51+
fmt.Println("err: ", err)
52+
}
53+
}
54+
55+
// Triggered when tick is received
56+
func onTick(tick kitemodels.Tick) {
57+
wg.Add(1)
58+
//go routine that adds tick to channel
59+
go passChannel(tick, pipeline, &wg)
60+
// non-blocking the onTick callback
61+
wg.Wait()
62+
}
63+
64+
// Triggered when reconnection is attempted which is enabled by default
65+
func onReconnect(attempt int, delay time.Duration) {
66+
fmt.Printf("Reconnect attempt %d in %fs\n", attempt, delay.Seconds())
67+
}
68+
69+
// Triggered when maximum number of reconnect attempt is made and the program is terminated
70+
func onNoReconnect(attempt int) {
71+
fmt.Printf("Maximum no of reconnect attempt reached: %d", attempt)
72+
}
73+
74+
// Insert tick data to channel
75+
func passChannel(tick kitemodels.Tick, pipeline chan tickData, wg *sync.WaitGroup) {
76+
// Send {token, timestamp, lastprice} struct to channel
77+
pipeline <- tickData{tick.InstrumentToken, tick.Timestamp.Time, tick.LastPrice}
78+
wg.Done()
79+
isBulkReady.Lock()
80+
// Send for bulk insertion only if channel msg length is greater than defined dumpSize
81+
if len(pipeline) >= dumpSize {
82+
createBulkDump()
83+
}
84+
isBulkReady.Unlock()
85+
}
86+
87+
// Group all available channel messages and bulk insert to clickhouse
88+
// At periodic interval depending on users input channel buffer size
89+
func createBulkDump() {
90+
s := make([]tickData, 0)
91+
for i := range pipeline {
92+
// create array of ticks to do bulk insert
93+
s = append(s, i)
94+
if len(s) > dumpSize {
95+
// Send message array for the bulk dump
96+
insertDB(s)
97+
// Remove all the element from the array that is dumped to DB
98+
s = nil
99+
}
100+
}
101+
}
102+
103+
// Insert tick data to clickhouse periodically
104+
func insertDB(tickArray []tickData) {
105+
tx, err := dbConnect.Begin()
106+
if err != nil {
107+
log.Fatal(err)
108+
}
109+
110+
sqlstmt := "INSERT INTO tickdata (instrument_token, timestamp, price) VALUES (?, ?, ?)"
111+
112+
stmt, err := tx.Prepare(sqlstmt)
113+
if err != nil {
114+
log.Fatal(err)
115+
}
116+
117+
// Bulk write
118+
for _, tick := range tickArray {
119+
if _, err := stmt.Exec(
120+
tick.Token,
121+
tick.TimeStamp,
122+
tick.LastPrice,
123+
); err != nil {
124+
log.Fatal(err)
125+
}
126+
}
127+
128+
if err := tx.Commit(); err != nil {
129+
log.Fatal(err)
130+
}
131+
}
132+
133+
// Start ticker stream
134+
func (c *Client) StartTicker() {
135+
136+
dbConnect = c.dbClient
137+
138+
dumpSize = c.dumpSize
139+
140+
tokens = c.tokenList
141+
142+
// Channel to store all upcoming streams of ticks
143+
pipeline = make(chan tickData, dumpSize)
144+
145+
// Create new Kite ticker instance
146+
ticker = kiteticker.New(c.apiKey, c.accessToken)
147+
148+
ticker.SetReconnectMaxRetries(5)
149+
150+
// Assign callbacks
151+
ticker.OnError(onError)
152+
ticker.OnClose(onClose)
153+
ticker.OnConnect(onConnect)
154+
ticker.OnReconnect(onReconnect)
155+
ticker.OnNoReconnect(onNoReconnect)
156+
ticker.OnTick(onTick)
157+
158+
// Start the connection
159+
ticker.Serve()
160+
}

0 commit comments

Comments
 (0)