Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
cae71f2
Fix log field name typo
leventeliu Jan 10, 2019
40ca3c3
Fix comment typo
leventeliu Jan 10, 2019
0105c4f
Complete kayak trace logs
leventeliu Jan 10, 2019
133add2
Add backward compatible trace lib
Jan 11, 2019
656cece
Add timer feature for performance analysis
Jan 11, 2019
ecd7e7d
Add comment to runtime/trace wrapper
Jan 11, 2019
db099cf
Add trace to kayak
Jan 11, 2019
a5b3d5f
Add more trace points
Jan 11, 2019
3c5d3b5
Fix rpc trace point and split followerApply task types
Jan 11, 2019
287a5b7
Calc follower real commitcycle cost
Jan 13, 2019
eb50216
Separate commit wait and commit cycle cost
Jan 13, 2019
7e4a0c0
Fix waitForLastCommit region overlapping
Jan 13, 2019
6d4f87b
Fix rollback nil pointer bug
Jan 13, 2019
59dc00b
Fix shutdown block bug
Jan 13, 2019
951e879
Fix chain produce block bug
Jan 14, 2019
5752a42
Fix chainbus service code typo
Jan 14, 2019
57ae1fe
Increase provide service interval and send provide service request af…
Jan 14, 2019
5014de4
Add kayak fetch log recover and make zero commitThreshold configurable
Jan 16, 2019
649947e
Adjust log wait timeout
Jan 16, 2019
0a4f691
Use direct function call instead of rpc call in kayak test
Jan 17, 2019
78f406c
Add rpc parallel request test
Jan 17, 2019
4bd20e8
Fix shared secret cache bug
Jan 17, 2019
ba2231c
Use back the fake pipe rpc feature
Jan 18, 2019
2450952
Add more metric tests and fix race and test case bug
Jan 21, 2019
80c8f38
Disable manual trace in runtime test
Jan 21, 2019
fb4d453
Even more traces
Jan 22, 2019
af4da18
Use client pool and connection pool for multiple physical connections
Jan 24, 2019
20f4f85
Drop off nested signature in embedded request/response/ack structure
Jan 28, 2019
26546d2
Remove response signature check
Jan 29, 2019
cb3a864
Remove useless traces
Jan 29, 2019
e7153ca
Disable trace on pprof
Jan 29, 2019
9c90ce6
Golint issues
Jan 29, 2019
2748d31
Move magic number to conf/limit
Jan 29, 2019
17ab353
Remove response verify test case
Jan 29, 2019
a00c0e7
Parallel ci test
Jan 29, 2019
134bf79
Disable coverpkg all
Jan 29, 2019
aadc90d
Cover special packages for api module
Jan 29, 2019
149ce72
Update per client pool physical connection limits
Jan 29, 2019
bef3bf3
Separate read/write stream iv in etls cryptoConn
Jan 30, 2019
8a4e350
Rollback rpc client pool
Jan 30, 2019
aa562ba
Remove client pool caller
Jan 30, 2019
19eb4fe
Update response structure, replace signature/key with simple hash
Jan 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions alltest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,15 @@ set -o errexit
set -o pipefail
set -o nounset

test::package() {
local package="${1:-notset}"

if [[ "${package}" == "notset" ]]; then
&>2 echo "empty package name"
exit 1
fi

local coverage_file="${package//\//.}.cover.out"
echo "[TEST] package=${package}, coverage=${coverage_file}"
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverpkg="github.com/CovenantSQL/CovenantSQL/..." -coverprofile "${coverage_file}" "${package}"
}

main() {
make clean
make -j6 bp miner observer

# test package by package
for package in $(go list ./... | grep -v "/vendor/"); do
test::package "${package}"
done
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverprofile main.cover.out $(go list ./... | grep -v CovenantSQL/api)
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverpkg ./api/...,./rpc/jsonrpc -coverprofile api.cover.out ./api/...

set -x
gocovmerge *.cover.out $(find cmd -name "*.cover.out") | grep -F -v '_gen.go' > coverage.txt && rm -f *.cover.out
gocovmerge main.cover.out api.cover.out $(find cmd -name "*.cover.out") | grep -F -v '_gen.go' > coverage.txt && rm -f *.cover.out
bash <(curl -s https://codecov.io/bash)

# some benchmarks
Expand Down
48 changes: 26 additions & 22 deletions client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/types"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/CovenantSQL/CovenantSQL/utils/trace"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -150,7 +151,7 @@ ackWorkerLoop:
oneTime.Do(func() {
pc = rpc.NewPersistentCaller(c.pCaller.TargetID)
})
if err = ack.Sign(c.parent.privKey, false); err != nil {
if err = ack.Sign(c.parent.privKey); err != nil {
log.WithField("target", pc.TargetID).WithError(err).Error("failed to sign ack")
continue
}
Expand All @@ -164,7 +165,7 @@ ackWorkerLoop:
}

if pc != nil {
pc.CloseStream()
pc.Close()
}

log.Debug("ack worker quiting")
Expand All @@ -173,7 +174,7 @@ ackWorkerLoop:
func (c *pconn) close() error {
c.stopAckWorkers()
if c.pCaller != nil {
c.pCaller.CloseStream()
c.pCaller.Close()
}
return nil
}
Expand Down Expand Up @@ -237,6 +238,8 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e

// ExecContext implements the driver.ExecerContext.ExecContext method.
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (result driver.Result, err error) {
defer trace.StartRegion(ctx, "dbExec").End()

if atomic.LoadInt32(&c.closed) != 0 {
err = driver.ErrBadConn
return
Expand All @@ -246,7 +249,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
sq := convertQuery(query, args)

var affectedRows, lastInsertID int64
if affectedRows, lastInsertID, _, err = c.addQuery(types.WriteQuery, sq); err != nil {
if affectedRows, lastInsertID, _, err = c.addQuery(ctx, types.WriteQuery, sq); err != nil {
return
}

Expand All @@ -260,14 +263,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name

// QueryContext implements the driver.QueryerContext.QueryContext method.
func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) {
defer trace.StartRegion(ctx, "dbQuery").End()

if atomic.LoadInt32(&c.closed) != 0 {
err = driver.ErrBadConn
return
}

// TODO(xq262144): make use of the ctx argument
sq := convertQuery(query, args)
_, _, rows, err = c.addQuery(types.ReadQuery, sq)
_, _, rows, err = c.addQuery(ctx, types.ReadQuery, sq)

return
}
Expand All @@ -289,7 +294,7 @@ func (c *conn) Commit() (err error) {

if len(c.queries) > 0 {
// send query
if _, _, _, err = c.sendQuery(types.WriteQuery, c.queries); err != nil {
if _, _, _, err = c.sendQuery(context.Background(), types.WriteQuery, c.queries); err != nil {
return
}
}
Expand Down Expand Up @@ -319,7 +324,7 @@ func (c *conn) Rollback() error {
return nil
}

func (c *conn) addQuery(queryType types.QueryType, query *types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
func (c *conn) addQuery(ctx context.Context, queryType types.QueryType, query *types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
if c.inTransaction {
// check query type, enqueue query
if queryType == types.ReadQuery {
Expand All @@ -344,10 +349,10 @@ func (c *conn) addQuery(queryType types.QueryType, query *types.Query) (affected
"args": query.Args,
}).Debug("execute query")

return c.sendQuery(queryType, []types.Query{*query})
return c.sendQuery(ctx, queryType, []types.Query{*query})
}

func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
func (c *conn) sendQuery(ctx context.Context, queryType types.QueryType, queries []types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
var uc *pconn // peer connection used to execute the queries

uc = c.leader
Expand Down Expand Up @@ -399,11 +404,6 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
if err = uc.pCaller.Call(route.DBSQuery.String(), req, &response); err != nil {
return
}

// verify response
if err = response.Verify(); err != nil {
return
}
rows = newRows(&response)

if queryType == types.WriteQuery {
Expand All @@ -412,15 +412,19 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
}

// build ack
uc.ackCh <- &types.Ack{
Header: types.SignedAckHeader{
AckHeader: types.AckHeader{
Response: response.Header,
NodeID: c.localNodeID,
Timestamp: getLocalTime(),
func() {
defer trace.StartRegion(ctx, "ackEnqueue").End()
uc.ackCh <- &types.Ack{
Header: types.SignedAckHeader{
AckHeader: types.AckHeader{
Response: response.Header.ResponseHeader,
ResponseHash: response.Header.Hash(),
NodeID: c.localNodeID,
Timestamp: getLocalTime(),
},
},
},
}
}
}()

return
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/cql-minerd/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ import (

var rootHash = hash.Hash{}

func startDBMS(server *rpc.Server) (dbms *worker.DBMS, err error) {
func startDBMS(server *rpc.Server, onCreateDB func()) (dbms *worker.DBMS, err error) {
if conf.GConf.Miner == nil {
err = errors.New("invalid database config")
return
}

cfg := &worker.DBMSConfig{
RootDir: conf.GConf.Miner.RootDir,
Server: server,
MaxReqTimeGap: conf.GConf.Miner.MaxReqTimeGap,
RootDir: conf.GConf.Miner.RootDir,
Server: server,
MaxReqTimeGap: conf.GConf.Miner.MaxReqTimeGap,
OnCreateDatabase: onCreateDB,
}

if dbms, err = worker.NewDBMS(cfg); err != nil {
Expand Down
15 changes: 12 additions & 3 deletions cmd/cql-minerd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/CovenantSQL/CovenantSQL/types"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/CovenantSQL/CovenantSQL/utils/trace"
sqlite3 "github.com/CovenantSQL/go-sqlite3-encrypt"
. "github.com/smartystreets/goconvey/convey"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -647,20 +648,25 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
ii := atomic.AddInt64(&i, 1)
index := ROWSTART + ii
//start := time.Now()
_, err = db.Exec("INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+

ctx, task := trace.NewTask(context.Background(), "BenchInsert")

_, err = db.ExecContext(ctx, "INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
"(?, ?)", index, ii,
)
//log.Warnf("insert index = %d %v", index, time.Since(start))
for err != nil && err.Error() == sqlite3.ErrBusy.Error() {
// retry forever
log.Warnf("index = %d retried", index)
_, err = db.Exec("INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
_, err = db.ExecContext(ctx, "INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
"(?, ?)", index, ii,
)
}
if err != nil {
b.Fatal(err)
}

task.End()
}
})
})
Expand Down Expand Up @@ -690,16 +696,19 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
} else { //has data before ROWSTART
index = rand.Int63n(count - 1)
}

ctx, task := trace.NewTask(context.Background(), "BenchSelect")
//log.Debugf("index = %d", index)
//start := time.Now()
row := db.QueryRow("SELECT v1 FROM "+TABLENAME+" WHERE k = ? LIMIT 1", index)
row := db.QueryRowContext(ctx, "SELECT v1 FROM "+TABLENAME+" WHERE k = ? LIMIT 1", index)
//log.Warnf("select index = %d %v", index, time.Since(start))
var result []byte
err = row.Scan(&result)
if err != nil || (len(result) == 0) {
log.Errorf("index = %d", index)
b.Fatal(err)
}
task.End()
}
})
})
Expand Down
48 changes: 24 additions & 24 deletions cmd/cql-minerd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ import (
"os"
"os/signal"
"runtime"

"github.com/CovenantSQL/CovenantSQL/metric"

//"runtime/trace"
"syscall"
"time"

"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/metric"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/CovenantSQL/CovenantSQL/utils/trace"
"github.com/CovenantSQL/CovenantSQL/worker"
graphite "github.com/cyberdelia/go-metrics-graphite"
metrics "github.com/rcrowley/go-metrics"
Expand Down Expand Up @@ -199,11 +197,11 @@ func main() {
}
}

// start prometheus collector
reg := metric.StartMetricCollector()

// start period provide service transaction generator
go func() {
// start prometheus collector
reg := metric.StartMetricCollector()

tick := time.NewTicker(conf.GConf.Miner.ProvideServiceInterval)
defer tick.Stop()

Expand All @@ -220,7 +218,9 @@ func main() {

// start dbms
var dbms *worker.DBMS
if dbms, err = startDBMS(server); err != nil {
if dbms, err = startDBMS(server, func() {
sendProvideService(reg)
}); err != nil {
log.WithError(err).Fatal("start dbms failed")
}

Expand Down Expand Up @@ -257,22 +257,22 @@ func main() {
go graphite.Graphite(metrics.DefaultRegistry, 5*time.Second, minerName, addr)
}

//if traceFile != "" {
// f, err := os.Create(traceFile)
// if err != nil {
// log.WithError(err).Fatal("failed to create trace output file")
// }
// defer func() {
// if err := f.Close(); err != nil {
// log.WithError(err).Fatal("failed to close trace file")
// }
// }()

// if err := trace.Start(f); err != nil {
// log.WithError(err).Fatal("failed to start trace")
// }
// defer trace.Stop()
//}
if traceFile != "" {
f, err := os.Create(traceFile)
if err != nil {
log.WithError(err).Fatal("failed to create trace output file")
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Fatal("failed to close trace file")
}
}()

if err := trace.Start(f); err != nil {
log.WithError(err).Fatal("failed to start trace")
}
defer trace.Stop()
}

<-signalCh
utils.StopProfile()
Expand Down
Loading