Skip to content

Commit cc77b33

Browse files
committed
refactor: tighten kv entry ownership and key semantics
1 parent 3ce9e26 commit cc77b33

File tree

14 files changed

+162
-148
lines changed

14 files changed

+162
-148
lines changed

db.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ func (db *DB) loadBorrowedEntry(internalKey []byte) (*kv.Entry, error) {
652652
vp.Decode(entry.Value)
653653
result, cb, readErr := db.vlog.read(&vp)
654654
if cb != nil {
655-
defer kv.RunCallback(cb)
655+
defer cb()
656656
}
657657
if readErr != nil {
658658
entry.DecrRef()
@@ -722,7 +722,7 @@ func (db *DB) detachValuePointerEntry(src *kv.Entry) (*kv.Entry, error) {
722722
vp.Decode(src.Value)
723723
result, cb, err := db.vlog.read(&vp)
724724
if cb != nil {
725-
defer kv.RunCallback(cb)
725+
defer cb()
726726
}
727727
if err != nil {
728728
return nil, err

iterator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (it *Item) ValueCopy(dst []byte) ([]byte, error) {
7575
vp.Decode(val)
7676
fetched, cb, err := it.vlog.read(&vp)
7777
if cb != nil {
78-
defer kv.RunCallback(cb)
78+
defer cb()
7979
}
8080
if err != nil {
8181
return nil, err
@@ -504,7 +504,7 @@ func (iter *DBIterator) materializeDecoded(src *kv.Entry, cf kv.ColumnFamily, us
504504
vp.Decode(src.Value)
505505
val, cb, err := iter.vlog.read(&vp)
506506
if cb != nil {
507-
defer kv.RunCallback(cb)
507+
defer cb()
508508
}
509509
if err != nil {
510510
return false, errors.Wrapf(err, "value-log read failed for key %q", userKey)

kv/entry.go

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"time"
88
)
99

10-
var EntryPool = sync.Pool{
10+
var entryPool = sync.Pool{
1111
New: func() any {
1212
return &Entry{}
1313
},
@@ -30,7 +30,10 @@ var EntryPool = sync.Pool{
3030
//
3131
// Entry itself does not validate key/value semantics. Callers that persist
3232
// entries are responsible for ensuring Key and Value already use the expected
33-
// internal encodings for that path.
33+
// bytes for that path:
34+
// - public APIs usually start from user keys
35+
// - engine-internal storage paths usually carry internal keys
36+
// - range/filter helpers often operate on base keys
3437
type Entry struct {
3538
Key []byte
3639
Value []byte
@@ -72,7 +75,7 @@ func (e *Entry) DecrRef() {
7275
}
7376
if current == 1 {
7477
e.reset()
75-
EntryPool.Put(e)
78+
entryPool.Put(e)
7679
}
7780
return
7881
}
@@ -91,38 +94,64 @@ func (e *Entry) reset() {
9194
e.ref = 0
9295
}
9396

94-
// NewEntry creates a lightweight entry from the provided key/value.
97+
func acquireEntry() *Entry {
98+
e := entryPool.Get().(*Entry)
99+
e.IncrRef()
100+
return e
101+
}
102+
103+
// NewEntry creates a lightweight entry from arbitrary key/value bytes.
95104
//
96-
// It does not parse or validate key layout. CF is initialized to CFDefault and
97-
// Version remains unset (0) until filled by a caller that knows version context.
98-
func NewEntry(key, value []byte) *Entry {
99-
e := EntryPool.Get().(*Entry)
100-
e.Key = key
101-
e.Value = value
105+
// It does not parse or validate key layout; keyBytes may be a user key, base
106+
// key, internal key, or path-local scratch bytes. CF is initialized to
107+
// CFDefault and Version remains unset (0) until filled by a caller that knows
108+
// MVCC context.
109+
func NewEntry(keyBytes, valueBytes []byte) *Entry {
110+
e := acquireEntry()
111+
e.Key = keyBytes
112+
e.Value = valueBytes
102113
e.CF = CFDefault
103-
e.IncrRef()
104114
return e
105115
}
106116

107-
// NewInternalEntry creates an Entry whose key is encoded as an internal key.
117+
// NewInternalEntry creates an Entry whose Key is encoded from the supplied
118+
// userKey into canonical internal-key layout.
108119
//
109120
// Ownership note: userKey is encoded into a newly allocated internal-key buffer,
110121
// while value is referenced directly (no deep copy). Callers must keep value
111122
// immutable until the entry is no longer used.
112123
//
113124
// This helper also sets CF and Version to the supplied MVCC context.
114-
func NewInternalEntry(cf ColumnFamily, userKey []byte, version uint64, value []byte, meta byte, expiresAt uint64) *Entry {
125+
func NewInternalEntry(cf ColumnFamily, userKey []byte, version uint64, valueBytes []byte, meta byte, expiresAt uint64) *Entry {
115126
if !cf.Valid() {
116127
cf = CFDefault
117128
}
118-
e := EntryPool.Get().(*Entry)
129+
e := acquireEntry()
119130
e.Key = InternalKey(cf, userKey, version)
120-
e.Value = value
131+
e.Value = valueBytes
121132
e.CF = cf
122133
e.Version = version
123134
e.Meta = meta
124135
e.ExpiresAt = expiresAt
125-
e.IncrRef()
136+
return e
137+
}
138+
139+
// NewValueStructEntry wraps an internal-key/value-struct lookup result in a
140+
// pooled Entry. internalKey is expected to already use canonical internal-key
141+
// layout; CF and Version are derived when parsing succeeds and left at defaults
142+
// otherwise.
143+
func NewValueStructEntry(internalKey []byte, vs ValueStruct) *Entry {
144+
e := acquireEntry()
145+
e.Key = internalKey
146+
e.Value = vs.Value
147+
e.ExpiresAt = vs.ExpiresAt
148+
e.Meta = vs.Meta
149+
e.CF = CFDefault
150+
e.Version = 0
151+
if cf, _, version, ok := SplitInternalKey(internalKey); ok {
152+
e.CF = cf
153+
e.Version = version
154+
}
126155
return e
127156
}
128157

kv/entry_codec.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,27 +160,28 @@ func EncodeEntry(buf *bytes.Buffer, e *Entry) ([]byte, error) {
160160

161161
// EncodeEntryTo is the core streaming encoder.
162162
// It serializes the entry fields currently stored in e and writes them directly
163-
// to w. The encoder does not validate whether Key is an internal key or whether
164-
// Value already uses ValueStruct/ValuePtr layout; callers that persist entries
165-
// are responsible for supplying bytes in the correct internal format.
163+
// to w. The encoder treats e.Key and e.Value as raw bytes and does not validate
164+
// whether e.Key is a user key, base key, or internal key, nor whether e.Value
165+
// already uses ValueStruct/ValuePtr layout; callers that persist entries are
166+
// responsible for supplying bytes in the correct internal format.
166167
//
167168
// The encoded layout is: | header | key | value | crc32 |
168169
// - header: Varint-encoded, contains Key/Value lengths, Meta, and ExpiresAt.
169170
// - crc32: 4 bytes, BigEndian, checksums the header, key, and value.
170-
func EncodeEntryTo(w io.Writer, e *Entry) (int, error) {
171+
func EncodeEntryTo(w io.Writer, entry *Entry) (int, error) {
171172
header := EntryHeader{
172-
KeyLen: uint32(len(e.Key)),
173-
ValueLen: uint32(len(e.Value)),
174-
Meta: e.Meta,
175-
ExpiresAt: e.ExpiresAt,
173+
KeyLen: uint32(len(entry.Key)),
174+
ValueLen: uint32(len(entry.Value)),
175+
Meta: entry.Meta,
176+
ExpiresAt: entry.ExpiresAt,
176177
}
177178

178179
baseHeaderBuf := headerPool.Get().(*[]byte)
179180
headerBuf := (*baseHeaderBuf)[:MaxEntryHeaderSize]
180181
sz := header.Encode(headerBuf)
181182
if sz > len(headerBuf) {
182183
headerPool.Put(baseHeaderBuf)
183-
return 0, fmt.Errorf("entry header overflow: sz=%d cap=%d key=%d val=%d meta=%d expires=%d", sz, len(headerBuf), len(e.Key), len(e.Value), header.Meta, header.ExpiresAt)
184+
return 0, fmt.Errorf("entry header overflow: sz=%d cap=%d key=%d val=%d meta=%d expires=%d", sz, len(headerBuf), len(entry.Key), len(entry.Value), header.Meta, header.ExpiresAt)
184185
}
185186
headerBuf = headerBuf[:sz]
186187

@@ -221,10 +222,10 @@ func EncodeEntryTo(w io.Writer, e *Entry) (int, error) {
221222
return 0, err
222223
}
223224
headerPool.Put(baseHeaderBuf)
224-
if err := writeSection(e.Key); err != nil {
225+
if err := writeSection(entry.Key); err != nil {
225226
return 0, err
226227
}
227-
if err := writeSection(e.Value); err != nil {
228+
if err := writeSection(entry.Value); err != nil {
228229
return 0, err
229230
}
230231

@@ -240,7 +241,8 @@ func EncodeEntryTo(w io.Writer, e *Entry) (int, error) {
240241
// DecodeEntryFrom is the core streaming decoder.
241242
// It reads one encoded entry record from r into an Entry object and performs a
242243
// full CRC32 checksum verification. The decoder restores raw Key/Value bytes;
243-
// callers that require richer semantics must interpret them separately.
244+
// callers that require richer semantics must interpret whether Entry.Key is a
245+
// user key, base key, or internal key separately.
244246
//
245247
// It expects the data stream to have the layout: | header | key | value | crc32 |
246248
//
@@ -278,8 +280,7 @@ func DecodeEntryFrom(r io.Reader) (*Entry, uint32, error) {
278280
return nil, 0, ErrPartialEntry
279281
}
280282

281-
entry := EntryPool.Get().(*Entry)
282-
entry.IncrRef()
283+
entry := acquireEntry()
283284
entry.CF = CFDefault
284285
entry.Version = 0
285286
entry.ValThreshold = 0

kv/entry_iterator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
// | header(varint fields) | key bytes | value bytes | crc32 (4B) |
1212
//
1313
// The header encodes key length, value length, meta, and expiresAt via
14-
// uvarint encoding (see EntryHeader).
14+
// uvarint encoding (see EntryHeader). Entry.Key is returned as raw bytes; the
15+
// caller is responsible for knowing whether a given stream stores user keys,
16+
// base keys, or internal keys.
1517
type EntryIterator struct {
1618
reader *bufio.Reader
1719
current *Entry

kv/entry_test.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,22 +230,6 @@ func TestValueHelpers(t *testing.T) {
230230
back := BytesToU32Slice(raw)
231231
require.Equal(t, u32s, back)
232232

233-
called := false
234-
RunCallback(nil)
235-
RunCallback(func() { called = true })
236-
if !called {
237-
t.Fatalf("expected callback to run")
238-
}
239-
240-
vs := &Entry{Meta: BitValuePointer, Value: []byte("vp"), ExpiresAt: uint64(time.Now().Add(time.Hour).Unix())}
241-
if DiscardEntry(vs) {
242-
t.Fatalf("expected pointer entry to be retained")
243-
}
244-
vs.Meta = 0
245-
if !DiscardEntry(vs) {
246-
t.Fatalf("expected inline entry to be discarded")
247-
}
248-
249233
var nilEntry *Entry
250234
if !nilEntry.IsDeletedOrExpired() {
251235
t.Fatalf("expected nil entry to be treated as deleted")
@@ -267,6 +251,23 @@ func TestValueHelpers(t *testing.T) {
267251
}
268252
}
269253

254+
func TestNewValueStructEntry(t *testing.T) {
255+
key := InternalKey(CFWrite, []byte("user-key"), 42)
256+
vs := ValueStruct{
257+
Meta: BitValuePointer,
258+
Value: []byte("value"),
259+
ExpiresAt: 123,
260+
}
261+
e := NewValueStructEntry(key, vs)
262+
t.Cleanup(func() { e.DecrRef() })
263+
require.Equal(t, key, e.Key)
264+
require.Equal(t, vs.Value, e.Value)
265+
require.Equal(t, vs.Meta, e.Meta)
266+
require.Equal(t, vs.ExpiresAt, e.ExpiresAt)
267+
require.Equal(t, CFWrite, e.CF)
268+
require.Equal(t, uint64(42), e.Version)
269+
}
270+
270271
func encodeValueStruct(v ValueStruct) []byte {
271272
buf := make([]byte, v.EncodedSize())
272273
v.EncodeValue(buf)

0 commit comments

Comments
 (0)