Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add aws s3 storage based registry store
Signed-off-by: iamcodingcat <[email protected]>
  • Loading branch information
younghun-jo-levit-com authored and iamcodingcat committed May 13, 2025
commit 466cdd016f2dce84badb1f6ee3426e5fcd3955db
95 changes: 95 additions & 0 deletions go/internal/feast/registry/local.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package registry

import (
"context"
"errors"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -17,6 +25,12 @@ type FileRegistryStore struct {
filePath string
}

// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface
type S3RegistryStore struct {
filePath string
s3Client *s3.Client
}

// NewFileRegistryStore creates a FileRegistryStore with the given configuration and infers
// the file path from the repo path and registry path.
func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistryStore {
Expand All @@ -30,6 +44,26 @@ func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistry
return &lr
}

// NewS3RegistryStore creates a S3RegistryStore with the given configuration
func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore {
var lr S3RegistryStore
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

cfg, err := awsConfig.LoadDefaultConfig(ctx)
if err != nil {
lr = S3RegistryStore{
filePath: config.Path,
}
} else {
lr = S3RegistryStore{
filePath: config.Path,
s3Client: s3.NewFromConfig(cfg),
}
}
return &lr
}

// GetRegistryProto reads and parses the registry proto from the file path.
func (r *FileRegistryStore) GetRegistryProto() (*core.Registry, error) {
registry := &core.Registry{}
Expand Down Expand Up @@ -64,3 +98,64 @@ func (r *FileRegistryStore) writeRegistry(rp *core.Registry) error {
}
return nil
}

func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) {
bucket, key, err := r.parseS3Path()
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := r.s3Client.GetObject(ctx,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer output.Body.Close()

data, err := ioutil.ReadAll(output.Body)
if err != nil {
return nil, err
}

registry := &core.Registry{}
if err := proto.Unmarshal(data, registry); err != nil {
return nil, err
}
return registry, nil
}

func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error {
return errors.New("not implemented in S3RegistryStore")
}

func (r *S3RegistryStore) Teardown() error {
bucket, key, err := r.parseS3Path()
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = r.s3Client.DeleteObject(ctx,
&s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return err
}
return nil
}

func (r *S3RegistryStore) parseS3Path() (string, string, error) {
path := strings.TrimPrefix(r.filePath, "s3://")
parts := strings.SplitN(path, "/", 2)
if len(parts) != 2 {
return "", "", errors.New("invalid S3 file path format")
}
return parts[0], parts[1], nil
}
4 changes: 3 additions & 1 deletion go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string, project string
func (r *Registry) InitializeRegistry() error {
_, err := r.getRegistryProto()
if err != nil {
if _, ok := r.registryStore.(*FileRegistryStore); ok {
if _, ok := r.registryStore.(*FileRegistryStore); ok { // S3에는 굳이 연동할 필요 없어 보임. 오히려 이로 인해 정상적이던 s3 내의 레지스트리 파일이 초기화된 파일로 덮어쓰기 될 수도 있지 않음?
log.Error().Err(err).Msg("Registry Initialization Failed")
return err
}
Expand Down Expand Up @@ -364,6 +364,8 @@ func getRegistryStoreFromType(registryStoreType string, registryConfig *Registry
switch registryStoreType {
case "FileRegistryStore":
return NewFileRegistryStore(registryConfig, repoPath), nil
case "S3RegistryStore":
return NewS3RegistryStore(registryConfig, repoPath), nil
}
return nil, errors.New("only FileRegistryStore as a RegistryStore is supported at this moment")
}