@@ -129,12 +129,15 @@ - (instancetype)init {
129
129
_waitingPartsDictionary = [NSMutableDictionary new ];
130
130
_inProgressPartsDictionary = [NSMutableDictionary new ];
131
131
_completedPartsSet = [NSMutableSet new ];
132
+ _serialQueue = dispatch_queue_create (" com.amazonaws.AWSS3.MultipartUploadTask" , DISPATCH_QUEUE_SERIAL);
132
133
}
133
134
return self;
134
135
}
135
136
136
137
- (BOOL )isUnderConcurrencyLimit {
137
- return self.inProgressPartsDictionary .count < [self .transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue ];
138
+ NSUInteger dynamicLimit = NSProcessInfo .processInfo .activeProcessorCount * 2 ;
139
+ NSUInteger configuredLimit = self.transferUtility .transferUtilityConfiguration .multiPartConcurrencyLimit .integerValue ;
140
+ return self.inProgressPartsDictionary .count < MAX (dynamicLimit, configuredLimit);
138
141
}
139
142
140
143
- (BOOL )hasWaitingTasks {
@@ -167,13 +170,11 @@ - (AWSS3TransferUtilityMultiPartUploadExpression *)expression {
167
170
- (void )cancel {
168
171
self.cancelled = YES ;
169
172
self.status = AWSS3TransferUtilityTransferStatusCancelled;
170
- for (NSNumber *key in [self .inProgressPartsDictionary allKeys ]) {
171
- AWSS3TransferUtilityUploadSubTask *subTask = [self .inProgressPartsDictionary objectForKey: key];
173
+ for (AWSS3TransferUtilityUploadSubTask *subTask in self.inProgressTasks ) {
172
174
[subTask.sessionTask cancel ];
173
175
}
174
176
175
- for (NSNumber *key in [self .waitingPartsDictionary allKeys ]) {
176
- AWSS3TransferUtilityUploadSubTask *subTask = [self .waitingPartsDictionary objectForKey: key];
177
+ for (AWSS3TransferUtilityUploadSubTask *subTask in self.waitingTasks ) {
177
178
[subTask.sessionTask cancel ];
178
179
}
179
180
@@ -190,37 +191,13 @@ - (void)resume {
190
191
}
191
192
192
193
NSCAssert (self.transferUtility != nil , @" Transfer Utility must be provided." );
193
-
194
- // for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
195
- // AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
196
- // subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
197
- // [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
198
- // partNumber:subTask.partNumber
199
- // taskIdentifier:subTask.taskIdentifier
200
- // eTag:subTask.eTag
201
- // status:subTask.status
202
- // retry_count:self.retryCount
203
- // databaseQueue:self.databaseQueue];
204
- // [subTask.sessionTask resume];
205
- // }
206
- //
207
- // self.status = AWSS3TransferUtilityTransferStatusInProgress;
208
- // //Update the Master Record
209
- // [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
210
- // partNumber:@0
211
- // taskIdentifier:0
212
- // eTag:@""
213
- // status:self.status
214
- // retry_count:self.retryCount
215
- // databaseQueue:self.databaseQueue];
216
194
217
195
// Change status from paused to waiting
218
196
for (AWSS3TransferUtilityUploadSubTask * nextSubTask in self.waitingTasks ) {
219
197
nextSubTask.status = AWSS3TransferUtilityTransferStatusWaiting;
220
198
}
221
199
222
- [self moveWaitingTasksToInProgress ];
223
- [self completeIfDone ];
200
+ [self moveWaitingTasksToInProgress: YES ];
224
201
}
225
202
226
203
- (void )suspend {
@@ -231,30 +208,6 @@ - (void)suspend {
231
208
232
209
NSCAssert (self.transferUtility != nil , @" Transfer Utility must be provided." );
233
210
234
- // for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
235
- // // all in progress tasks should be cancelled and a new subtask should replace it which is
236
- // // put in the waiting dictionary and set with that status with a URLSessionTask which
237
- // // has not been started.
238
- //
239
- // // then resuming should start uploading a number of parts up to the concurrency limit.
240
- //
241
- // AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
242
- // if (!subTask) {
243
- // continue;
244
- // }
245
- // [subTask.sessionTask suspend];
246
- // subTask.status = AWSS3TransferUtilityTransferStatusPaused;
247
- //
248
- // [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
249
- // partNumber:subTask.partNumber
250
- // taskIdentifier:subTask.taskIdentifier
251
- // eTag:subTask.eTag
252
- // status:subTask.status
253
- // retry_count:self.retryCount
254
- // databaseQueue:self.databaseQueue];
255
- // }
256
- //
257
-
258
211
// Cancel session task for all subtasks which are in progress and set status to paused
259
212
for (AWSS3TransferUtilityUploadSubTask *inProgressSubTask in self.inProgressTasks ) {
260
213
// Note: This can happen due to lack of thread-safety
@@ -280,8 +233,7 @@ - (void)suspend {
280
233
281
234
NSError *error = [self .transferUtility createUploadSubTask: self
282
235
subTask: subTask
283
- startTransfer: NO
284
- internalDictionaryToAddSubTaskTo: self .waitingPartsDictionary];
236
+ startTransfer: NO ];
285
237
286
238
if (error) {
287
239
AWSDDLogError (@" Error creating AWSS3TransferUtilityUploadSubTask [%@ ]" , error);
@@ -294,7 +246,7 @@ - (void)suspend {
294
246
[AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB: self subTask: subTask databaseQueue: self .databaseQueue];
295
247
}
296
248
}
297
-
249
+
298
250
self.status = AWSS3TransferUtilityTransferStatusPaused;
299
251
// Update the Master Record
300
252
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB: self .transferID
@@ -313,13 +265,14 @@ - (void)addUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask {
313
265
self.waitingPartsDictionary [@(subTask.taskIdentifier)] = subTask;
314
266
} else if (subTask.status == AWSS3TransferUtilityTransferStatusInProgress) {
315
267
self.inProgressPartsDictionary [@(subTask.taskIdentifier)] = subTask;
316
-
317
268
} else if (subTask.status == AWSS3TransferUtilityTransferStatusCompleted) {
318
269
[self .completedPartsSet addObject: subTask];
319
270
} else {
320
271
AWSDDLogDebug (@" Sub Task status not supported: %lu " , subTask.status );
321
272
NSCAssert (NO , @" Status not supported" );
322
273
}
274
+
275
+ [self completeIfDone ];
323
276
}
324
277
325
278
- (void )removeWaitingUploadSubTask : (NSUInteger )taskIdentifier {
@@ -344,13 +297,15 @@ - (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask
344
297
345
298
- (void )moveWaitingTaskToInProgress : (AWSS3TransferUtilityUploadSubTask *)subTask startTransfer : (BOOL )startTransfer {
346
299
if ([self .waitingTasks containsObject: subTask]) {
347
- // Add to inProgress list
300
+ // Add to inProgress list
348
301
self.inProgressPartsDictionary [@(subTask.taskIdentifier)] = subTask;
349
- // Remove it from the waitingList
302
+ // Remove it from the waitingList
350
303
self.waitingPartsDictionary [@(subTask.taskIdentifier)] = nil ;
351
304
AWSDDLogDebug (@" Moving Task[%@ ] to progress for Multipart[%@ ]" , @(subTask.taskIdentifier ), self.uploadID );
352
305
353
306
if (startTransfer) {
307
+ AWSDDLogDebug (@" Starting subTask %@ " , @(subTask.taskIdentifier ));
308
+ NSCAssert (subTask.sessionTask.state == NSURLSessionTaskStateSuspended , @" State should be suspended before resuming." );
354
309
[subTask.sessionTask resume ];
355
310
}
356
311
}
@@ -381,20 +336,24 @@ - (void)moveWaitingTasksToInProgress:(BOOL)startTransfer {
381
336
// move parts from waiting to in progress if under the concurrency limit
382
337
while (self.isUnderConcurrencyLimit && self.hasWaitingTasks ) {
383
338
// Get a part from the waitingList
384
- AWSS3TransferUtilityUploadSubTask *nextSubTask = [[ self .waitingPartsDictionary allValues ] objectAtIndex: 0 ];
339
+ AWSS3TransferUtilityUploadSubTask *nextSubTask = [self .waitingTasks objectAtIndex: 0 ];
385
340
386
341
// Add to inProgress list
387
342
self.inProgressPartsDictionary [@(nextSubTask.taskIdentifier)] = nextSubTask;
343
+ nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress;
388
344
389
345
// Remove it from the waitingList
390
- [ self .waitingPartsDictionary removeObjectForKey: @(nextSubTask.taskIdentifier)];
346
+ self.waitingPartsDictionary [ @(nextSubTask.taskIdentifier)] = nil ;
391
347
AWSDDLogDebug (@" Moving Task[%@ ] to progress for Multipart[%@ ]" , @(nextSubTask.taskIdentifier ), self.uploadID );
348
+
392
349
if (startTransfer) {
393
350
AWSDDLogDebug (@" Starting subTask %@ " , @(nextSubTask.taskIdentifier ));
351
+ NSCAssert (nextSubTask.sessionTask.state == NSURLSessionTaskStateSuspended , @" State should be suspended before resuming." );
394
352
[nextSubTask.sessionTask resume ];
395
353
}
396
- nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress;
397
354
}
355
+
356
+ [self completeIfDone ];
398
357
}
399
358
400
359
- (void )completeUploadSubTask : (AWSS3TransferUtilityUploadSubTask *)subTask
@@ -421,45 +380,49 @@ - (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask
421
380
}
422
381
423
382
- (void )completeIfDone {
424
- // Complete multipart upload if in progress and waiting tasks are done
425
- if (!self.isDone ) {
426
- return ;
427
- }
383
+ dispatch_async (self.serialQueue , ^{
384
+ // Complete multipart upload if in progress and waiting tasks are done
385
+ if (!self.isDone && self.status != AWSS3TransferUtilityTransferStatusCompleted) {
386
+ return ;
387
+ }
428
388
429
- // If there are no more inProgress parts, then we are done.
389
+ // If there are no more inProgress parts, then we are done.
430
390
431
- // Validate that all the content has been uploaded.
432
- int64_t totalBytesSent = 0 ;
433
- for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedPartsSet ) {
434
- totalBytesSent += aSubTask.totalBytesExpectedToSend ;
435
- }
391
+ // Validate that all the content has been uploaded.
392
+ int64_t totalBytesSent = 0 ;
393
+ for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedTasks ) {
394
+ totalBytesSent += aSubTask.totalBytesExpectedToSend ;
395
+ }
436
396
437
- if (totalBytesSent != self.contentLength .longLongValue ) {
438
- NSString *errorMessage = [NSString stringWithFormat: @" Expected to send [%@ ], but sent [%@ ] and there are no remaining parts. Failing transfer " ,
439
- self .contentLength, @(totalBytesSent)];
440
- AWSDDLogDebug (@" %@ " , errorMessage);
441
- NSDictionary *userInfo = [NSDictionary dictionaryWithObject: errorMessage
442
- forKey: @" Message" ];
397
+ if (totalBytesSent != self.contentLength .longLongValue ) {
398
+ NSString *errorMessage = [NSString stringWithFormat: @" Expected to send [%@ ], but sent [%@ ] and there are no remaining parts. Failing transfer " ,
399
+ self .contentLength, @(totalBytesSent)];
400
+ AWSDDLogDebug (@" %@ " , errorMessage);
401
+ NSDictionary *userInfo = [NSDictionary dictionaryWithObject: errorMessage
402
+ forKey: @" Message" ];
443
403
444
- self.error = [NSError errorWithDomain: AWSS3TransferUtilityErrorDomain
445
- code: AWSS3TransferUtilityErrorClientError
446
- userInfo: userInfo];
404
+ self.error = [NSError errorWithDomain: AWSS3TransferUtilityErrorDomain
405
+ code: AWSS3TransferUtilityErrorClientError
406
+ userInfo: userInfo];
447
407
448
- // Execute call back if provided.
449
- [self .transferUtility completeTask: self ];
408
+ // Execute call back if provided.
409
+ [self .transferUtility completeTask: self ];
450
410
451
- // Abort the request, so the server can clean up any partials.
452
- [self .transferUtility callAbortMultiPartForUploadTask: self ];
411
+ // Abort the request, so the server can clean up any partials.
412
+ [self .transferUtility callAbortMultiPartForUploadTask: self ];
453
413
454
- // clean up.
455
- [self .transferUtility cleanupForMultiPartUploadTask: self ];
456
- return ;
457
- }
414
+ // clean up.
415
+ [self .transferUtility cleanupForMultiPartUploadTask: self ];
416
+ return ;
417
+ }
418
+
419
+ AWSDDLogDebug (@" There are %lu waiting upload parts." , (unsigned long )self.waitingTasks .count );
420
+ AWSDDLogDebug (@" There are %lu in progress upload parts." , (unsigned long )self.inProgressTasks .count );
421
+ AWSDDLogDebug (@" There are %lu completed upload parts." , (unsigned long )self.completedTasks .count );
422
+ [self .transferUtility completeMultiPartForUploadTask: self ];
423
+ self.status = AWSS3TransferUtilityTransferStatusCompleted;
424
+ });
458
425
459
- AWSDDLogDebug (@" There are %lu waiting upload parts." , (unsigned long )self.waitingPartsDictionary .count );
460
- AWSDDLogDebug (@" There are %lu in progress upload parts." , (unsigned long )self.inProgressPartsDictionary .count );
461
- AWSDDLogDebug (@" There are %lu completed upload parts." , (unsigned long )self.completedPartsSet .count );
462
- [self .transferUtility completeMultiPartForUploadTask: self ];
463
426
}
464
427
465
428
- (void )setCompletionHandler : (AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock)completionHandler {
@@ -498,7 +461,6 @@ - (void)cancel {
498
461
}
499
462
500
463
- (void )setCompletionHandler : (AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler {
501
-
502
464
self.expression .completionHandler = completionHandler;
503
465
// If the task has already completed successfully
504
466
// Or the task has completed with error, complete the task
0 commit comments