Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/cql-observer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ func newPaginationFromReq(r *http.Request) (op *paginationOps) {
return
}

func (a *explorerAPI) GetAllSubscriptions(rw http.ResponseWriter, r *http.Request) {
subscriptions, err := a.service.getAllSubscriptions()
if err != nil {
sendResponse(500, false, err, nil, rw)
return
}

sendResponse(200, true, "", subscriptions, rw)
}

func (a *explorerAPI) GetAck(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

Expand Down Expand Up @@ -677,6 +687,7 @@ func startAPI(service *Service, listenAddr string) (server *http.Server, err err
v3Router.HandleFunc("/count/{db}/{count:[0-9]+}", api.GetBlockByCountV3).Methods("GET")
v3Router.HandleFunc("/height/{db}/{height:[0-9]+}", api.GetBlockByHeightV3).Methods("GET")
v3Router.HandleFunc("/head/{db}", api.GetHighestBlockV3).Methods("GET")
v3Router.HandleFunc("/subscriptions", api.GetAllSubscriptions).Methods("GET")

server = &http.Server{
Addr: listenAddr,
Expand Down
2 changes: 2 additions & 0 deletions cmd/cql-observer/config_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !testbinary

/*
* Copyright 2018 The CovenantSQL Authors.
*
Expand Down
10 changes: 4 additions & 6 deletions cmd/cql-observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
)
Expand Down Expand Up @@ -85,15 +84,14 @@ func main() {

kms.InitBP()

// start rpc
var server *rpc.Server
if server, err = initNode(); err != nil {
// init node
if err = initNode(); err != nil {
log.WithError(err).Fatal("init node failed")
}

// start service
var service *Service
if service, err = startService(server); err != nil {
if service, err = startService(); err != nil {
log.WithError(err).Fatal("start observation failed")
}

Expand Down Expand Up @@ -144,7 +142,7 @@ func main() {
}

// stop subscriptions
if err = stopService(service, server); err != nil {
if err = stopService(service); err != nil {
log.WithError(err).Fatal("stop service failed")
}

Expand Down
24 changes: 1 addition & 23 deletions cmd/cql-observer/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@ package main

import (
"fmt"
"os"
"syscall"

"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/route"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"golang.org/x/crypto/ssh/terminal"
)

func initNode() (server *rpc.Server, err error) {
func initNode() (err error) {
var masterKey []byte
if !conf.GConf.IsTestMode {
fmt.Print("Type in Master key to continue:")
Expand All @@ -50,25 +48,5 @@ func initNode() (server *rpc.Server, err error) {
// init kms routing
route.InitKMS(conf.GConf.PubKeyStoreFile)

// init server
if server, err = createServer(
conf.GConf.PrivateKeyFile, conf.GConf.PubKeyStoreFile, masterKey, conf.GConf.ListenAddr); err != nil {
log.WithError(err).Error("create server failed")
return
}

return
}

func createServer(privateKeyPath, pubKeyStorePath string, masterKey []byte, listenAddr string) (server *rpc.Server, err error) {
os.Remove(pubKeyStorePath)

server = rpc.NewServer()
if err != nil {
return
}

err = server.InitRPCServer(listenAddr, privateKeyPath, masterKey)

return
}
33 changes: 31 additions & 2 deletions cmd/cql-observer/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func TestFullProcess(t *testing.T) {
observerCmd.Cmd.Wait()
}()

// wait for the observer to collect blocks, two periods is enough
// wait for the observer to collect blocks
time.Sleep(conf.GConf.SQLChainPeriod * 5)

// test get genesis block by height
Expand Down Expand Up @@ -686,18 +686,47 @@ func TestFullProcess(t *testing.T) {
})
So(err, ShouldBeNil)

// wait for the observer to be enabled query by miner, and collect blocks
time.Sleep(conf.GConf.SQLChainPeriod * 5)

// test get genesis block by height
res, err = getJSON("v3/head/%v", dbID2)
So(err, ShouldBeNil)
So(ensureSuccess(res.Interface("block")), ShouldNotBeNil)
So(ensureSuccess(res.Int("block", "height")), ShouldEqual, 0)
So(ensureSuccess(res.Int("block", "height")), ShouldBeGreaterThanOrEqualTo, 0)
log.Info(err, res)

err = client.Drop(dsn)
So(err, ShouldBeNil)

err = client.Drop(dsn2)
So(err, ShouldBeNil)

observerCmd.Cmd.Process.Signal(os.Interrupt)
observerCmd.Cmd.Wait()

// start observer again
observerCmd, err = utils.RunCommandNB(
FJ(baseDir, "./bin/cql-observer.test"),
[]string{"-config", FJ(testWorkingDir, "./observation/node_observer/config.yaml"),
"-database", string(dbID), "-reset", "oldest",
"-test.coverprofile", FJ(baseDir, "./cmd/cql-observer/observer.cover.out"),
},
"observer", testWorkingDir, logDir, false,
)
So(err, ShouldBeNil)

// call observer subscription status
// wait for observer to start
time.Sleep(time.Second * 3)

res, err = getJSON("v3/subscriptions")
So(err, ShouldBeNil)
subscriptions, err := res.Object()
So(subscriptions, ShouldContainKey, string(dbID))
So(subscriptions, ShouldContainKey, string(dbID2))
So(subscriptions[string(dbID)], ShouldBeGreaterThanOrEqualTo, 1)
So(subscriptions[string(dbID2)], ShouldBeGreaterThanOrEqualTo, 0)
})
}

Expand Down
16 changes: 2 additions & 14 deletions cmd/cql-observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/route"
"github.com/CovenantSQL/CovenantSQL/rpc"
)

Expand All @@ -41,33 +40,22 @@ func registerNode() (err error) {
return
}

func startService(server *rpc.Server) (service *Service, err error) {
func startService() (service *Service, err error) {
// register observer service to rpc server
service, err = NewService()
if err != nil {
return
}

if err = server.RegisterService(route.ObserverRPCName, service); err != nil {
return
}

// start service rpc, observer acts as client role but listen to
go server.Serve()

// start observer service
service.start()

return
}

func stopService(service *Service, server *rpc.Server) (err error) {
func stopService(service *Service) (err error) {
// stop subscription
service.stop()

// stop rpc service
server.Listener.Close()
server.Stop()

return
}
Loading