Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion xenomint/query_sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (

func convertQueryAndBuildArgs(pattern string, args []types.NamedArg) (containsDDL bool, p string, ifs []interface{}, err error) {
if lower := strings.ToLower(pattern); strings.Contains(lower, "begin") ||
strings.Contains(lower, "rollback") {
strings.Contains(lower, "rollback") || strings.Contains(lower, "commit") {
return false, pattern, nil, nil
}
var (
Expand Down
67 changes: 67 additions & 0 deletions xenomint/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"path"
"reflect"
"sync"
"testing"

Expand Down Expand Up @@ -702,6 +703,72 @@ func TestSerializableState(t *testing.T) {
_, resp, err = state.Query(req, true)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
Convey("The state should keep consistent with committed transaction", func(c C) {
var (
count = 1000
insertQueries = make([]types.Query, count+2)
deleteQueries = make([]types.Query, count+2)
iReq, dReq *types.Request
)
insertQueries[0] = buildQuery(`BEGIN`)
deleteQueries[0] = buildQuery(`BEGIN`)
for i := 0; i < count; i++ {
insertQueries[i+1] = buildQuery(
`INSERT INTO t1(k, v) VALUES (?, ?)`, i, fmt.Sprintf("v%d", i),
)
deleteQueries[i+1] = buildQuery(`DELETE FROM t1 WHERE k=?`, i)
}
insertQueries[count+1] = buildQuery(`COMMIT`)
deleteQueries[count+1] = buildQuery(`COMMIT`)
iReq = buildRequest(types.WriteQuery, insertQueries)
dReq = buildRequest(types.WriteQuery, deleteQueries)

var (
wg = &sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
)
defer func() {
cancel()
wg.Wait()
}()
wg.Add(1)
go func() {
defer wg.Done()
var (
resp *types.Response
err error
)
for {
_, resp, err = state.Query(iReq, true)
c.So(err, ShouldBeNil)
c.Printf("insert affected rows: %d\n", resp.Header.AffectedRows)
_, resp, err = state.Query(dReq, true)
c.So(err, ShouldBeNil)
c.Printf("delete affected rows: %d\n", resp.Header.AffectedRows)
select {
case <-ctx.Done():
return
default:
}
}
}()

for i := 0; i < count; i++ {
_, resp, err = state.Query(buildRequest(types.ReadQuery, []types.Query{
buildQuery(`SELECT COUNT(1) AS cnt FROM t1`),
}), true)
So(reflect.DeepEqual(resp.Payload, types.ResponsePayload{
Columns: []string{"cnt"},
DeclTypes: []string{""},
Rows: []types.ResponseRow{{Values: []interface{}{int64(0)}}},
}) || reflect.DeepEqual(resp.Payload, types.ResponsePayload{
Columns: []string{"cnt"},
DeclTypes: []string{""},
Rows: []types.ResponseRow{{Values: []interface{}{int64(count)}}},
}), ShouldBeTrue)
Printf("index = %d, count = %v\n", i, resp)
}
})
Convey("The state should not see uncommitted changes", func(c C) {
// Build transaction query
var (
Expand Down