Skip to content

Commit 893b665

Browse files
authored
add concurrency to multipart_save method
1 parent c84f6aa commit 893b665

File tree

1 file changed

+67
-8
lines changed

1 file changed

+67
-8
lines changed

lib/fog/aws/models/storage/file.rb

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -328,18 +328,34 @@ def multipart_save(options)
328328
# Store ETags of upload parts
329329
part_tags = []
330330

331+
# Calculate total size and ensure we don't exceed part limit
332+
total_size = Fog::Storage.get_body_size(body)
333+
parts_count = (total_size.to_f / multipart_chunk_size).ceil
334+
335+
# AWS S3 has a hard limit of 10,000 parts, make sure we are below this limit for large objects
336+
if parts_count > 10000
337+
self.multipart_chunk_size = (total_size.to_f / 10000).ceil
338+
parts_count = 10000
339+
end
340+
331341
# Upload each part
332-
# TODO: optionally upload chunks in parallel using threads
333-
# (may cause network performance problems with many small chunks)
334-
# TODO: Support large chunk sizes without reading the chunk into memory
335342
if body.respond_to?(:rewind)
336-
body.rewind rescue nil
337-
end
338-
while (chunk = body.read(multipart_chunk_size)) do
339-
part_upload = service.upload_part(directory.key, key, upload_id, part_tags.size + 1, chunk, part_headers(chunk))
340-
part_tags << part_upload.headers["ETag"]
343+
body.rewind rescue nil
341344
end
342345

346+
pending = PartList.new(
347+
(1..parts_count).map do |part_number|
348+
UploadPartData.new(part_number, {}, nil)
349+
end
350+
)
351+
thread_count = self.concurrency
352+
completed = PartList.new
353+
errors = upload_parts_in_threads(directory.key, key, upload_id, pending, completed, thread_count)
354+
355+
raise errors.first if errors.any?
356+
357+
part_tags = completed.to_a.sort_by { |part| part.part_number }.map(&:etag)
358+
343359
if part_tags.empty? #it is an error to have a multipart upload with no parts
344360
part_upload = service.upload_part(directory.key, key, upload_id, 1, '', part_headers(''))
345361
part_tags << part_upload.headers["ETag"]
@@ -460,6 +476,49 @@ def upload_in_threads(target_directory_key, target_file_key, upload_id, pending,
460476

461477
threads.map(&:value).compact
462478
end
479+
480+
# Upload the pending parts of an S3 multipart upload using up to
# +thread_count+ worker threads.
#
# directory_key   - [String] bucket name
# target_file_key - [String] object key
# upload_id       - [String] S3 multipart upload id
# pending         - [PartList] UploadPartData entries still to upload
# completed       - [PartList] receives each part once its etag is set
# thread_count    - [Integer] number of concurrent workers
#
# Returns an Array of exceptions raised by the workers (empty on success);
# the caller (multipart_save) re-raises the first one.
def upload_parts_in_threads(directory_key, target_file_key, upload_id, pending, completed, thread_count)
  mutex = Mutex.new

  threads = Array.new(thread_count) do
    Thread.new do
      begin
        loop do
          part = nil
          chunk = nil

          # Claim a part number and read its chunk atomically. Doing the
          # shift inside the mutex guarantees the chunk matches the
          # claimed part even when body cannot seek and must therefore
          # be consumed strictly in sequential order.
          mutex.synchronize do
            part = pending.shift
            if part
              if body.respond_to?(:seek)
                # NOTE(review): a failed seek is silently swallowed; if
                # it ever fails the read below would come from the wrong
                # offset. Kept for parity with the existing
                # `body.rewind rescue nil` convention in this file.
                body.seek((part.part_number - 1) * multipart_chunk_size) rescue nil
              end
              chunk = body.read(multipart_chunk_size)
            end
          end

          break unless part

          # Network I/O happens outside the mutex so uploads overlap.
          if chunk
            part_upload = service.upload_part(directory_key, target_file_key, upload_id, part.part_number, chunk, part_headers(chunk))
            part.etag = part_upload.headers["ETag"]
            completed.push(part)
            chunk = nil # drop the reference so the buffer can be reclaimed
          end
        end
        nil
      rescue => error
        # Stop handing out further parts and hand the error back through
        # Thread#value. (Deliberately no abort_on_exception: errors are
        # collected here and re-raised by the caller, so aborting the
        # whole process would contradict that contract.)
        pending.clear!
        error
      end
    end
  end

  # Thread#value joins each worker and returns its block result;
  # compact discards the nils from workers that finished cleanly.
  threads.map(&:value).compact
end
463522
end
464523
end
465524
end

0 commit comments

Comments
 (0)