generated from bool64/go-template
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathinvalidator.go
More file actions
195 lines (151 loc) · 4.53 KB
/
invalidator.go
File metadata and controls
195 lines (151 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package cache
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// Invalidator is a registry of cache expiration triggers.
type Invalidator struct {
sync.Mutex
// SkipInterval defines minimal duration between two cache invalidations (flood protection).
SkipInterval time.Duration
// Callbacks contains a list of functions to call on invalidate.
Callbacks []func(ctx context.Context)
lastRun time.Time
}
// Invalidate triggers cache expiration.
func (i *Invalidator) Invalidate(ctx context.Context) error {
if i.Callbacks == nil {
return ErrNothingToInvalidate
}
i.Lock()
defer i.Unlock()
if i.SkipInterval == 0 {
i.SkipInterval = 15 * time.Second
}
if time.Since(i.lastRun) < i.SkipInterval {
return fmt.Errorf("%w at %s, %s did not pass",
ErrAlreadyInvalidated, i.lastRun.String(), i.SkipInterval.String())
}
i.lastRun = time.Now()
for _, cb := range i.Callbacks {
cb(ctx)
}
return nil
}
// InvalidationIndex keeps index of keys labeled for future invalidation.
type InvalidationIndex struct {
deleters map[string][]Deleter // By name.
mu sync.Mutex
labeledKeysByName map[string]map[string][]string
}
// NewInvalidationIndex creates new instance of label-based invalidator.
func NewInvalidationIndex(deleters ...Deleter) *InvalidationIndex {
ds := make(map[string][]Deleter)
if len(deleters) > 0 {
ds["default"] = deleters
}
return &InvalidationIndex{
deleters: ds,
labeledKeysByName: make(map[string]map[string][]string), // name -> labeledKeys
}
}
// AddCache adds a named instance of cache with deletable entries.
func (i *InvalidationIndex) AddCache(name string, deleter Deleter) {
i.mu.Lock()
defer i.mu.Unlock()
i.deleters[name] = append(i.deleters[name], deleter)
}
// AddInvalidationLabels registers invalidation labels to a cache key in default cache.
func (i *InvalidationIndex) AddInvalidationLabels(key []byte, labels ...string) {
i.AddLabels("default", key, labels...)
}
// AddLabels registers invalidation labels to a cache key.
func (i *InvalidationIndex) AddLabels(cacheName string, key []byte, labels ...string) {
i.mu.Lock()
defer i.mu.Unlock()
labeledKeys := i.labeledKeysByName[cacheName]
if labeledKeys == nil {
labeledKeys = make(map[string][]string)
i.labeledKeysByName[cacheName] = labeledKeys
}
ks := string(key)
for _, label := range labels {
labeledKeys[label] = append(labeledKeys[label], ks)
}
}
// InvalidateByLabels deletes keys from cache that have any of provided labels and returns number of deleted entries.
// If delete fails, function puts unprocessed keys back in the index and returns.
func (i *InvalidationIndex) InvalidateByLabels(ctx context.Context, labels ...string) (int, error) {
i.mu.Lock()
labeledKeysByName := make(map[string]map[string][]string)
deleters := make(map[string][]Deleter)
for name, labeledKeys := range i.labeledKeysByName {
labeledKeysByName[name] = labeledKeys
deleters[name] = i.deleters[name]
}
i.mu.Unlock()
cnt := 0
for name, labeledKeys := range i.labeledKeysByName {
n, err := i.invalidateByLabels(ctx, labeledKeys, deleters[name], labels...)
cnt += n
if err != nil {
return cnt, err
}
}
return cnt, nil
}
func (i *InvalidationIndex) invalidateByLabels(ctx context.Context, labeledKeys map[string][]string, deleters []Deleter, labels ...string) (int, error) {
cutKeys := i.cutKeys(labeledKeys, labels...)
deleted := make(map[string]bool) // Deduplication index to avoid multiple deletes for keys with multiple labels.
defer func() {
// Return unprocessed keys back.
if len(cutKeys) > 0 {
i.mu.Lock()
defer i.mu.Unlock()
for label, keys := range cutKeys {
// Cut keys already deleted in other labels.
for j, k := range keys {
if deleted[k] {
keys[j] = keys[len(keys)-1]
keys = keys[:len(keys)-1]
}
}
labeledKeys[label] = append(labeledKeys[label], keys...)
}
}
}()
cnt := 0
for _, label := range labels {
for _, k := range cutKeys[label] {
if deleted[k] {
continue
}
for _, d := range deleters {
err := d.Delete(ctx, []byte(k))
if err != nil {
if !errors.Is(err, ErrNotFound) {
return cnt, err
}
} else {
cnt++
}
}
deleted[k] = true
}
delete(cutKeys, label)
}
return cnt, nil
}
func (i *InvalidationIndex) cutKeys(labeledKeys map[string][]string, labels ...string) map[string][]string {
res := make(map[string][]string, len(labels))
i.mu.Lock()
defer i.mu.Unlock()
for _, label := range labels {
res[label] = labeledKeys[label]
delete(labeledKeys, label)
}
return res
}