managed_upload.js (19347B)
var AWS = require('../core');
var byteLength = AWS.util.string.byteLength;
var Buffer = AWS.util.Buffer;

/**
 * The managed uploader allows for easy and efficient uploading of buffers,
 * blobs, or streams, using a configurable amount of concurrency to perform
 * multipart uploads where possible. This abstraction also enables uploading
 * streams of unknown size due to the use of multipart uploads.
 *
 * To construct a managed upload object, see the {constructor} function.
 *
 * ## Tracking upload progress
 *
 * The managed upload object can also track progress by attaching an
 * 'httpUploadProgress' listener to the upload manager. This event is similar
 * to {AWS.Request~httpUploadProgress} but groups all concurrent upload progress
 * into a single event. See {AWS.S3.ManagedUpload~httpUploadProgress} for more
 * information.
 *
 * ## Handling Multipart Cleanup
 *
 * By default, this class will automatically clean up any multipart uploads
 * when an individual part upload fails. This behavior can be disabled in order
 * to manually handle failures by setting the `leavePartsOnError` configuration
 * option to `true` when initializing the upload object.
 *
 * @!event httpUploadProgress(progress)
 *   Triggered when the uploader has uploaded more data.
 *   @note The `total` property may not be set if the stream being uploaded has
 *     not yet finished chunking. In this case the `total` will be undefined
 *     until the total stream size is known.
 *   @note This event will not be emitted in Node.js 0.8.x.
 *   @param progress [map] An object containing the `loaded` and `total` bytes
 *     of the request and the `key` of the S3 object. Note that `total` may be
 *     undefined until the payload size is known.
 *   @context (see AWS.Request~send)
 */
AWS.S3.ManagedUpload = AWS.util.inherit({
  /**
   * Creates a managed upload object with a set of configuration options.
   *
   * @note A "Body" parameter is required to be set prior to calling {send}.
   * @option options params [map] a map of parameters to pass to the upload
   *   requests. The "Body" parameter is required to be specified either on
   *   the service or in the params option.
   * @note ContentMD5 should not be provided when using the managed upload object.
   *   Instead, setting "computeChecksums" to true will enable automatic ContentMD5 generation
   *   by the managed upload object.
   * @option options queueSize [Number] (4) the size of the concurrent queue
   *   manager to upload parts in parallel. Set to 1 for synchronous uploading
   *   of parts. Note that the uploader will buffer at most queueSize * partSize
   *   bytes into memory at any given time.
   * @option options partSize [Number] (5mb) the size in bytes for each
   *   individual part to be uploaded. Adjust the part size to ensure the number
   *   of parts does not exceed {maxTotalParts}. See {minPartSize} for the
   *   minimum allowed part size.
   * @option options leavePartsOnError [Boolean] (false) whether to abort the
   *   multipart upload if an error occurs. Set to true if you want to handle
   *   failures manually.
   * @option options service [AWS.S3] an optional S3 service object to use for
   *   requests. This object might have bound parameters used by the uploader.
   * @example Creating a default uploader for a stream object
   *   var upload = new AWS.S3.ManagedUpload({
   *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
   *   });
   * @example Creating an uploader with concurrency of 1 and partSize of 10mb
   *   var upload = new AWS.S3.ManagedUpload({
   *     partSize: 10 * 1024 * 1024, queueSize: 1,
   *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
   *   });
   * @see send
   */
  constructor: function ManagedUpload(options) {
    var self = this;
    AWS.SequentialExecutor.call(self);
    self.body = null;
    self.sliceFn = null;
    self.callback = null;
    self.parts = {};
    self.completeInfo = [];
    // Default fillQueue fails fast for unsupported body types; send()
    // replaces it with fillBuffer (sliceable body) or fillStream (stream).
    self.fillQueue = function() {
      self.callback(new Error('Unsupported body payload ' + typeof self.body));
    };

    self.configure(options);
  },

  /**
   * @api private
   */
  configure: function configure(options) {
    options = options || {};
    this.partSize = this.minPartSize;

    if (options.queueSize) this.queueSize = options.queueSize;
    if (options.partSize) this.partSize = options.partSize;
    if (options.leavePartsOnError) this.leavePartsOnError = true;

    if (this.partSize < this.minPartSize) {
      throw new Error('partSize must be greater than ' +
                      this.minPartSize);
    }

    this.service = options.service;
    this.bindServiceObject(options.params);
    this.validateBody();
    this.adjustTotalBytes();
  },

  /**
   * @api private
   */
  leavePartsOnError: false,

  /**
   * @api private
   */
  queueSize: 4,

  /**
   * @api private
   */
  partSize: null,

  /**
   * @readonly
   * @return [Number] the minimum number of bytes for an individual part
   *   upload.
   */
  minPartSize: 1024 * 1024 * 5,

  /**
   * @readonly
   * @return [Number] the maximum allowed number of parts in a multipart upload.
   */
  maxTotalParts: 10000,

  /**
   * Initiates the managed upload for the payload.
   *
   * @callback callback function(err, data)
   *   @param err [Error] an error or null if no error occurred.
   *   @param data [map] The response data from the successful upload:
   *     * `Location` (String) the URL of the uploaded object
   *     * `ETag` (String) the ETag of the uploaded object
   *     * `Bucket` (String) the bucket to which the object was uploaded
   *     * `Key` (String) the key to which the object was uploaded
   * @example Sending a managed upload object
   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
   *   var upload = new AWS.S3.ManagedUpload({params: params});
   *   upload.send(function(err, data) {
   *     console.log(err, data);
   *   });
   */
  send: function(callback) {
    var self = this;
    self.failed = false;
    self.callback = callback || function(err) { if (err) throw err; };

    var runFill = true;
    if (self.sliceFn) {
      // Sliceable payload (Buffer, typed array, blob): chunk it directly.
      self.fillQueue = self.fillBuffer;
    } else if (AWS.util.isNode()) {
      var Stream = AWS.util.stream.Stream;
      if (self.body instanceof Stream) {
        // Streams are filled on demand: 'readable' events drive fillQueue,
        // so we must not run it synchronously below.
        runFill = false;
        self.fillQueue = self.fillStream;
        self.partBuffers = [];
        self.body.
          on('error', function(err) { self.cleanup(err); }).
          on('readable', function() { self.fillQueue(); }).
          on('end', function() {
            self.isDoneChunking = true;
            self.numParts = self.totalPartNumbers;
            // Final call flushes any remaining buffered bytes as a last part.
            self.fillQueue.call(self);
          });
      }
    }

    if (runFill) self.fillQueue.call(self);
  },

  /**
   * @!method promise()
   *   Returns a 'thenable' promise.
   *
   *   Two callbacks can be provided to the `then` method on the returned promise.
   *   The first callback will be called if the promise is fulfilled, and the second
   *   callback will be called if the promise is rejected.
   *   @callback fulfilledCallback function(data)
   *     Called if the promise is fulfilled.
   *     @param data [map] The response data from the successful upload:
   *       `Location` (String) the URL of the uploaded object
   *       `ETag` (String) the ETag of the uploaded object
   *       `Bucket` (String) the bucket to which the object was uploaded
   *       `Key` (String) the key to which the object was uploaded
   *   @callback rejectedCallback function(err)
   *     Called if the promise is rejected.
   *     @param err [Error] an error or null if no error occurred.
   *   @return [Promise] A promise that represents the state of the upload request.
   *   @example Sending an upload request using promises.
   *     var upload = s3.upload({Bucket: 'bucket', Key: 'key', Body: stream});
   *     var promise = upload.promise();
   *     promise.then(function(data) { ... }, function(err) { ... });
   */

  /**
   * Aborts a managed upload, including all concurrent upload requests.
   * @note By default, calling this function will cleanup a multipart upload
   *   if one was created. To leave the multipart upload around after aborting
   *   a request, configure `leavePartsOnError` to `true` in the {constructor}.
   * @note Calling {abort} in the browser environment will not abort any requests
   *   that are already in flight. If a multipart upload was created, any parts
   *   not yet uploaded will not be sent, and the multipart upload will be cleaned up.
   * @example Aborting an upload
   *   var params = {
   *     Bucket: 'bucket', Key: 'key',
   *     Body: new Buffer(1024 * 1024 * 25) // 25MB payload
   *   };
   *   var upload = s3.upload(params);
   *   upload.send(function (err, data) {
   *     if (err) console.log("Error:", err.code, err.message);
   *     else console.log(data);
   *   });
   *
   *   // abort request in 1 second
   *   setTimeout(upload.abort.bind(upload), 1000);
   */
  abort: function() {
    this.cleanup(AWS.util.error(new Error('Request aborted by user'), {
      code: 'RequestAbortedError', retryable: false
    }));
  },

  /**
   * @api private
   */
  validateBody: function validateBody() {
    var self = this;
    self.body = self.service.config.params.Body;
    if (!self.body) throw new Error('params.Body is required');
    if (typeof self.body === 'string') {
      self.body = new AWS.util.Buffer(self.body);
    }
    // sliceFn is non-null only for sliceable bodies (buffers/typed arrays);
    // it remains null for streams, which send() detects separately.
    self.sliceFn = AWS.util.arraySliceFn(self.body);
  },

  /**
   * @api private
   */
  bindServiceObject: function bindServiceObject(params) {
    params = params || {};
    var self = this;

    // bind parameters to new service object; a fresh service instance is used
    // so bound params (e.g. UploadId) do not leak into the caller's service
    if (!self.service) {
      self.service = new AWS.S3({params: params});
    } else {
      var config = AWS.util.copy(self.service.config);
      self.service = new self.service.constructor.__super__(config);
      self.service.config.params =
        AWS.util.merge(self.service.config.params || {}, params);
    }
  },

  /**
   * @api private
   */
  adjustTotalBytes: function adjustTotalBytes() {
    var self = this;
    try { // try to get totalBytes; byteLength throws for payloads of unknown
          // size (e.g. streams), in which case totalBytes stays undefined
      self.totalBytes = byteLength(self.body);
    } catch (e) { }

    // try to adjust partSize if we know payload length, so the part count
    // never exceeds maxTotalParts
    if (self.totalBytes) {
      var newPartSize = Math.ceil(self.totalBytes / self.maxTotalParts);
      if (newPartSize > self.partSize) self.partSize = newPartSize;
    } else {
      self.totalBytes = undefined;
    }
  },

  /**
   * @api private
   */
  isDoneChunking: false,

  /**
   * @api private
   */
  partPos: 0,

  /**
   * @api private
   */
  totalChunkedBytes: 0,

  /**
   * @api private
   */
  totalUploadedBytes: 0,

  /**
   * @api private
   */
  totalBytes: undefined,

  /**
   * @api private
   */
  numParts: 0,

  /**
   * @api private
   */
  totalPartNumbers: 0,

  /**
   * @api private
   */
  activeParts: 0,

  /**
   * @api private
   */
  doneParts: 0,

  /**
   * @api private
   */
  parts: null,

  /**
   * @api private
   */
  completeInfo: null,

  /**
   * @api private
   */
  failed: false,

  /**
   * @api private
   */
  multipartReq: null,

  /**
   * @api private
   */
  partBuffers: null,

  /**
   * @api private
   */
  partBufferLength: 0,

  /**
   * @api private
   */
  fillBuffer: function fillBuffer() {
    var self = this;
    var bodyLen = byteLength(self.body);

    // Empty payload: still send a single (empty) part.
    if (bodyLen === 0) {
      self.isDoneChunking = true;
      self.numParts = 1;
      self.nextChunk(self.body);
      return;
    }

    // Slice parts until the concurrency queue is full or the body is consumed.
    while (self.activeParts < self.queueSize && self.partPos < bodyLen) {
      var endPos = Math.min(self.partPos + self.partSize, bodyLen);
      var buf = self.sliceFn.call(self.body, self.partPos, endPos);
      self.partPos += self.partSize;

      // A short slice, or landing exactly on the end, means this is the last part.
      if (byteLength(buf) < self.partSize || self.partPos === bodyLen) {
        self.isDoneChunking = true;
        self.numParts = self.totalPartNumbers + 1;
      }
      self.nextChunk(buf);
    }
  },

  /**
   * @api private
   */
  fillStream: function fillStream() {
    var self = this;
    // Back-pressure: stop reading while the concurrency queue is full.
    if (self.activeParts >= self.queueSize) return;

    var buf = self.body.read(self.partSize - self.partBufferLength) ||
              self.body.read();
    if (buf) {
      self.partBuffers.push(buf);
      self.partBufferLength += buf.length;
      self.totalChunkedBytes += buf.length;
    }

    if (self.partBufferLength >= self.partSize) {
      // if we have single buffer we avoid copyfull concat
      var pbuf = self.partBuffers.length === 1 ?
        self.partBuffers[0] : Buffer.concat(self.partBuffers);
      self.partBuffers = [];
      self.partBufferLength = 0;

      // if we have more than partSize, push the rest back on the queue
      if (pbuf.length > self.partSize) {
        var rest = pbuf.slice(self.partSize);
        self.partBuffers.push(rest);
        self.partBufferLength += rest.length;
        pbuf = pbuf.slice(0, self.partSize);
      }

      self.nextChunk(pbuf);
    }

    // Stream has ended: flush any remaining buffered bytes as the final part
    // (guarded by isDoneSending so this runs exactly once).
    if (self.isDoneChunking && !self.isDoneSending) {
      // if we have single buffer we avoid copyfull concat
      pbuf = self.partBuffers.length === 1 ?
        self.partBuffers[0] : Buffer.concat(self.partBuffers);
      self.partBuffers = [];
      self.partBufferLength = 0;
      self.totalBytes = self.totalChunkedBytes;
      self.isDoneSending = true;

      // Send an empty part only when the stream produced no parts at all.
      if (self.numParts === 0 || pbuf.length > 0) {
        self.numParts++;
        self.nextChunk(pbuf);
      }
    }

    // NOTE(review): read(0) appears intended to re-prime the stream so another
    // 'readable' event fires if data is still buffered — confirm against the
    // Node.js stream.Readable docs for the supported Node versions.
    self.body.read(0);
  },

  /**
   * @api private
   */
  nextChunk: function nextChunk(chunk) {
    var self = this;
    if (self.failed) return null;

    var partNumber = ++self.totalPartNumbers;
    // If the whole payload fits in one part, skip multipart entirely and
    // use a plain putObject.
    if (self.isDoneChunking && partNumber === 1) {
      var req = self.service.putObject({Body: chunk});
      req._managedUpload = self;
      req.on('httpUploadProgress', self.progress).send(self.finishSinglePart);
      return null;
    } else if (self.service.config.params.ContentMD5) {
      // A caller-provided ContentMD5 cannot apply to individual parts.
      var err = AWS.util.error(new Error('The Content-MD5 you specified is invalid for multi-part uploads.'), {
        code: 'InvalidDigest', retryable: false
      });

      self.cleanup(err);
      return null;
    }

    if (self.completeInfo[partNumber] && self.completeInfo[partNumber].ETag !== null) {
      return null; // Already uploaded this part.
    }

    self.activeParts++;
    if (!self.service.config.params.UploadId) {

      if (!self.multipartReq) { // create multipart
        self.multipartReq = self.service.createMultipartUpload();
        self.multipartReq.on('success', function(resp) {
          self.service.config.params.UploadId = resp.data.UploadId;
          self.multipartReq = null;
        });
        self.queueChunks(chunk, partNumber);
        self.multipartReq.on('error', function(err) {
          self.cleanup(err);
        });
        self.multipartReq.send();
      } else {
        // createMultipartUpload is still in flight: defer this chunk until
        // its 'success' event fires.
        self.queueChunks(chunk, partNumber);
      }
    } else { // multipart is created, just send
      self.uploadPart(chunk, partNumber);
    }
  },

  /**
   * @api private
   */
  uploadPart: function uploadPart(chunk, partNumber) {
    var self = this;

    var partParams = {
      Body: chunk,
      ContentLength: AWS.util.string.byteLength(chunk),
      PartNumber: partNumber
    };

    // Record the pending part; ETag is filled in on success and later used
    // by finishMultiPart for CompleteMultipartUpload.
    var partInfo = {ETag: null, PartNumber: partNumber};
    self.completeInfo[partNumber] = partInfo;

    var req = self.service.uploadPart(partParams);
    self.parts[partNumber] = req;
    req._lastUploadedBytes = 0;
    req._managedUpload = self;
    req.on('httpUploadProgress', self.progress);
    req.send(function(err, data) {
      delete self.parts[partParams.PartNumber];
      self.activeParts--;

      if (!err && (!data || !data.ETag)) {
        var message = 'No access to ETag property on response.';
        if (AWS.util.isBrowser()) {
          message += ' Check CORS configuration to expose ETag header.';
        }

        err = AWS.util.error(new Error(message), {
          code: 'ETagMissing', retryable: false
        });
      }
      if (err) return self.cleanup(err);

      partInfo.ETag = data.ETag;
      self.doneParts++;
      if (self.isDoneChunking && self.doneParts === self.numParts) {
        // All parts uploaded; complete the multipart upload.
        self.finishMultiPart();
      } else {
        // A queue slot freed up; try to chunk/send more of the payload.
        self.fillQueue.call(self);
      }
    });
  },

  /**
   * @api private
   */
  queueChunks: function queueChunks(chunk, partNumber) {
    var self = this;
    // Defer the part upload until the pending createMultipartUpload succeeds.
    self.multipartReq.on('success', function() {
      self.uploadPart(chunk, partNumber);
    });
  },

  /**
   * @api private
   */
  cleanup: function cleanup(err) {
    var self = this;
    if (self.failed) return;

    // clean up stream
    if (typeof self.body.removeAllListeners === 'function' &&
        typeof self.body.resume === 'function') {
      self.body.removeAllListeners('readable');
      self.body.removeAllListeners('end');
      self.body.resume();
    }

    // Best-effort abort of the multipart upload unless the caller opted to
    // keep already-uploaded parts around.
    if (self.service.config.params.UploadId && !self.leavePartsOnError) {
      self.service.abortMultipartUpload().send();
    }

    AWS.util.each(self.parts, function(partNumber, part) {
      part.removeAllListeners('complete');
      part.abort();
    });

    // Reset chunking state so the error is reported exactly once.
    self.activeParts = 0;
    self.partPos = 0;
    self.numParts = 0;
    self.totalPartNumbers = 0;
    self.parts = {};
    self.failed = true;
    self.callback(err);
  },

  /**
   * @api private
   */
  finishMultiPart: function finishMultiPart() {
    var self = this;
    // completeInfo is 1-indexed by part number; slice(1) drops the unused
    // index 0 slot.
    var completeParams = { MultipartUpload: { Parts: self.completeInfo.slice(1) } };
    self.service.completeMultipartUpload(completeParams, function(err, data) {
      if (err) return self.cleanup(err);
      else self.callback(err, data);
    });
  },

  /**
   * @api private
   */
  finishSinglePart: function finishSinglePart(err, data) {
    // Invoked as a request callback: `this` is the response object.
    var upload = this.request._managedUpload;
    var httpReq = this.request.httpRequest;
    var endpoint = httpReq.endpoint;
    if (err) return upload.callback(err);
    // Synthesize the fields a multipart completion would have returned.
    data.Location =
      [endpoint.protocol, '//', endpoint.host, httpReq.path].join('');
    data.key = this.request.params.Key; // will stay undocumented
    data.Key = this.request.params.Key;
    data.Bucket = this.request.params.Bucket;
    upload.callback(err, data);
  },

  /**
   * @api private
   */
  progress: function progress(info) {
    // Invoked as a request event listener: `this` is the request object.
    var upload = this._managedUpload;
    if (this.operation === 'putObject') {
      info.part = 1;
      info.key = this.params.Key;
    } else {
      // Aggregate per-part deltas into a single upload-wide progress event.
      upload.totalUploadedBytes += info.loaded - this._lastUploadedBytes;
      this._lastUploadedBytes = info.loaded;
      info = {
        loaded: upload.totalUploadedBytes,
        total: upload.totalBytes,
        part: this.params.PartNumber,
        key: this.params.Key
      };
    }
    upload.emit('httpUploadProgress', [info]);
  }
});

AWS.util.mixin(AWS.S3.ManagedUpload, AWS.SequentialExecutor);

/**
 * @api private
 */
AWS.S3.ManagedUpload.addPromisesToClass = function addPromisesToClass(PromiseDependency) {
  this.prototype.promise = AWS.util.promisifyMethod('send', PromiseDependency);
};

/**
 * @api private
 */
AWS.S3.ManagedUpload.deletePromisesFromClass = function deletePromisesFromClass() {
  delete this.prototype.promise;
};

AWS.util.addPromises(AWS.S3.ManagedUpload);

module.exports = AWS.S3.ManagedUpload;