fix: CopyObject behavior on expanded zones (minio#9729)
CopyObject did not correctly determine the destination
object's location and would end up creating duplicate
objects on two different zones, reproduced by performing
encryption-based key rotation.
harshavardhana authored May 28, 2020
1 parent b2db812 commit 41688a9
Showing 8 changed files with 79 additions and 73 deletions.
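
The heart of the fix is in cmd/xl-zones.go below: CopyObject now looks up which zone already holds the destination object and writes there, instead of copying into whichever zone has space, which is what produced the duplicates. The following is a condensed, illustrative sketch of that flow; the zone interface, StatObject/WriteObject methods, and copyToDestination helper are hypothetical stand-ins for the real xlZones types, and locking, the single-zone shortcut, and metadata-only handling are omitted.

```go
package sketch

import (
	"context"
	"errors"
)

// errObjectNotFound stands in for MinIO's ObjectNotFound error type.
var errObjectNotFound = errors.New("object not found")

// zone is a hypothetical stand-in for a single erasure-coded zone.
type zone interface {
	// StatObject reports whether the zone already holds bucket/object.
	StatObject(ctx context.Context, bucket, object string) error
	// WriteObject creates or overwrites bucket/object in this zone.
	WriteObject(ctx context.Context, bucket, object string, data []byte) error
}

// copyToDestination mirrors the corrected behavior: probe every zone for the
// destination object and overwrite it in place if found, so the copy cannot
// leave a second instance behind in another zone. Only when no zone has the
// object is it placed in the zone chosen for available capacity.
func copyToDestination(ctx context.Context, zones []zone, availableIdx int,
	dstBucket, dstObject string, data []byte) error {

	for _, z := range zones {
		err := z.StatObject(ctx, dstBucket, dstObject)
		if errors.Is(err, errObjectNotFound) {
			continue // not in this zone, keep probing
		}
		if err != nil {
			return err
		}
		// The destination already lives in this zone: overwrite here.
		return z.WriteObject(ctx, dstBucket, dstObject, data)
	}
	// No zone has the object yet: write it to the available zone.
	return zones[availableIdx].WriteObject(ctx, dstBucket, dstObject, data)
}
```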
2 changes: 1 addition & 1 deletion cmd/api-headers.go
@@ -112,7 +112,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp

// Set all other user defined metadata.
for k, v := range objInfo.UserDefined {
if strings.HasPrefix(k, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
// Do not need to send any internal metadata
// values to client.
continue
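
The one-line change above (repeated in several handlers in this commit) makes the reserved-metadata filter case-insensitive: the key is lower-cased before the prefix comparison, so an internal key stored or received with non-canonical casing is still withheld from clients. A minimal, self-contained comparison of the two checks; the isReserved* names are illustrative only.

```go
package sketch

import "strings"

const (
	reservedMetadataPrefix      = "X-Minio-Internal-"
	reservedMetadataPrefixLower = "x-minio-internal-"
)

// isReservedOld is the pre-commit check: a case-sensitive prefix match.
func isReservedOld(key string) bool {
	return strings.HasPrefix(key, reservedMetadataPrefix)
}

// isReservedNew is the post-commit check: lower-case first, then match.
func isReservedNew(key string) bool {
	return strings.HasPrefix(strings.ToLower(key), reservedMetadataPrefixLower)
}

// For a key such as "x-minio-internal-example", isReservedOld returns false
// while isReservedNew returns true, so the new check filters it out.
```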
2 changes: 1 addition & 1 deletion cmd/api-response.go
@@ -541,7 +541,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter,
if metadata {
content.UserMetadata = make(StringMap)
for k, v := range CleanMinioInternalMetadataKeys(object.UserDefined) {
if strings.HasPrefix(k, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
// Do not need to send any internal metadata
// values to client.
continue
4 changes: 2 additions & 2 deletions cmd/disk-cache.go
@@ -99,14 +99,14 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *disk
bkMeta := make(map[string]string)
cacheMeta := make(map[string]string)
for k, v := range bkObjectInfo.UserDefined {
if strings.HasPrefix(k, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
// Do not need to send any internal metadata
continue
}
bkMeta[http.CanonicalHeaderKey(k)] = v
}
for k, v := range cacheObjInfo.UserDefined {
if strings.HasPrefix(k, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
// Do not need to send any internal metadata
continue
}
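
For reference, the comparison above also canonicalizes each key with http.CanonicalHeaderKey before storing it in bkMeta/cacheMeta, so the backend's and the cache's copies of the same header land on identical map keys regardless of casing. This is standard-library behavior, shown in a small sketch.

```go
package sketch

import (
	"fmt"
	"net/http"
)

// printCanonical shows how CanonicalHeaderKey normalizes header casing so
// that metadata gathered from the backend and from the cache compare equal.
func printCanonical() {
	fmt.Println(http.CanonicalHeaderKey("content-type"))     // Content-Type
	fmt.Println(http.CanonicalHeaderKey("x-amz-meta-owner")) // X-Amz-Meta-Owner
}
```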
7 changes: 5 additions & 2 deletions cmd/generic-handlers.go
@@ -116,7 +116,7 @@ func isHTTPHeaderSizeTooLarge(header http.Header) bool {

// ReservedMetadataPrefix is the prefix of a metadata key which
// is reserved and for internal use only.
const ReservedMetadataPrefix = "X-Minio-Internal-"
const (
ReservedMetadataPrefix = "X-Minio-Internal-"
ReservedMetadataPrefixLower = "x-minio-internal-"
)

type reservedMetadataHandler struct {
http.Handler
@@ -141,7 +144,7 @@ func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
// and must not set by clients
func containsReservedMetadata(header http.Header) bool {
for key := range header {
if HasPrefix(key, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(key), ReservedMetadataPrefixLower) {
return true
}
}
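
containsReservedMetadata above now lower-cases each header key before the prefix check. Go's HTTP server canonicalizes the keys of incoming headers, but http.Header is a plain map and keys can also be inserted directly in arbitrary casing, so matching against the lower-cased key is the more robust guard. A small, hypothetical demonstration:

```go
package sketch

import (
	"net/http"
	"strings"
)

const reservedMetadataPrefixLower = "x-minio-internal-"

// containsReserved mirrors the updated containsReservedMetadata check.
func containsReserved(h http.Header) bool {
	for key := range h {
		if strings.HasPrefix(strings.ToLower(key), reservedMetadataPrefixLower) {
			return true
		}
	}
	return false
}

// demo shows that the check catches both a canonicalized key (Header.Set
// rewrites it to "X-Minio-Internal-Foo") and a key inserted directly into
// the map with whatever casing the caller used.
func demo() bool {
	h := http.Header{}
	h.Set("x-minio-internal-foo", "bar")
	h["x-MINIO-internal-bar"] = []string{"baz"}
	return containsReserved(h)
}
```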
2 changes: 1 addition & 1 deletion cmd/object-handlers.go
@@ -995,7 +995,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}

for k, v := range srcInfo.UserDefined {
if HasPrefix(k, ReservedMetadataPrefix) {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
encMetadata[k] = v
}
}
11 changes: 5 additions & 6 deletions cmd/xl-sets.go
@@ -773,18 +773,17 @@ func (s *xlSets) DeleteObjects(ctx context.Context, bucket string, objects []str
}

// CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
srcSet := s.getHashedSet(srcObject)
destSet := s.getHashedSet(destObject)
dstSet := s.getHashedSet(dstObject)

// Check if this request is only metadata update.
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
if cpSrcDstSame && srcInfo.metadataOnly {
return srcSet.CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
if srcSet == dstSet && srcInfo.metadataOnly {
return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}

putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
return destSet.putObject(ctx, destBucket, destObject, srcInfo.PutObjReader, putOpts)
return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
}

// FileInfoCh - file info channel
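
In cmd/xl-sets.go the metadata-only shortcut now asks whether the source and destination objects hash to the same erasure set (srcSet == dstSet) rather than whether their bucket/object paths are string-equal; only in the same-set case can the set update metadata in place, otherwise the object is written to the destination set. A hedged sketch of that placement logic follows: pickSet is a hypothetical stand-in for getHashedSet, using plain CRC32 where the real code supports CRC and SipHash based distribution.

```go
package sketch

import "hash/crc32"

// pickSet maps an object name onto one of n erasure sets by hashing the name.
// It is an illustrative stand-in for xlSets.getHashedSet.
func pickSet(object string, n int) int {
	return int(crc32.ChecksumIEEE([]byte(object))) % n
}

// sameSet reports whether a server-side copy stays inside a single set,
// which is the condition under which metadata can be updated in place.
func sameSet(srcObject, dstObject string, n int) bool {
	return pickSet(srcObject, n) == pickSet(dstObject, n)
}
```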
83 changes: 40 additions & 43 deletions cmd/xl-v1-object.go
@@ -65,64 +65,61 @@ func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, wri
// if source object and destination object are same we only
// update metadata.
func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))

// Check if this request is only metadata update.
if cpSrcDstSame {
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
// This call shouldn't be used for anything other than metadata updates.
if !srcInfo.metadataOnly {
return oi, NotImplemented{}
}

// Read metadata associated with the object from all disks.
storageDisks := xl.getDisks()
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))

metaArr, errs := readAllXLMetadata(ctx, storageDisks, srcBucket, srcObject)
// Read metadata associated with the object from all disks.
storageDisks := xl.getDisks()

// get Quorum for this object
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
metaArr, errs := readAllXLMetadata(ctx, storageDisks, srcBucket, srcObject)

if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return oi, toObjectErr(reducedErr, srcBucket, srcObject)
}
// get Quorum for this object
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}

// List all online disks.
_, modTime := listOnlineDisks(storageDisks, metaArr, errs)
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return oi, toObjectErr(reducedErr, srcBucket, srcObject)
}

// Pick latest valid metadata.
xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
// List all online disks.
_, modTime := listOnlineDisks(storageDisks, metaArr, errs)

// Update `xl.json` content on each disks.
for index := range metaArr {
metaArr[index].Meta = srcInfo.UserDefined
metaArr[index].Meta["etag"] = srcInfo.ETag
}
// Pick latest valid metadata.
xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}

var onlineDisks []StorageAPI
// Update `xl.json` content on each disks.
for index := range metaArr {
metaArr[index].Meta = srcInfo.UserDefined
metaArr[index].Meta["etag"] = srcInfo.ETag
}

tempObj := mustGetUUID()
var onlineDisks []StorageAPI

// Cleanup in case of xl.json writing failure
defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)
tempObj := mustGetUUID()

// Write unique `xl.json` for each disk.
if onlineDisks, err = writeUniqueXLMetadata(ctx, storageDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
// Cleanup in case of xl.json writing failure
defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)

// Rename atomically `xl.json` from tmp location to destination for each disk.
if _, err = renameXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
// Write unique `xl.json` for each disk.
if onlineDisks, err = writeUniqueXLMetadata(ctx, storageDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}

return xlMeta.ToObjectInfo(srcBucket, srcObject), nil
// Rename atomically `xl.json` from tmp location to destination for each disk.
if _, err = renameXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}

putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
return xl.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
return xlMeta.ToObjectInfo(srcBucket, srcObject), nil
}

// GetObjectNInfo - returns object info and an object
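
The net effect of the rewrite above is a narrower contract: xlObjects.CopyObject now only performs metadata updates (rewriting xl.json under quorum) and returns NotImplemented for anything else, while real copies are issued as PutObject calls from the zone layer in cmd/xl-zones.go below. A rough, hypothetical sketch of how a caller divides the work under that contract — the interfaces and serverSideCopy helper are illustrative, not MinIO APIs:

```go
package sketch

import "context"

// metadataUpdater and objectWriter are minimal stand-ins for the per-set and
// per-zone capabilities involved in the new split of responsibilities.
type metadataUpdater interface {
	UpdateObjectMetadata(ctx context.Context, bucket, object string, meta map[string]string) error
}

type objectWriter interface {
	PutObject(ctx context.Context, bucket, object string, data []byte, meta map[string]string) error
}

// serverSideCopy illustrates the division of labor after the commit: an
// in-place, metadata-only copy is handled where the object already lives,
// while any copy that moves data goes through a fresh PutObject.
func serverSideCopy(ctx context.Context, mu metadataUpdater, ow objectWriter,
	bucket, object string, data []byte, meta map[string]string, metadataOnly bool) error {

	if metadataOnly {
		// Corresponds to xlObjects.CopyObject after the fix.
		return mu.UpdateObjectMetadata(ctx, bucket, object, meta)
	}
	// Corresponds to the zone layer writing the destination object.
	return ow.PutObject(ctx, bucket, object, data, meta)
}
```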
41 changes: 24 additions & 17 deletions cmd/xl-zones.go
@@ -533,36 +533,42 @@ func (z *xlZones) DeleteObjects(ctx context.Context, bucket string, objects []st
return derrs, nil
}

func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
// Check if this request is only metadata update.
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if !cpSrcDstSame {
lk := z.NewNSLock(ctx, destBucket, destObject)
lk := z.NewNSLock(ctx, dstBucket, dstObject)
if err := lk.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer lk.Unlock()
}

if z.SingleZone() {
return z.zones[0].CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
return z.zones[0].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
if cpSrcDstSame && srcInfo.metadataOnly {
for _, zone := range z.zones {
objInfo, err = zone.CopyObject(ctx, srcBucket, srcObject, destBucket,
destObject, srcInfo, srcOpts, dstOpts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return objInfo, err

zoneIndex := -1
for i, zone := range z.zones {
objInfo, err := zone.GetObjectInfo(ctx, dstBucket, dstObject, srcOpts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return objInfo, nil
return objInfo, err
}
zoneIndex = i
break
}

putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
if zoneIndex >= 0 {
if cpSrcDstSame && srcInfo.metadataOnly {
return z.zones[zoneIndex].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
return objInfo, ObjectNotFound{Bucket: srcBucket, Object: srcObject}
return z.zones[zoneIndex].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
}
return z.zones[z.getAvailableZoneIdx(ctx)].CopyObject(ctx, srcBucket, srcObject,
destBucket, destObject, srcInfo, srcOpts, dstOpts)
return z.zones[z.getAvailableZoneIdx(ctx)].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
}

func (z *xlZones) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
@@ -1050,6 +1056,7 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID st
if z.SingleZone() {
return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}

for _, zone := range z.zones {
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil {
