git-off

git-off handles large files in git repositories
git clone https://noulin.net/git/git-off.git
Log | Files | Refs | README

managed_upload.js (19347B)


      1 var AWS = require('../core');
      2 var byteLength = AWS.util.string.byteLength;
      3 var Buffer = AWS.util.Buffer;
      4 
      5 /**
      6  * The managed uploader allows for easy and efficient uploading of buffers,
      7  * blobs, or streams, using a configurable amount of concurrency to perform
      8  * multipart uploads where possible. This abstraction also enables uploading
      9  * streams of unknown size due to the use of multipart uploads.
     10  *
     11  * To construct a managed upload object, see the {constructor} function.
     12  *
     13  * ## Tracking upload progress
     14  *
     15  * The managed upload object can also track progress by attaching an
     16  * 'httpUploadProgress' listener to the upload manager. This event is similar
     17  * to {AWS.Request~httpUploadProgress} but groups all concurrent upload progress
     18  * into a single event. See {AWS.S3.ManagedUpload~httpUploadProgress} for more
     19  * information.
     20  *
     21  * ## Handling Multipart Cleanup
     22  *
     23  * By default, this class will automatically clean up any multipart uploads
     24  * when an individual part upload fails. This behavior can be disabled in order
     25  * to manually handle failures by setting the `leavePartsOnError` configuration
     26  * option to `true` when initializing the upload object.
     27  *
     28  * @!event httpUploadProgress(progress)
     29  *   Triggered when the uploader has uploaded more data.
     30  *   @note The `total` property may not be set if the stream being uploaded has
     31  *     not yet finished chunking. In this case the `total` will be undefined
     32  *     until the total stream size is known.
     33  *   @note This event will not be emitted in Node.js 0.8.x.
     34  *   @param progress [map] An object containing the `loaded` and `total` bytes
     35  *     of the request and the `key` of the S3 object. Note that `total` may be undefined until the payload
     36  *     size is known.
     37  *   @context (see AWS.Request~send)
     38  */
     39 AWS.S3.ManagedUpload = AWS.util.inherit({
     40   /**
     41    * Creates a managed upload object with a set of configuration options.
     42    *
     43    * @note A "Body" parameter is required to be set prior to calling {send}.
     44    * @option options params [map] a map of parameters to pass to the upload
     45    *   requests. The "Body" parameter is required to be specified either on
     46    *   the service or in the params option.
     47    * @note ContentMD5 should not be provided when using the managed upload object.
     48    *   Instead, setting "computeChecksums" to true will enable automatic ContentMD5 generation
     49    *   by the managed upload object.
     50    * @option options queueSize [Number] (4) the size of the concurrent queue
     51    *   manager to upload parts in parallel. Set to 1 for synchronous uploading
     52    *   of parts. Note that the uploader will buffer at most queueSize * partSize
     53    *   bytes into memory at any given time.
     54    * @option options partSize [Number] (5mb) the size in bytes for each
     55    *   individual part to be uploaded. Adjust the part size to ensure the number
     56    *   of parts does not exceed {maxTotalParts}. See {minPartSize} for the
     57    *   minimum allowed part size.
     58    * @option options leavePartsOnError [Boolean] (false) whether to abort the
     59    *   multipart upload if an error occurs. Set to true if you want to handle
     60    *   failures manually.
     61    * @option options service [AWS.S3] an optional S3 service object to use for
     62    *   requests. This object might have bound parameters used by the uploader.
     63    * @example Creating a default uploader for a stream object
     64    *   var upload = new AWS.S3.ManagedUpload({
     65    *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
     66    *   });
     67    * @example Creating an uploader with concurrency of 1 and partSize of 10mb
     68    *   var upload = new AWS.S3.ManagedUpload({
     69    *     partSize: 10 * 1024 * 1024, queueSize: 1,
     70    *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
     71    *   });
     72    * @see send
     73    */
     74   constructor: function ManagedUpload(options) {
     75     var self = this;
     76     AWS.SequentialExecutor.call(self);
     77     self.body = null;
     78     self.sliceFn = null;
     79     self.callback = null;
     80     self.parts = {};
     81     self.completeInfo = [];
     82     self.fillQueue = function() {
     83       self.callback(new Error('Unsupported body payload ' + typeof self.body));
     84     };
     85 
     86     self.configure(options);
     87   },
     88 
     89   /**
     90    * @api private
     91    */
     92   configure: function configure(options) {
     93     options = options || {};
     94     this.partSize = this.minPartSize;
     95 
     96     if (options.queueSize) this.queueSize = options.queueSize;
     97     if (options.partSize) this.partSize = options.partSize;
     98     if (options.leavePartsOnError) this.leavePartsOnError = true;
     99 
    100     if (this.partSize < this.minPartSize) {
    101       throw new Error('partSize must be greater than ' +
    102                       this.minPartSize);
    103     }
    104 
    105     this.service = options.service;
    106     this.bindServiceObject(options.params);
    107     this.validateBody();
    108     this.adjustTotalBytes();
    109   },
    110 
  /**
   * When true, {cleanup} skips the automatic AbortMultipartUpload call,
   * leaving already-uploaded parts for the caller to handle.
   * @api private
   */
  leavePartsOnError: false,

  /**
   * Maximum number of part uploads in flight at once; overridable via the
   * queueSize constructor option.
   * @api private
   */
  queueSize: 4,

  /**
   * Size in bytes of each uploaded part; set in {configure} from the
   * partSize option or {minPartSize}, and possibly grown by
   * {adjustTotalBytes} to respect {maxTotalParts}.
   * @api private
   */
  partSize: null,

  /**
   * @readonly
   * @return [Number] the minimum number of bytes (5 MB) for an individual
   *   part upload.
   */
  minPartSize: 1024 * 1024 * 5,

  /**
   * @readonly
   * @return [Number] the maximum allowed number of parts in a multipart upload.
   */
  maxTotalParts: 10000,
    138 
    139   /**
    140    * Initiates the managed upload for the payload.
    141    *
    142    * @callback callback function(err, data)
    143    *   @param err [Error] an error or null if no error occurred.
    144    *   @param data [map] The response data from the successful upload:
    145    *     * `Location` (String) the URL of the uploaded object
    146    *     * `ETag` (String) the ETag of the uploaded object
    147    *     * `Bucket` (String) the bucket to which the object was uploaded
    148    *     * `Key` (String) the key to which the object was uploaded
    149    * @example Sending a managed upload object
    150    *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
    151    *   var upload = new AWS.S3.ManagedUpload({params: params});
    152    *   upload.send(function(err, data) {
    153    *     console.log(err, data);
    154    *   });
    155    */
    156   send: function(callback) {
    157     var self = this;
    158     self.failed = false;
    159     self.callback = callback || function(err) { if (err) throw err; };
    160 
    161     var runFill = true;
    162     if (self.sliceFn) {
    163       self.fillQueue = self.fillBuffer;
    164     } else if (AWS.util.isNode()) {
    165       var Stream = AWS.util.stream.Stream;
    166       if (self.body instanceof Stream) {
    167         runFill = false;
    168         self.fillQueue = self.fillStream;
    169         self.partBuffers = [];
    170         self.body.
    171           on('error', function(err) { self.cleanup(err); }).
    172           on('readable', function() { self.fillQueue(); }).
    173           on('end', function() {
    174             self.isDoneChunking = true;
    175             self.numParts = self.totalPartNumbers;
    176             self.fillQueue.call(self);
    177           });
    178       }
    179     }
    180 
    181     if (runFill) self.fillQueue.call(self);
    182   },
    183 
    184   /**
    185    * @!method  promise()
    186    *   Returns a 'thenable' promise.
    187    *
    188    *   Two callbacks can be provided to the `then` method on the returned promise.
    189    *   The first callback will be called if the promise is fulfilled, and the second
    190    *   callback will be called if the promise is rejected.
    191    *   @callback fulfilledCallback function(data)
    192    *     Called if the promise is fulfilled.
    193    *     @param data [map] The response data from the successful upload:
    194    *       `Location` (String) the URL of the uploaded object
    195    *       `ETag` (String) the ETag of the uploaded object
    196    *       `Bucket` (String) the bucket to which the object was uploaded
    197    *       `Key` (String) the key to which the object was uploaded
    198    *   @callback rejectedCallback function(err)
    199    *     Called if the promise is rejected.
    200    *     @param err [Error] an error or null if no error occurred.
    201    *   @return [Promise] A promise that represents the state of the upload request.
    202    *   @example Sending an upload request using promises.
    203    *     var upload = s3.upload({Bucket: 'bucket', Key: 'key', Body: stream});
    204    *     var promise = upload.promise();
    205    *     promise.then(function(data) { ... }, function(err) { ... });
    206    */
    207 
    208   /**
    209    * Aborts a managed upload, including all concurrent upload requests.
    210    * @note By default, calling this function will cleanup a multipart upload
    211    *   if one was created. To leave the multipart upload around after aborting
    212    *   a request, configure `leavePartsOnError` to `true` in the {constructor}.
    213    * @note Calling {abort} in the browser environment will not abort any requests
    214    *   that are already in flight. If a multipart upload was created, any parts
    215    *   not yet uploaded will not be sent, and the multipart upload will be cleaned up.
    216    * @example Aborting an upload
    217    *   var params = {
    218    *     Bucket: 'bucket', Key: 'key',
    219    *     Body: new Buffer(1024 * 1024 * 25) // 25MB payload
    220    *   };
    221    *   var upload = s3.upload(params);
    222    *   upload.send(function (err, data) {
    223    *     if (err) console.log("Error:", err.code, err.message);
    224    *     else console.log(data);
    225    *   });
    226    *
    227    *   // abort request in 1 second
    228    *   setTimeout(upload.abort.bind(upload), 1000);
    229    */
    230   abort: function() {
    231     this.cleanup(AWS.util.error(new Error('Request aborted by user'), {
    232       code: 'RequestAbortedError', retryable: false
    233     }));
    234   },
    235 
    236   /**
    237    * @api private
    238    */
    239   validateBody: function validateBody() {
    240     var self = this;
    241     self.body = self.service.config.params.Body;
    242     if (!self.body) throw new Error('params.Body is required');
    243     if (typeof self.body === 'string') {
    244       self.body = new AWS.util.Buffer(self.body);
    245     }
    246     self.sliceFn = AWS.util.arraySliceFn(self.body);
    247   },
    248 
    249   /**
    250    * @api private
    251    */
    252   bindServiceObject: function bindServiceObject(params) {
    253     params = params || {};
    254     var self = this;
    255 
    256     // bind parameters to new service object
    257     if (!self.service) {
    258       self.service = new AWS.S3({params: params});
    259     } else {
    260       var config = AWS.util.copy(self.service.config);
    261       self.service = new self.service.constructor.__super__(config);
    262       self.service.config.params =
    263         AWS.util.merge(self.service.config.params || {}, params);
    264     }
    265   },
    266 
    267   /**
    268    * @api private
    269    */
    270   adjustTotalBytes: function adjustTotalBytes() {
    271     var self = this;
    272     try { // try to get totalBytes
    273       self.totalBytes = byteLength(self.body);
    274     } catch (e) { }
    275 
    276     // try to adjust partSize if we know payload length
    277     if (self.totalBytes) {
    278       var newPartSize = Math.ceil(self.totalBytes / self.maxTotalParts);
    279       if (newPartSize > self.partSize) self.partSize = newPartSize;
    280     } else {
    281       self.totalBytes = undefined;
    282     }
    283   },
    284 
  /**
   * True once the entire body has been split into chunks.
   * @api private
   */
  isDoneChunking: false,

  /**
   * Byte offset into a sliceable body where the next chunk starts
   * (used by {fillBuffer}).
   * @api private
   */
  partPos: 0,

  /**
   * Total bytes read from a streaming body so far (used by {fillStream}).
   * @api private
   */
  totalChunkedBytes: 0,

  /**
   * Total bytes reported uploaded across all parts (see {progress}).
   * @api private
   */
  totalUploadedBytes: 0,

  /**
   * Total payload size in bytes; undefined until the size is known
   * (streams only learn it at 'end').
   * @api private
   */
  totalBytes: undefined,

  /**
   * Final number of parts, set once chunking is complete.
   * @api private
   */
  numParts: 0,

  /**
   * Count of part numbers assigned so far by {nextChunk}.
   * @api private
   */
  totalPartNumbers: 0,

  /**
   * Number of part uploads currently in flight.
   * @api private
   */
  activeParts: 0,

  /**
   * Number of part uploads that completed successfully.
   * @api private
   */
  doneParts: 0,

  /**
   * Map of part number -> in-flight upload request, so {cleanup} can
   * abort them.
   * @api private
   */
  parts: null,

  /**
   * Per-part {ETag, PartNumber} records, 1-indexed by part number, used by
   * {finishMultiPart} to complete the upload.
   * @api private
   */
  completeInfo: null,

  /**
   * Set once an error has been delivered; suppresses further chunking and
   * repeated cleanup.
   * @api private
   */
  failed: false,

  /**
   * Pending CreateMultipartUpload request, if one is in flight.
   * @api private
   */
  multipartReq: null,

  /**
   * Buffers accumulated from a streaming body toward the next part.
   * @api private
   */
  partBuffers: null,

  /**
   * Total byte length currently held in partBuffers.
   * @api private
   */
  partBufferLength: 0,
    359 
    360   /**
    361    * @api private
    362    */
    363   fillBuffer: function fillBuffer() {
    364     var self = this;
    365     var bodyLen = byteLength(self.body);
    366 
    367     if (bodyLen === 0) {
    368       self.isDoneChunking = true;
    369       self.numParts = 1;
    370       self.nextChunk(self.body);
    371       return;
    372     }
    373 
    374     while (self.activeParts < self.queueSize && self.partPos < bodyLen) {
    375       var endPos = Math.min(self.partPos + self.partSize, bodyLen);
    376       var buf = self.sliceFn.call(self.body, self.partPos, endPos);
    377       self.partPos += self.partSize;
    378 
    379       if (byteLength(buf) < self.partSize || self.partPos === bodyLen) {
    380         self.isDoneChunking = true;
    381         self.numParts = self.totalPartNumbers + 1;
    382       }
    383       self.nextChunk(buf);
    384     }
    385   },
    386 
  /**
   * Queue filler for streaming bodies: accumulates readable data into
   * partSize-sized chunks and hands each to {nextChunk}. Invoked on every
   * 'readable' event and once more after 'end' (with isDoneChunking set)
   * to flush the final partial part.
   * @api private
   */
  fillStream: function fillStream() {
    var self = this;
    // Back-pressure: stop reading while the concurrency queue is saturated.
    if (self.activeParts >= self.queueSize) return;

    // Ask for exactly the bytes still missing for the current part; fall
    // back to whatever the stream has buffered if that exact size fails.
    var buf = self.body.read(self.partSize - self.partBufferLength) ||
              self.body.read();
    if (buf) {
      self.partBuffers.push(buf);
      self.partBufferLength += buf.length;
      self.totalChunkedBytes += buf.length;
    }

    if (self.partBufferLength >= self.partSize) {
      // if we have single buffer we avoid copyfull concat
      var pbuf = self.partBuffers.length === 1 ?
        self.partBuffers[0] : Buffer.concat(self.partBuffers);
      self.partBuffers = [];
      self.partBufferLength = 0;

      // if we have more than partSize, push the rest back on the queue
      if (pbuf.length > self.partSize) {
        var rest = pbuf.slice(self.partSize);
        self.partBuffers.push(rest);
        self.partBufferLength += rest.length;
        pbuf = pbuf.slice(0, self.partSize);
      }

      self.nextChunk(pbuf);
    }

    if (self.isDoneChunking && !self.isDoneSending) {
      // Stream ended: the total size is now known; flush whatever remains
      // buffered as the final part (exactly once, guarded by isDoneSending).
      // if we have single buffer we avoid copyfull concat
      pbuf = self.partBuffers.length === 1 ?
          self.partBuffers[0] : Buffer.concat(self.partBuffers);
      self.partBuffers = [];
      self.partBufferLength = 0;
      self.totalBytes = self.totalChunkedBytes;
      self.isDoneSending = true;

      // Send at least one (possibly empty) part, or the non-empty remainder.
      if (self.numParts === 0 || pbuf.length > 0) {
        self.numParts++;
        self.nextChunk(pbuf);
      }
    }

    // Zero-byte read re-arms the stream's 'readable' event.
    self.body.read(0);
  },
    437 
  /**
   * Assigns the next part number to `chunk` and dispatches it: a payload
   * that fits in a single part goes through PutObject; otherwise a multipart
   * upload is created (once) and the chunk is uploaded as an individual part.
   * @param chunk the next chunk of body data to upload.
   * @api private
   */
  nextChunk: function nextChunk(chunk) {
    var self = this;
    if (self.failed) return null;

    var partNumber = ++self.totalPartNumbers;
    if (self.isDoneChunking && partNumber === 1) {
      // Whole body fits in one part: plain PutObject, no multipart overhead.
      var req = self.service.putObject({Body: chunk});
      req._managedUpload = self;
      req.on('httpUploadProgress', self.progress).send(self.finishSinglePart);
      return null;
    } else if (self.service.config.params.ContentMD5) {
      // A user-supplied ContentMD5 covers the whole body and cannot apply to
      // individual parts; fail rather than send an invalid digest.
      var err = AWS.util.error(new Error('The Content-MD5 you specified is invalid for multi-part uploads.'), {
        code: 'InvalidDigest', retryable: false
      });

      self.cleanup(err);
      return null;
    }

    if (self.completeInfo[partNumber] && self.completeInfo[partNumber].ETag !== null) {
      return null; // Already uploaded this part.
    }

    self.activeParts++;
    if (!self.service.config.params.UploadId) {
      // No UploadId yet: start CreateMultipartUpload on the first chunk and
      // queue chunks behind its 'success' event until the id arrives.
      if (!self.multipartReq) { // create multipart
        self.multipartReq = self.service.createMultipartUpload();
        self.multipartReq.on('success', function(resp) {
          self.service.config.params.UploadId = resp.data.UploadId;
          self.multipartReq = null;
        });
        self.queueChunks(chunk, partNumber);
        self.multipartReq.on('error', function(err) {
          self.cleanup(err);
        });
        self.multipartReq.send();
      } else {
        self.queueChunks(chunk, partNumber);
      }
    } else { // multipart is created, just send
      self.uploadPart(chunk, partNumber);
    }
  },
    485 
  /**
   * Uploads one chunk as an S3 part, wiring up progress and completion
   * handling. On success, records the part's ETag; when all parts are done,
   * completes the multipart upload, otherwise pulls more chunks.
   * @param chunk the part's body data.
   * @param partNumber [Number] 1-based part number for this chunk.
   * @api private
   */
  uploadPart: function uploadPart(chunk, partNumber) {
    var self = this;

    var partParams = {
      Body: chunk,
      ContentLength: AWS.util.string.byteLength(chunk),
      PartNumber: partNumber
    };

    // Pre-register the part with a null ETag so nextChunk can distinguish
    // finished parts (ETag set) from in-flight ones.
    var partInfo = {ETag: null, PartNumber: partNumber};
    self.completeInfo[partNumber] = partInfo;

    var req = self.service.uploadPart(partParams);
    self.parts[partNumber] = req;
    req._lastUploadedBytes = 0;
    req._managedUpload = self;
    req.on('httpUploadProgress', self.progress);
    req.send(function(err, data) {
      delete self.parts[partParams.PartNumber];
      self.activeParts--;

      // A response without an ETag is unusable for CompleteMultipartUpload;
      // surface it as an error (commonly a browser CORS misconfiguration).
      if (!err && (!data || !data.ETag)) {
        var message = 'No access to ETag property on response.';
        if (AWS.util.isBrowser()) {
          message += ' Check CORS configuration to expose ETag header.';
        }

        err = AWS.util.error(new Error(message), {
          code: 'ETagMissing', retryable: false
        });
      }
      if (err) return self.cleanup(err);

      partInfo.ETag = data.ETag;
      self.doneParts++;
      if (self.isDoneChunking && self.doneParts === self.numParts) {
        // Every chunk has been cut and uploaded: finalize the upload.
        self.finishMultiPart();
      } else {
        // A queue slot freed up: pull the next chunk(s).
        self.fillQueue.call(self);
      }
    });
  },
    531 
    532   /**
    533    * @api private
    534    */
    535   queueChunks: function queueChunks(chunk, partNumber) {
    536     var self = this;
    537     self.multipartReq.on('success', function() {
    538       self.uploadPart(chunk, partNumber);
    539     });
    540   },
    541 
    542   /**
    543    * @api private
    544    */
    545   cleanup: function cleanup(err) {
    546     var self = this;
    547     if (self.failed) return;
    548 
    549     // clean up stream
    550     if (typeof self.body.removeAllListeners === 'function' &&
    551         typeof self.body.resume === 'function') {
    552       self.body.removeAllListeners('readable');
    553       self.body.removeAllListeners('end');
    554       self.body.resume();
    555     }
    556 
    557     if (self.service.config.params.UploadId && !self.leavePartsOnError) {
    558       self.service.abortMultipartUpload().send();
    559     }
    560 
    561     AWS.util.each(self.parts, function(partNumber, part) {
    562       part.removeAllListeners('complete');
    563       part.abort();
    564     });
    565 
    566     self.activeParts = 0;
    567     self.partPos = 0;
    568     self.numParts = 0;
    569     self.totalPartNumbers = 0;
    570     self.parts = {};
    571     self.failed = true;
    572     self.callback(err);
    573   },
    574 
    575   /**
    576    * @api private
    577    */
    578   finishMultiPart: function finishMultiPart() {
    579     var self = this;
    580     var completeParams = { MultipartUpload: { Parts: self.completeInfo.slice(1) } };
    581     self.service.completeMultipartUpload(completeParams, function(err, data) {
    582       if (err) return self.cleanup(err);
    583       else self.callback(err, data);
    584     });
    585   },
    586 
    587   /**
    588    * @api private
    589    */
    590   finishSinglePart: function finishSinglePart(err, data) {
    591     var upload = this.request._managedUpload;
    592     var httpReq = this.request.httpRequest;
    593     var endpoint = httpReq.endpoint;
    594     if (err) return upload.callback(err);
    595     data.Location =
    596       [endpoint.protocol, '//', endpoint.host, httpReq.path].join('');
    597     data.key = this.request.params.Key; // will stay undocumented
    598     data.Key = this.request.params.Key;
    599     data.Bucket = this.request.params.Bucket;
    600     upload.callback(err, data);
    601   },
    602 
    603   /**
    604    * @api private
    605    */
    606   progress: function progress(info) {
    607     var upload = this._managedUpload;
    608     if (this.operation === 'putObject') {
    609       info.part = 1;
    610       info.key = this.params.Key;
    611     } else {
    612       upload.totalUploadedBytes += info.loaded - this._lastUploadedBytes;
    613       this._lastUploadedBytes = info.loaded;
    614       info = {
    615         loaded: upload.totalUploadedBytes,
    616         total: upload.totalBytes,
    617         part: this.params.PartNumber,
    618         key: this.params.Key
    619       };
    620     }
    621     upload.emit('httpUploadProgress', [info]);
    622   }
    623 });
    624 
// Mix in the event-emitter behavior (on/emit) used for httpUploadProgress.
AWS.util.mixin(AWS.S3.ManagedUpload, AWS.SequentialExecutor);

/**
 * Installs a `promise` method on the class, backed by {send}.
 * @api private
 */
AWS.S3.ManagedUpload.addPromisesToClass = function addPromisesToClass(PromiseDependency) {
  this.prototype.promise = AWS.util.promisifyMethod('send', PromiseDependency);
};

/**
 * Removes the `promise` method installed by addPromisesToClass.
 * @api private
 */
AWS.S3.ManagedUpload.deletePromisesFromClass = function deletePromisesFromClass() {
  delete this.prototype.promise;
};

// Register promise support using whatever Promise implementation is configured.
AWS.util.addPromises(AWS.S3.ManagedUpload);

module.exports = AWS.S3.ManagedUpload;