Boto3 для загрузки всех файлов из S3 Bucket

84

Я использую boto3 для получения файлов из ведра s3. Мне нужна аналогичная функциональность, напримерaws s3 sync

Мой текущий код

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='my_bucket_name')['Contents']
for key in list:
    s3.download_file('my_bucket_name', key['Key'], key['Key'])

Это нормально работает, пока в корзине есть только файлы. Если внутри ведра присутствует папка, выдает ошибку

Traceback (most recent call last):
  File "./test", line 6, in <module>
    s3.download_file('my_bucket_name', key['Key'], key['Key'])
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/inject.py", line 58, in download_file
    extra_args=ExtraArgs, callback=Callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 651, in download_file
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 666, in _download_file
    self._get_object(bucket, key, filename, extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 690, in _get_object
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 707, in _do_get_object
    with self._osutil.open(filename, 'wb') as f:
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 323, in open
    return open(filename, mode)
IOError: [Errno 2] No such file or directory: 'my_folder/.8Df54234'

Это правильный способ загрузить полную корзину s3 с помощью boto3. Как скачивать папки.

Шан
источник

Ответы:

40

При работе с бакетами, которые содержат более 1000 объектов, необходимо реализовать решение, которое использует NextContinuationTokenпоследовательные наборы из максимум 1000 ключей. Это решение сначала составляет список объектов, затем итеративно создает указанные каталоги и загружает существующие объекты.

import boto3
import os

s3_client = boto3.client('s3')

def download_dir(prefix, local, bucket, client=s3_client):
    """
    params:
    - prefix: pattern to match in s3
    - local: local path to folder in which to place files
    - bucket: s3 bucket with target contents
    - client: initialized s3 client object
    """
    keys = []
    dirs = []
    next_token = ''
    base_kwargs = {
        'Bucket':bucket,
        'Prefix':prefix,
    }
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update({'ContinuationToken': next_token})
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    for d in dirs:
        dest_pathname = os.path.join(local, d)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
    for k in keys:
        dest_pathname = os.path.join(local, k)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
        client.download_file(bucket, k, dest_pathname)
Грант Лангсет
источник
изменив это на принятый ответ, поскольку он обрабатывает более широкий вариант использования. Спасибо Грант
Шан
мой код переходит в бесконечный цикл вwhile next_token is not None:
gpd
@gpd этого не должно происходить, поскольку клиент boto3 вернет страницу без NextContinuationToken, когда он достигнет последней страницы, выйдя из оператора while. Если вы вставите последний ответ, полученный при использовании API-интерфейса boto3 (все, что хранится в переменной ответа), я думаю, будет более понятно, что происходит в вашем конкретном случае. Попробуйте распечатать переменную "результаты" просто для проверки. Я предполагаю, что вы указали объект префикса, который не соответствует какому-либо содержимому вашей корзины. Вы это проверили?
Грант Лангсет,
1
Обратите внимание, что вам потребуются незначительные изменения, чтобы он работал с Digital Ocean. как объясняется здесь
Дэвид Д.
2
Используя этот код, я получаю следующую ошибку: объект «NoneType» не повторяется:
TypeError
76

У меня те же потребности, и я создал следующую функцию, которая рекурсивно загружает файлы.

Каталоги создаются локально, только если они содержат файлы.

import boto3
import os

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            dest_pathname = os.path.join(local, file.get('Key'))
            if not os.path.exists(os.path.dirname(dest_pathname)):
                os.makedirs(os.path.dirname(dest_pathname))
            resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)

Функция вызывается так:

def _start():
    client = boto3.client('s3')
    resource = boto3.resource('s3')
    download_dir(client, resource, 'clientconf/', '/tmp', bucket='my-bucket')
glefait
источник
6
Не думаю, что нужно создавать ресурс и клиента. Я считаю, что на ресурсе всегда есть клиент. Вы можете просто использовать resource.meta.client.
TheHerk 05
2
Я думаю, это должно быть «download_dir (клиент, ресурс, subdir.get ('Prefix'), local, bucket
rm999,
6
Я получал сообщение, OSError: [Errno 21] Is a directoryпоэтому я обернул вызов download_file, if not file.get('Key').endswith('/')чтобы разрешить. Спасибо @glefait и @Shan
user336828
5
Нет ли aws s3 syncв библиотеке boto3 эквивалента команды aws- cli?
greperror 07
8
Что distздесь?
Rob Rose
49

Amazon S3 не имеет папок / каталогов. Это плоская файловая структура .

Для сохранения внешнего вида каталогов имена путей сохраняются как часть объекта Key (имя файла). Например:

  • images/foo.jpg

В этом случае весь Ключ images/foo.jpg, а не просто foo.jpg.

Я подозреваю, что ваша проблема в том, что botoвозвращает файл с именем my_folder/.8Df54234и пытается сохранить его в локальной файловой системе. Однако ваша локальная файловая система интерпретирует my_folder/часть как имя каталога, а этот каталог не существует в вашей локальной файловой системе .

Вы можете либо обрезать имя файла, чтобы сохранить только .8Df54234часть, либо вам придется создать необходимые каталоги перед записью файлов. Обратите внимание, что это могут быть многоуровневые вложенные каталоги.

Более простым способом было бы использовать интерфейс командной строки AWS (CLI) , который сделает всю эту работу за вас, например:

aws s3 cp --recursive s3://my_bucket_name local_folder

Также есть syncопция, которая копирует только новые и измененные файлы.

Джон Ротенштейн
источник
1
@j Я понимаю это. Но мне нужно, чтобы папка создавалась автоматически, как и aws s3 sync. Возможно ли в boto3.
Шан
4
Вам нужно будет включить создание каталога как часть вашего кода Python. Если Ключ содержит каталог (например foo/bar.txt), вы должны будете создать каталог ( foo) перед вызовом s3.download_file. Это не автоматическая возможность boto.
Джон Ротенштейн
Здесь содержимое корзины S3 является динамическим, поэтому мне нужно проверять s3.list_objects(Bucket='my_bucket_name')['Contents']и фильтровать ключи папки и создавать их.
Шан
2
Поигравшись с Boto3 некоторое время, указанная здесь команда AWS CLI определенно является самым простым способом сделать это.
AdjunctProfessorFalcon
1
@Ben Пожалуйста, начните новый вопрос, а не задавайте вопрос как комментарий к старому (2015 г.) вопросу.
Джон Ротенштейн
43
import os
import boto3

#initiate s3 resource
s3 = boto3.resource('s3')

# select bucket
my_bucket = s3.Bucket('my_bucket_name')

# download file into current directory
for s3_object in my_bucket.objects.all():
    # Need to split s3_object.key into path and file name, else it will give error file not found.
    path, filename = os.path.split(s3_object.key)
    my_bucket.download_file(s3_object.key, filename)
Тушар Нирас
источник
3
Чисто и просто, почему бы не использовать это? Это гораздо понятнее, чем все остальные решения. Коллекции, кажется, делают для вас много вещей в фоновом режиме.
Joost
3
Я думаю, вам следует сначала создать все подпапки, чтобы это работало правильно.
rpanai 02
2
Этот код поместит все в выходной каталог верхнего уровня независимо от того, насколько глубоко он вложен в S3. И если несколько файлов имеют одинаковое имя в разных каталогах, они будут перекрывать друг друга. Думаю, вам нужна еще одна строчка:, os.makedirs(path)и тогда должно быть место загрузки object.key.
Скотт Смит
13

В настоящее время я выполняю задачу, используя следующие

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='bucket')['Contents']
for s3_key in list:
    s3_object = s3_key['Key']
    if not s3_object.endswith("/"):
        s3.download_file('bucket', s3_object, s3_object)
    else:
        import os
        if not os.path.exists(s3_object):
            os.makedirs(s3_object)

Хотя он выполняет свою работу, я не уверен, что это хорошо. Я оставляю это здесь, чтобы помочь другим пользователям и получить дополнительные ответы, чтобы лучше достичь этого

Шан
источник
9

Лучше поздно, чем никогда :) Предыдущий ответ с пагинатором действительно хорош. Однако это рекурсивно, и вы можете выйти за пределы рекурсии Python. Вот альтернативный подход с парочкой дополнительных проверок.

import os
import errno
import boto3


def assert_dir_exists(path):
    """
    Checks if directory tree in path exists. If not it created them.
    :param path: the path to check if it exists
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def download_dir(client, bucket, path, target):
    """
    Downloads recursively the given S3 path to the target directory.
    :param client: S3 client to use.
    :param bucket: the name of the bucket to download from
    :param path: The S3 directory to download.
    :param target: the local directory to download the files to.
    """

    # Handle missing / at end of prefix
    if not path.endswith('/'):
        path += '/'

    paginator = client.get_paginator('list_objects_v2')
    for result in paginator.paginate(Bucket=bucket, Prefix=path):
        # Download each file individually
        for key in result['Contents']:
            # Calculate relative path
            rel_path = key['Key'][len(path):]
            # Skip paths ending in /
            if not key['Key'].endswith('/'):
                local_file_path = os.path.join(target, rel_path)
                # Make sure directories exist
                local_file_dir = os.path.dirname(local_file_path)
                assert_dir_exists(local_file_dir)
                client.download_file(bucket, key['Key'], local_file_path)


client = boto3.client('s3')

download_dir(client, 'bucket-name', 'path/to/data', 'downloads')
Ифукаракис
источник
1
Получил KeyError: 'Contents'. входной путь '/arch/R/storeincomelogs/, полный путь /arch/R/storeincomelogs/201901/01/xxx.parquet.
Mithril
3

У меня есть обходной путь, который запускает AWS CLI в том же процессе.

Установить awscliкак python lib:

pip install awscli

Затем определите эту функцию:

from awscli.clidriver import create_clidriver

def aws_cli(*cmd):
    old_env = dict(os.environ)
    try:

        # Environment
        env = os.environ.copy()
        env['LC_CTYPE'] = u'en_US.UTF'
        os.environ.update(env)

        # Run awscli in the same process
        exit_code = create_clidriver().main(*cmd)

        # Deal with problems
        if exit_code > 0:
            raise RuntimeError('AWS CLI exited with code {}'.format(exit_code))
    finally:
        os.environ.clear()
        os.environ.update(old_env)

Выполнить:

aws_cli('s3', 'sync', '/path/to/source', 's3://bucket/destination', '--delete')
mattalxndr
источник
Я использовал ту же идею, но без использования syncкоманды, а просто выполнял команду aws s3 cp s3://{bucket}/{folder} {local_folder} --recursive. Время сокращено с минут (почти 1 час) до буквально секунд
acaruci
Я использую этот код, но у меня есть проблема, когда отображаются все журналы отладки. Я объявил это глобально: logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING) logger = logging.getLogger()и хочу, чтобы журналы выводились только из root. Есть идеи?
апрель Polubiec
1

Получать все файлы за один раз - очень плохая идея. Лучше получать их партиями.

Одна реализация, которую я использую для получения определенной папки (каталога) из S3, это:

def get_directory(directory_path, download_path, exclude_file_names):
    # prepare session
    session = Session(aws_access_key_id, aws_secret_access_key, region_name)

    # get instances for resource and bucket
    resource = session.resource('s3')
    bucket = resource.Bucket(bucket_name)

    for s3_key in self.client.list_objects(Bucket=self.bucket_name, Prefix=directory_path)['Contents']:
        s3_object = s3_key['Key']
        if s3_object not in exclude_file_names:
            bucket.download_file(file_path, download_path + str(s3_object.split('/')[-1])

и все же, если вы хотите получить все ведро, используйте его через CIL, как @John Rotenstein, как указано ниже,

aws s3 cp --recursive s3://bucket_name download_path
Ганатра
источник
0
for objs in my_bucket.objects.all():
    print(objs.key)
    path='/tmp/'+os.sep.join(objs.key.split(os.sep)[:-1])
    try:
        if not os.path.exists(path):
            os.makedirs(path)
        my_bucket.download_file(objs.key, '/tmp/'+objs.key)
    except FileExistsError as fe:                          
        print(objs.key+' exists')

Этот код загрузит содержимое в /tmp/каталог. Если хотите, можете сменить каталог.

Раджеш Раджендран
источник
0

Если вы хотите вызвать сценарий bash с помощью python, вот простой способ загрузить файл из папки в корзине S3 в локальную папку (на машине Linux):

import boto3
import subprocess
import os

###TOEDIT###
my_bucket_name = "your_my_bucket_name"
bucket_folder_name = "your_bucket_folder_name"
local_folder_path = "your_local_folder_path"
###TOEDIT###

# 1.Load thes list of files existing in the bucket folder
FILES_NAMES = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('{}'.format(my_bucket_name))
for object_summary in my_bucket.objects.filter(Prefix="{}/".format(bucket_folder_name)):
#     print(object_summary.key)
    FILES_NAMES.append(object_summary.key)

# 2.List only new files that do not exist in local folder (to not copy everything!)
new_filenames = list(set(FILES_NAMES )-set(os.listdir(local_folder_path)))

# 3.Time to load files in your destination folder 
for new_filename in new_filenames:
    upload_S3files_CMD = """aws s3 cp s3://{}/{}/{} {}""".format(my_bucket_name,bucket_folder_name,new_filename ,local_folder_path)

    subprocess_call = subprocess.call([upload_S3files_CMD], shell=True)
    if subprocess_call != 0:
        print("ALERT: loading files not working correctly, please re-check new loaded files")
HazimoRa3d
источник
0

Я получил аналогичное требование и получил помощь, прочитав несколько из вышеперечисленных решений и на других веб-сайтах, я придумал сценарий ниже, просто хотел поделиться, может ли это кому-то помочь.

from boto3.session import Session
import os

def sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path):    
    session = Session(aws_access_key_id=access_key_id,aws_secret_access_key=secret_access_key)
    s3 = session.resource('s3')
    your_bucket = s3.Bucket(bucket_name)
    for s3_file in your_bucket.objects.all():
        if folder in s3_file.key:
            file=os.path.join(destination_path,s3_file.key.replace('/','\\'))
            if not os.path.exists(os.path.dirname(file)):
                os.makedirs(os.path.dirname(file))
            your_bucket.download_file(s3_file.key,file)
sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path)
Кранти
источник
0

Повторная публикация ответа @glefait с условием if в конце, чтобы избежать ошибки ОС 20. Первый ключ, который он получает, - это само имя папки, которое не может быть записано в путь назначения.

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            print("Content: ",result)
            dest_pathname = os.path.join(local, file.get('Key'))
            print("Dest path: ",dest_pathname)
            if not os.path.exists(os.path.dirname(dest_pathname)):
                print("here last if")
                os.makedirs(os.path.dirname(dest_pathname))
            print("else file key: ", file.get('Key'))
            if not file.get('Key') == dist:
                print("Key not equal? ",file.get('Key'))
                resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)enter code here
Vinay
источник
0

Я столкнулся с этой проблемой какое-то время, и на всех форумах, которые я посещал, я не видел полного сквозного фрагмента того, что работает. Итак, я взял все части (добавил кое-что самостоятельно) и создал полный сквозной загрузчик S3!

Это не только автоматически загрузит файлы, но и, если файлы S3 находятся в подкаталогах, они будут созданы в локальном хранилище. В моем экземпляре приложения мне нужно установить разрешения и владельцев, поэтому я тоже добавил это (можно закомментировать, если не нужно).

Это было протестировано и работает в среде Docker (K8), но я добавил переменные среды в сценарий на всякий случай, если вы хотите протестировать / запустить его локально.

Я надеюсь, что это поможет кому-нибудь в поисках автоматизации загрузки S3. Я также приветствую любые советы, информацию и т. Д. О том, как это можно лучше оптимизировать, если это необходимо.

#!/usr/bin/python3
import gc
import logging
import os
import signal
import sys
import time
from datetime import datetime

import boto
from boto.exception import S3ResponseError
from pythonjsonlogger import jsonlogger

formatter = jsonlogger.JsonFormatter('%(message)%(levelname)%(name)%(asctime)%(filename)%(lineno)%(funcName)')

json_handler_out = logging.StreamHandler()
json_handler_out.setFormatter(formatter)

#Manual Testing Variables If Needed
#os.environ["DOWNLOAD_LOCATION_PATH"] = "some_path"
#os.environ["BUCKET_NAME"] = "some_bucket"
#os.environ["AWS_ACCESS_KEY"] = "some_access_key"
#os.environ["AWS_SECRET_KEY"] = "some_secret"
#os.environ["LOG_LEVEL_SELECTOR"] = "DEBUG, INFO, or ERROR"

#Setting Log Level Test
logger = logging.getLogger('json')
logger.addHandler(json_handler_out)
logger_levels = {
    'ERROR' : logging.ERROR,
    'INFO' : logging.INFO,
    'DEBUG' : logging.DEBUG
}
logger_level_selector = os.environ["LOG_LEVEL_SELECTOR"]
logger.setLevel(logger_level_selector)

#Getting Date/Time
now = datetime.now()
logger.info("Current date and time : ")
logger.info(now.strftime("%Y-%m-%d %H:%M:%S"))

#Establishing S3 Variables and Download Location
download_location_path = os.environ["DOWNLOAD_LOCATION_PATH"]
bucket_name = os.environ["BUCKET_NAME"]
aws_access_key_id = os.environ["AWS_ACCESS_KEY"]
aws_access_secret_key = os.environ["AWS_SECRET_KEY"]
logger.debug("Bucket: %s" % bucket_name)
logger.debug("Key: %s" % aws_access_key_id)
logger.debug("Secret: %s" % aws_access_secret_key)
logger.debug("Download location path: %s" % download_location_path)

#Creating Download Directory
if not os.path.exists(download_location_path):
    logger.info("Making download directory")
    os.makedirs(download_location_path)

#Signal Hooks are fun
class GracefulKiller:
    kill_now = False
    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
    def exit_gracefully(self, signum, frame):
        self.kill_now = True

#Downloading from S3 Bucket
def download_s3_bucket():
    conn = boto.connect_s3(aws_access_key_id, aws_access_secret_key)
    logger.debug("Connection established: ")
    bucket = conn.get_bucket(bucket_name)
    logger.debug("Bucket: %s" % str(bucket))
    bucket_list = bucket.list()
#    logger.info("Number of items to download: {0}".format(len(bucket_list)))

    for s3_item in bucket_list:
        key_string = str(s3_item.key)
        logger.debug("S3 Bucket Item to download: %s" % key_string)
        s3_path = download_location_path + "/" + key_string
        logger.debug("Downloading to: %s" % s3_path)
        local_dir = os.path.dirname(s3_path)

        if not os.path.exists(local_dir):
            logger.info("Local directory doesn't exist, creating it... %s" % local_dir)
            os.makedirs(local_dir)
            logger.info("Updating local directory permissions to %s" % local_dir)
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(local_dir, 0o775)
            os.chown(local_dir, 60001, 60001)
        logger.debug("Local directory for download: %s" % local_dir)
        try:
            logger.info("Downloading File: %s" % key_string)
            s3_item.get_contents_to_filename(s3_path)
            logger.info("Successfully downloaded File: %s" % s3_path)
            #Updating Permissions
            logger.info("Updating Permissions for %s" % str(s3_path))
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(s3_path, 0o664)
            os.chown(s3_path, 60001, 60001)
        except (OSError, S3ResponseError) as e:
            logger.error("Fatal error in s3_item.get_contents_to_filename", exc_info=True)
            # logger.error("Exception in file download from S3: {}".format(e))
            continue
        logger.info("Deleting %s from S3 Bucket" % str(s3_item.key))
        s3_item.delete()

def main():
    killer = GracefulKiller()
    while not killer.kill_now:
        logger.info("Checking for new files on S3 to download...")
        download_s3_bucket()
        logger.info("Done checking for new files, will check in 120s...")
        gc.collect()
        sys.stdout.flush()
        time.sleep(120)
if __name__ == '__main__':
    main()
Товарищ35
источник
0

Из Документов AWS S3 (Как использовать папки в корзине S3?):

В Amazon S3 корзины и объекты являются основными ресурсами, а объекты хранятся в корзинах. Amazon S3 имеет плоскую структуру, а не иерархию, как в файловой системе. Однако для упрощения организации консоль Amazon S3 поддерживает концепцию папки как средства группировки объектов. Amazon S3 делает это с помощью префикса общего имени для объектов (то есть объекты имеют имена, начинающиеся с общей строки). Имена объектов также называются ключевыми именами.

Например, вы можете создать на консоли папку с именем photos и сохранить в ней объект с именем myphoto.jpg. Затем объект сохраняется с ключевым именем photos / myphoto.jpg, где photos / - префикс.

Чтобы загрузить все файлы из «mybucket» в текущий каталог, соблюдая эмулируемую структуру каталогов корзины (создание папок из корзины, если они еще не существуют локально):

import boto3
import os

bucket_name = "mybucket"
s3 = boto3.client("s3")
objects = s3.list_objects(Bucket = bucket_name)["Contents"]
for s3_object in objects:
    s3_key = s3_object["Key"]
    path, filename = os.path.split(s3_key)
    if len(path) != 0 and not os.path.exists(path):
        os.makedirs(path)
    if not s3_key.endswith("/"):
        download_to = path + '/' + filename if path else filename
        s3.download_file(bucket_name, s3_key, download_to)
Дарья
источник
Было бы лучше, если бы вы могли включить какое-то объяснение своего кода.
johan
1
@johan, спасибо за отзыв! Я добавил соответствующее объяснение
Дарья