Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/cql-adapter/storage/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"os"
"path/filepath"

// Import sqlite3 manually.
_ "github.com/CovenantSQL/go-sqlite3-encrypt"
)
Expand Down
1 change: 1 addition & 0 deletions cmd/cql-faucet/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/utils/log"
uuid "github.com/satori/go.uuid"

// Load sqlite3 database driver.
_ "github.com/CovenantSQL/go-sqlite3-encrypt"
)
Expand Down
1 change: 1 addition & 0 deletions cmd/cql-minerd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/signal"
"runtime"

//"runtime/trace"
"syscall"
"time"
Expand Down
1 change: 1 addition & 0 deletions crypto/hash/hashfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package hash
import (
"encoding/binary"
"hash/fnv"

// "crypto/sha256" benchmark is at least 10% faster on
// i7-4870HQ CPU @ 2.50GHz than "github.com/minio/sha256-simd"
"crypto/sha256"
Expand Down
18 changes: 10 additions & 8 deletions rpc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,30 @@ func toSession(id proto.NodeID, conn net.Conn) (sess *Session, err error) {
func (p *SessionPool) LoadOrStore(id proto.NodeID, newSess *Session) (sess *Session, loaded bool) {
// NO Blocking operation in this function
p.Lock()
defer p.Unlock()
sess, exist := p.sessions[id]
if exist {
p.Unlock()
log.WithField("node", id).Debug("load session for target node")
loaded = true
} else {
sess = newSess
p.sessions[id] = newSess
p.Unlock()
sess = newSess
}
return
}

func (p *SessionPool) getSessionFromPool(id proto.NodeID) (sess *Session, ok bool) {
p.RLock()
defer p.RUnlock()
sess, ok = p.sessions[id]
return
}

// Get returns existing session to the node, if not exist try best to create one
func (p *SessionPool) Get(id proto.NodeID) (conn net.Conn, err error) {
// first try to get one session from pool
p.Lock()
cachedConn, ok := p.getSessionFromPool(id)
p.Unlock()
if ok {
conn, err = cachedConn.Sess.OpenStream()
if err == nil {
Expand Down Expand Up @@ -168,14 +169,15 @@ func (p *SessionPool) Set(id proto.NodeID, conn net.Conn) (exist bool) {

// Remove the node sessions in the pool
func (p *SessionPool) Remove(id proto.NodeID) {
p.Lock()
sess, ok := p.getSessionFromPool(id)
if ok {
delete(p.sessions, id)
p.Unlock()
sess.Close()
} else {
p.Unlock()
}

p.Lock()
defer p.Unlock()
delete(p.sessions, id)
}

// Close closes all sessions in the pool
Expand Down
71 changes: 41 additions & 30 deletions rpc/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"sync"
"testing"

. "github.com/smartystreets/goconvey/convey"
mux "github.com/xtaci/smux"

"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
. "github.com/smartystreets/goconvey/convey"
mux "github.com/xtaci/smux"
)

const (
Expand All @@ -44,12 +45,10 @@ var (

var FJ = filepath.Join

func server(c C, localAddr string, wg *sync.WaitGroup, p *SessionPool, n int) error {
func server(c C, localAddr string, n int) error {
// Accept a TCP connection
listener, err := net.Listen("tcp", localAddr)
wg.Add(1)
go func() {
defer wg.Done()
conn, err := listener.Accept()
c.So(err, ShouldBeNil)

Expand All @@ -59,21 +58,22 @@ func server(c C, localAddr string, wg *sync.WaitGroup, p *SessionPool, n int) er
c.So(err, ShouldBeNil)

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(i int, c2 C) {
// Accept a stream
//c2.So(err, ShouldBeNil)
// Stream implements net.Conn
// Listen for a message
//c2.So(string(buf1), ShouldEqual, "ping")
defer wg.Done()
log.Println("accepting stream")
stream, err := session.AcceptStream()
if err == nil {
buf1 := make([]byte, 4)
for i := 0; i < n; i++ {
stream.Read(buf1)
c2.So(string(buf1), ShouldEqual, "ping")
for i := 0; i < n; {
n, err := stream.Read(buf1)
if n == 4 && err == nil {
i++
c2.So(string(buf1), ShouldEqual, "ping")
}
}
log.Debugf("buf#%d read done", i)
}
Expand All @@ -93,18 +93,21 @@ func BenchmarkSessionPool_Get(b *testing.B) {

wg := &sync.WaitGroup{}

server(c, localAddr, wg, p, b.N)
server(c, localAddr, b.N)
b.ResetTimer()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(c2 C, n int) {
// Open a new stream
// Stream implements net.Conn
defer wg.Done()
stream, err := p.Get(proto.NodeID(localAddr))
c2.So(err, ShouldBeNil)
for i := 0; i < n; i++ {
_, err = stream.Write([]byte("ping"))
for i := 0; i < n; {
n, err := stream.Write([]byte("ping"))
if n == 4 && err == nil {
i++
}
}
}(c, b.N)
}
Expand All @@ -122,9 +125,11 @@ func TestNewSessionPool(t *testing.T) {

wg := &sync.WaitGroup{}

server(c, localAddr, wg, p, packetCount)
server(c, localAddr, packetCount)
p.Get(proto.NodeID(localAddr))

wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(c2 C, n int) {
// Open a new stream
// Stream implements net.Conn
Expand All @@ -135,26 +140,29 @@ func TestNewSessionPool(t *testing.T) {
return
}
c2.So(err, ShouldBeNil)
for i := 0; i < n; i++ {
_, err = stream.Write([]byte("ping"))
for i := 0; i < n; {
n, err := stream.Write([]byte("ping"))
if n == 4 && err == nil {
i++
}
}
}(c, packetCount)
}

wg.Wait()
c.So(p.Len(), ShouldEqual, 1)
So(p.Len(), ShouldEqual, 1)

wg2 := &sync.WaitGroup{}
server(c, localAddr2, wg2, p, packetCount)
server(c, localAddr2, packetCount)
conn, _ := net.Dial("tcp", localAddr2)
exists := p.Set(proto.NodeID(localAddr2), conn)
c.So(exists, ShouldBeFalse)
So(exists, ShouldBeFalse)
exists = p.Set(proto.NodeID(localAddr2), conn)
c.So(exists, ShouldBeTrue)
c.So(p.Len(), ShouldEqual, 2)
So(exists, ShouldBeTrue)
So(p.Len(), ShouldEqual, 2)

wg2 := &sync.WaitGroup{}
wg2.Add(concurrency)
for i := 0; i < concurrency; i++ {
wg2.Add(1)
go func(c2 C, n int) {
// Open a new stream
// Stream implements net.Conn
Expand All @@ -165,20 +173,23 @@ func TestNewSessionPool(t *testing.T) {
return
}
c2.So(err, ShouldBeNil)
for i := 0; i < n; i++ {
_, err = stream.Write([]byte("ping"))
for i := 0; i < n; {
n, err := stream.Write([]byte("ping"))
if n == 4 && err == nil {
i++
}
}
}(c, packetCount)
}

wg2.Wait()
c.So(p.Len(), ShouldEqual, 2)
So(p.Len(), ShouldEqual, 2)

p.Remove(proto.NodeID(localAddr2))
c.So(p.Len(), ShouldEqual, 1)
So(p.Len(), ShouldEqual, 1)

p.Close()
c.So(p.Len(), ShouldEqual, 0)
So(p.Len(), ShouldEqual, 0)

})

Expand Down
1 change: 1 addition & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/CovenantSQL/CovenantSQL/twopc"
"github.com/CovenantSQL/CovenantSQL/utils/log"

// Register CovenantSQL/go-sqlite3-encrypt engine.
_ "github.com/CovenantSQL/go-sqlite3-encrypt"
)
Expand Down
14 changes: 7 additions & 7 deletions test/service/node_c/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WorkingRoot: "./"
PubKeyStoreFile: "public.keystore"
PrivateKeyFile: "private.key"
DHTFileName: "dht.db"
ListenAddr: "172.254.1.4:4661"
ListenAddr: "127.0.0.1:4661"
ThisNodeID: "00000f3b43288fe99831eb533ab77ec455d13e11fc38ec35a42d4edd17aa320d"
ValidDNSKeys:
koPbw9wmYZ7ggcjnQ6ayHyhHaDNMYELKTqT+qRGrZpWSccr/lBcrm10Z1PuQHB3Azhii+sb0PYFkH1ruxLhe5g==: cloudflare.com
Expand Down Expand Up @@ -41,7 +41,7 @@ KnownNodes:
b: 0
c: 0
d: 0
Addr: 172.254.1.2:4661
Addr: 127.0.0.1:11099 #172.254.1.2:4661
PublicKey: "02c76216704d797c64c58bc11519fb68582e8e63de7e5b3b2dbbbe8733efe5fd24"
Role: Leader
- ID: 00000381d46fd6cf7742d7fb94e2422033af989c0e348b5781b3219599a3af35
Expand All @@ -50,7 +50,7 @@ KnownNodes:
b: 0
c: 0
d: 2305843009893772025
Addr: 172.254.1.3:4661
Addr: 127.0.0.1:11100 #172.254.1.3:4661
PublicKey: "02c76216704d797c64c58bc11519fb68582e8e63de7e5b3b2dbbbe8733efe5fd24"
Role: Follower
- ID: 000000172580063ded88e010556b0aca2851265be8845b1ef397e8fce6ab5582
Expand All @@ -59,7 +59,7 @@ KnownNodes:
b: 0
c: 0
d: 2305843012544226372
Addr: 172.254.1.4:4661
Addr: 127.0.0.1:11101 #172.254.1.4:4661
PublicKey: "02c76216704d797c64c58bc11519fb68582e8e63de7e5b3b2dbbbe8733efe5fd24"
Role: Follower
- ID: 00000f3b43288fe99831eb533ab77ec455d13e11fc38ec35a42d4edd17aa320d
Expand All @@ -77,7 +77,7 @@ KnownNodes:
b: 0
c: 0
d: 3104982049
Addr: 172.254.1.5:4661
Addr: 127.0.0.1:11102 #172.254.1.5:4661
PublicKey: 0367aa51809a7c1dc0f82c02452fec9557b3e1d10ce7c919d8e73d90048df86d20
Role: Miner
- ID: 000005f4f22c06f76c43c4f48d5a7ec1309cc94030cbf9ebae814172884ac8b5
Expand All @@ -86,7 +86,7 @@ KnownNodes:
b: 0
c: 0
d: 2305843010430351476
Addr: 172.254.1.6:4661
Addr: 127.0.0.1:11103 #172.254.1.6:4661
PublicKey: 02914bca0806f040dd842207c44474ab41ecd29deee7f2d355789c5c78d448ca16
Role: Miner
- ID: 000003f49592f83d0473bddb70d543f1096b4ffed5e5f942a3117e256b7052b8
Expand All @@ -95,7 +95,7 @@ KnownNodes:
b: 0
c: 0
d: 13835058056920509601
Addr: 172.254.1.7:4661
Addr: 127.0.0.1:11104 #172.254.1.7:4661
PublicKey: 03ae859eac5b72ee428c7a85f10b2ce748d9de5e480aefbb70f6597dfa8b2175e5
Role: Miner
Adapter:
Expand Down
1 change: 1 addition & 0 deletions xenomint/xxx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path"
"sync/atomic"

//"runtime/trace"
"sync"
"syscall"
Expand Down