В настоящее время я использую плагин node.js под названием s3-upload-stream для потоковой передачи очень больших файлов в Amazon S3. Он использует составной API и по большей части работает очень хорошо.
Однако этот модуль показывает свой возраст, и мне уже пришлось внести в него изменения (автор также отказался от него). Сегодня я столкнулся с другой проблемой с Amazon, и я бы очень хотел принять рекомендацию автора и начать использовать официальный aws-sdk для выполнения своих загрузок.
НО.
Официальный SDK, похоже, не поддерживает подключение к s3.upload()
. Природа s3.upload заключается в том, что вы должны передать читаемый поток в качестве аргумента конструктору S3.
У меня есть примерно 120+ модулей пользовательского кода, которые выполняют различную обработку файлов, и они не зависят от конечного пункта назначения своего вывода. Движок передает им записываемый по конвейеру выходной поток, и они передают его по конвейеру. Я не могу передать им AWS.S3
объект и попросить их вызвать upload()
его без добавления кода ко всем модулям. Причина, по которой я использовал, s3-upload-stream
заключалась в том, что он поддерживал трубопровод.
Есть ли способ сделать aws-sdk s3.upload()
чем - то, на что я могу направить поток?
s3
параметр внутри трубы иstream
откуда?Немного запоздалый ответ, надеюсь, это может помочь кому-то другому. Вы можете вернуть как записываемый поток, так и обещание, чтобы вы могли получить данные ответа по завершении загрузки.
const AWS = require('aws-sdk'); const stream = require('stream'); const uploadStream = ({ Bucket, Key }) => { const s3 = new AWS.S3(); const pass = new stream.PassThrough(); return { writeStream: pass, promise: s3.upload({ Bucket, Key, Body: pass }).promise(), }; }
И вы можете использовать функцию следующим образом:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'}); const readStream = fs.createReadStream('/path/to/yourfile.mp4'); const pipeline = readStream.pipe(writeStream);
Теперь вы можете проверить обещание:
promise.then(() => { console.log('upload completed successfully'); }).catch((err) => { console.log('upload failed.', err.message); });
Или, поскольку
stream.pipe()
возвращает stream.Writable, пункт назначения (переменная writeStream выше), учитывая цепочку каналов, мы также можем использовать его события:pipeline.on('close', () => { console.log('upload successful'); }); pipeline.on('error', (err) => { console.log('upload failed', err.message) });
источник
В принятом ответе функция завершается до завершения загрузки и, следовательно, неверна. Приведенный ниже код правильно передает из читаемого потока.
Ссылка на загрузку
async function uploadReadableStream(stream) { const params = {Bucket: bucket, Key: key, Body: stream}; return s3.upload(params).promise(); } async function upload() { const readable = getSomeReadableStream(); const results = await uploadReadableStream(readable); console.log('upload complete', results); }
Вы также можете пойти дальше и вывести информацию о ходе выполнения, используя
ManagedUpload
как таковые:const manager = s3.upload(params); manager.on('httpUploadProgress', (progress) => { console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' } });
Справочник по ManagedUpload
Список доступных событий
источник
TypeError: dest.on is not a function
есть идеи, почему?dest.on
? Вы можете показать пример? @FireBrandНи один из ответов не помог мне, потому что я хотел:
s3.upload()
s3.upload()
в другой потокПринятый ответ не делает последнего. Остальные полагаются на обещание api, которое неудобно при работе с потоковыми конвейерами.
Это моя модификация принятого ответа.
const s3 = new S3(); function writeToS3({Key, Bucket}) { const Body = new stream.PassThrough(); s3.upload({ Body, Key, Bucket: process.env.adpBucket }) .on('httpUploadProgress', progress => { console.log('progress', progress); }) .send((err, data) => { if (err) { Body.destroy(err); } else { console.log(`File uploaded and available at ${data.Location}`); Body.destroy(); } }); return Body; } const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket}); pipeline.on('close', () => { // upload finished, do something else }) pipeline.on('error', () => { // upload wasn't successful. Handle it })
источник
Решение Type Script: в
этом примере используются:
import * as AWS from "aws-sdk"; import * as fsExtra from "fs-extra"; import * as zlib from "zlib"; import * as stream from "stream";
И асинхронная функция:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => { const passT = new stream.PassThrough(); return { writeStream: passT, promise: S3.upload({ Bucket, Key, Body: passT }).promise(), }; }; const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key); fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream) let output = true; await promise.catch((reason)=> { output = false; console.log(reason);}); return output; }
Вызовите этот метод где-нибудь, например:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
источник
В наиболее распространенном ответе выше следует отметить следующее: вам нужно вернуть проход в функции, если вы используете канал, например,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () { let pass = new stream.PassThrough(); return pass // <- Returning this pass is important for the stream to understand where it needs to write to. }
В противном случае он молча перейдет к следующему без выдачи ошибки или выдаст ошибку в
TypeError: dest.on is not a function
зависимости от того, как вы написали функцию.источник
Если это поможет кому-то, кому я смог успешно передать поток с клиента на s3:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
Серверный код предполагает,
req
что это объект потока, в моем случае он был отправлен от клиента с указанием информации о файле в заголовках.const fileUploadStream = (req, res) => { //get "body" args from header const { id, fn } = JSON.parse(req.get('body')); const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn const params = { Key, Bucket: bucketName, //set somewhere Body: req, //req is a stream }; s3.upload(params, (err, data) => { if (err) { res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack)); } else { res.send(Key); } }); };
Да, это нарушает конвенцию, но если вы посмотрите на суть, это намного чище, чем все, что я нашел с помощью multer, busboy и т. Д.
+1 за прагматизм и спасибо @SalehenRahman за помощь.
источник
Для тех, кто жалуется, что когда они используют функцию загрузки s3 api и файл с нулевым байтом заканчивается на s3 (@ Radar155 и @gabo), у меня тоже была эта проблема.
Создайте второй поток PassThrough и просто перенаправьте все данные из первого во второй и передайте ссылку на этот второй в s3. Вы можете сделать это несколькими разными способами - возможно, грязный способ - это прослушать событие «данные» в первом потоке и затем записать те же данные во второй поток - аналогично для события «конца» - просто вызовите конечная функция во втором потоке. Я понятия не имею, является ли это ошибкой в aws api, версией узла или какой-либо другой проблемой, но для меня это помогло.
Вот как это может выглядеть:
var PassThroughStream = require('stream').PassThrough; var srcStream = new PassThroughStream(); var rstream = fs.createReadStream('Learning/stocktest.json'); var sameStream = rstream.pipe(srcStream); // interesting note: (srcStream == sameStream) at this point var destStream = new PassThroughStream(); // call your s3.upload function here - passing in the destStream as the Body parameter srcStream.on('data', function (chunk) { destStream.write(chunk); }); srcStream.on('end', function () { dataStream.end(); });
источник
Следуя другим ответам и используя последний AWS SDK для Node.js, существует гораздо более чистое и простое решение, поскольку функция s3 upload () принимает поток, используя синтаксис await и обещание S3:
var model = await s3Client.upload({ Bucket : bucket, Key : key, ContentType : yourContentType, Body : fs.createReadStream(path-to-file) }).promise();
источник
Я использую KnexJS, и у меня возникла проблема с использованием их потокового API. Я наконец исправил это, надеюсь, следующее поможет кому-то.
const knexStream = knex.select('*').from('my_table').stream(); const passThroughStream = new stream.PassThrough(); knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n')); knexStream.on('end', () => passThroughStream.end()); const uploadResult = await s3 .upload({ Bucket: 'my-bucket', Key: 'stream-test.txt', Body: passThroughStream }) .promise();
источник
Если вы знаете размер потока, вы можете использовать minio-js для загрузки потока следующим образом:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) { if (e) { return console.log(e) } console.log("Successfully uploaded the stream") })
источник