Skip to content

Commit f344de6

Browse files
authored
Add DynamoDB storage (#74)
* Add DynamoDB storage This implements the metastore for AWS dynamodb. * Add some dynamodb URL parsing logic as per specification * Added integration tests, some logging options and table creation. * Added storage errors and some checks for atomic operations * Adjust defaults to keep configuration options for endpoint simple
1 parent 689bfce commit f344de6

File tree

4 files changed

+610
-0
lines changed

4 files changed

+610
-0
lines changed

metastore/storage.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
SchemeTypeUnknown SchemeType = iota
1414
SchemeTypeFile
1515
SchemeTypeEtcd
16+
SchemeTypeDynamodb
1617
)
1718

1819
// Enum value maps for SchemeType.
@@ -80,6 +81,8 @@ func NewStorageWithUri(uri string, logger *zap.Logger) (Storage, error) {
8081
return NewFileSystemStorageWithUri(uri, metastoreLogger)
8182
case SchemeType_name[SchemeTypeEtcd]:
8283
return NewEtcdStorageWithUri(uri, metastoreLogger)
84+
case SchemeType_name[SchemeTypeDynamodb]:
85+
return NewDynamodbStorage(uri, metastoreLogger)
8386
default:
8487
err := errors.ErrUnsupportedStorageType
8588
metastoreLogger.Error(err.Error(), zap.String("scheme", u.Scheme))

metastore/storage_dynamodb.go

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package metastore
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"errors"
7+
"fmt"
8+
"path/filepath"
9+
"time"
10+
11+
"github.com/aws/aws-sdk-go-v2/aws"
12+
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
13+
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
14+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
15+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
16+
"github.com/aws/smithy-go/logging"
17+
"go.uber.org/zap"
18+
)
19+
20+
// Attribute names and the fixed partition value shared by every DynamoDB
// request issued from this file.
const (
	// partitionKeyName is the attribute name of the table's hash key.
	partitionKeyName = "pk"

	// sortKeyName is the attribute name of the table's range key; it holds
	// the full record path.
	sortKeyName = "sk"

	// partitionValue is the hash-key value used for every record.
	partitionValue = "metadata"
)
25+
26+
// Sentinel errors surfaced by the DynamoDB-backed storage.
var (
	// ErrRecordNotFound is returned when a get finds no item for the key.
	ErrRecordNotFound = errors.New("record not found")

	// ErrDuplicateRecord is returned when a put is rejected because a record
	// with the same key is already present.
	ErrDuplicateRecord = errors.New("record already exists")
)
33+
34+
// kv is the item layout marshalled to and from the DynamoDB table.
type kv struct {
	// Partition is stored under the hash-key attribute.
	Partition string `dynamodbav:"pk"`
	// Path is stored under the range-key attribute and carries the full
	// record path.
	Path string `dynamodbav:"sk"`
	// Version is part of the item layout; nothing in this file assigns it.
	Version string `dynamodbav:"version"`
	// Value is the record payload as a base64 string.
	Value string `dynamodbav:"value"`
}
40+
41+
type DynamodbStorage struct {
42+
dynamoSvc *dynamodb.Client
43+
tableName string
44+
root string
45+
logger *zap.Logger
46+
ctx context.Context
47+
requestTimeout time.Duration
48+
}
49+
50+
func NewDynamodbStorage(uri string, logger *zap.Logger) (*DynamodbStorage, error) {
51+
metastorelogger := logger.Named("dynamodb")
52+
53+
ctx := context.Background()
54+
55+
u, awsCfg, err := buildAwsCfg(uri)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
if u.Query().Get("aws_sdk_request_logging") == "true" {
61+
awsCfg.Logger = logging.LoggerFunc(func(classification logging.Classification, format string, v ...interface{}) {
62+
metastorelogger.Info(fmt.Sprintf(format, v...))
63+
})
64+
65+
awsCfg.ClientLogMode = aws.LogRetries | aws.LogRequest
66+
}
67+
68+
ds := &DynamodbStorage{
69+
dynamoSvc: dynamodb.NewFromConfig(awsCfg),
70+
tableName: u.Host,
71+
root: u.Path,
72+
logger: metastorelogger,
73+
ctx: ctx,
74+
requestTimeout: 3 * time.Second,
75+
}
76+
77+
// primarily used for testing locally
78+
if u.Query().Get("create_table") == "true" {
79+
err := ds.createTable()
80+
if err != nil {
81+
return nil, err
82+
}
83+
}
84+
85+
return ds, nil
86+
}
87+
88+
// Replace the path separator with '/'.
89+
func (m *DynamodbStorage) makePath(path string) string {
90+
return filepath.ToSlash(filepath.Join(filepath.ToSlash(m.root), filepath.ToSlash(path)))
91+
}
92+
93+
func (m *DynamodbStorage) Get(path string) ([]byte, error) {
94+
fullPath := m.makePath(path)
95+
96+
res, err := m.dynamoSvc.GetItem(m.ctx, &dynamodb.GetItemInput{
97+
TableName: aws.String(m.tableName),
98+
Key: map[string]types.AttributeValue{
99+
partitionKeyName: &types.AttributeValueMemberS{
100+
Value: partitionValue,
101+
},
102+
sortKeyName: &types.AttributeValueMemberS{
103+
Value: fullPath,
104+
},
105+
},
106+
ConsistentRead: aws.Bool(true), // enable consistent reads as we need this for atomic reads
107+
})
108+
if err != nil {
109+
m.logger.Error(err.Error(), zap.String("key", fullPath))
110+
return nil, err
111+
}
112+
113+
if res.Item == nil {
114+
return nil, ErrRecordNotFound
115+
}
116+
117+
m.logger.Info("get record", zap.String("fullPath", fullPath))
118+
119+
rec := new(kv)
120+
err = attributevalue.UnmarshalMap(res.Item, rec)
121+
if err != nil {
122+
return nil, err
123+
}
124+
125+
return base64.RawStdEncoding.DecodeString(rec.Value)
126+
}
127+
128+
func (m *DynamodbStorage) Put(path string, value []byte) error {
129+
fullPath := m.makePath(path)
130+
131+
rec := &kv{Partition: partitionValue, Path: fullPath, Value: base64.RawURLEncoding.EncodeToString(value)}
132+
133+
attr, err := attributevalue.MarshalMap(rec)
134+
if err != nil {
135+
m.logger.Error(err.Error(), zap.String("key", fullPath))
136+
return err
137+
}
138+
139+
// this adds a condition which checks if the sort key already exists, if it does
140+
// this operation will return a condition error.
141+
existCond := expression.AttributeNotExists(expression.Name(sortKeyName))
142+
condExpr, err := expression.NewBuilder().WithCondition(existCond).Build()
143+
if err != nil {
144+
m.logger.Error(err.Error(), zap.String("key", fullPath))
145+
return err
146+
}
147+
148+
_, err = m.dynamoSvc.PutItem(m.ctx, &dynamodb.PutItemInput{
149+
TableName: aws.String(m.tableName),
150+
Item: attr,
151+
ConditionExpression: condExpr.Condition(),
152+
ExpressionAttributeNames: condExpr.Names(),
153+
ExpressionAttributeValues: condExpr.Values(),
154+
})
155+
if err != nil {
156+
var rne *types.ConditionalCheckFailedException
157+
if errors.As(err, &rne) {
158+
return ErrDuplicateRecord
159+
}
160+
161+
m.logger.Error(err.Error(), zap.String("key", fullPath))
162+
return err
163+
}
164+
165+
m.logger.Info("put record", zap.String("fullPath", fullPath))
166+
167+
return nil
168+
}
169+
170+
func (m *DynamodbStorage) List(prefix string) ([]string, error) {
171+
prefixPath := m.makePath(prefix)
172+
173+
keyCond := expression.
174+
Key(partitionKeyName).Equal(expression.Value(partitionValue)).
175+
And(expression.Key(sortKeyName).BeginsWith(prefixPath))
176+
177+
keyExpr, err := expression.NewBuilder().WithKeyCondition(keyCond).Build()
178+
if err != nil {
179+
m.logger.Error(err.Error(), zap.String("key", prefixPath))
180+
return nil, err
181+
}
182+
183+
res, err := m.dynamoSvc.Query(m.ctx, &dynamodb.QueryInput{
184+
TableName: &m.tableName,
185+
KeyConditionExpression: keyExpr.KeyCondition(),
186+
ExpressionAttributeNames: keyExpr.Names(),
187+
ExpressionAttributeValues: keyExpr.Values(),
188+
ConsistentRead: aws.Bool(true), // enable consistent reads as we need this for atomic reads
189+
})
190+
if err != nil {
191+
m.logger.Error(err.Error(), zap.String("key", prefixPath))
192+
return nil, err
193+
}
194+
195+
paths := make([]string, 0)
196+
197+
for _, item := range res.Items {
198+
rec := new(kv)
199+
err = attributevalue.UnmarshalMap(item, rec)
200+
if err != nil {
201+
m.logger.Error(err.Error(), zap.String("key", prefixPath))
202+
return nil, err
203+
}
204+
205+
paths = append(paths, rec.Path[len(prefixPath):])
206+
}
207+
208+
return paths, nil
209+
}
210+
211+
func (m *DynamodbStorage) Delete(path string) error {
212+
fullPath := m.makePath(path)
213+
214+
_, err := m.dynamoSvc.DeleteItem(m.ctx, &dynamodb.DeleteItemInput{
215+
TableName: aws.String(m.tableName),
216+
Key: map[string]types.AttributeValue{
217+
partitionKeyName: &types.AttributeValueMemberS{
218+
Value: partitionValue,
219+
},
220+
sortKeyName: &types.AttributeValueMemberS{
221+
Value: fullPath,
222+
},
223+
},
224+
})
225+
if err != nil {
226+
m.logger.Error(err.Error(), zap.String("key", fullPath))
227+
return err
228+
}
229+
230+
return nil
231+
}
232+
233+
func (m *DynamodbStorage) Exists(path string) (bool, error) {
234+
fullPath := m.makePath(path)
235+
236+
res, err := m.dynamoSvc.GetItem(m.ctx, &dynamodb.GetItemInput{
237+
TableName: aws.String(m.tableName),
238+
Key: map[string]types.AttributeValue{
239+
partitionKeyName: &types.AttributeValueMemberS{
240+
Value: partitionValue,
241+
},
242+
sortKeyName: &types.AttributeValueMemberS{
243+
Value: fullPath,
244+
},
245+
},
246+
ConsistentRead: aws.Bool(true), // enable consistent reads as we need this for atomic reads
247+
})
248+
if err != nil {
249+
m.logger.Error(err.Error(), zap.String("key", fullPath))
250+
return false, err
251+
}
252+
253+
exists := res.Item != nil
254+
255+
m.logger.Info("check record exists", zap.String("fullPath", fullPath), zap.Bool("exists", exists))
256+
257+
return exists, nil
258+
}
259+
260+
func (m *DynamodbStorage) Close() error {
261+
return nil
262+
}
263+
264+
func (m *DynamodbStorage) createTable() error {
265+
_, err := m.dynamoSvc.CreateTable(m.ctx, &dynamodb.CreateTableInput{
266+
TableName: &m.tableName,
267+
AttributeDefinitions: []types.AttributeDefinition{
268+
{
269+
AttributeName: aws.String(partitionKeyName),
270+
AttributeType: types.ScalarAttributeTypeS,
271+
},
272+
{
273+
AttributeName: aws.String(sortKeyName),
274+
AttributeType: types.ScalarAttributeTypeS,
275+
},
276+
},
277+
KeySchema: []types.KeySchemaElement{
278+
{
279+
AttributeName: aws.String(partitionKeyName),
280+
KeyType: types.KeyTypeHash,
281+
},
282+
{
283+
AttributeName: aws.String(sortKeyName),
284+
KeyType: types.KeyTypeRange,
285+
},
286+
},
287+
BillingMode: types.BillingModePayPerRequest,
288+
})
289+
if err != nil {
290+
var rne *types.ResourceInUseException
291+
if errors.As(err, &rne) {
292+
return nil
293+
}
294+
295+
return err
296+
}
297+
298+
return nil
299+
}

0 commit comments

Comments
 (0)