Сохранение Dataframe в CSV непосредственно в s3 Python

126

У меня есть DataFrame pandas, который я хочу загрузить в новый файл CSV. Проблема в том, что я не хочу сохранять файл локально перед переносом на s3. Есть ли какой-нибудь метод, например to_csv, для прямой записи фрейма данных в s3? Я использую boto3.
Вот что у меня есть на данный момент:

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3
user2494275
источник
3
df.to_csv('s3://mybucket/dfs/somedf.csv'), stackoverflow.com/a/56275519/908886 для получения дополнительной информации.
Питер Берг

Ответы:

160

Ты можешь использовать:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())
Стефан
источник
10
Если это большой файл, что он делает с памятью ...?
citynorman 03
2
Если файл больше, чем имеющаяся у вас оперативная память, действие завершится ошибкой и будет исключение (не знаю, какое именно). Это следует принять как ответ
Эран Моше
5
У меня TypeError: unicode argument expected, got 'str'ошибка при использовании StringIO. Я использовал, BytesIOи он работал отлично. Примечание: это было в Python 2.7
Абхишек Упадхьяя
1
что такое bucketобъект? как ты это создал?
Чарльз Чоу
1
bucketэто место, где вы храните объекты на S3. Код предполагает, что вы уже создали место назначения (думаю: каталог), где это хранить. См. Документы S3
Стефан
68

Вы можете напрямую использовать путь S3. Я использую Pandas 0.24.1

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

Release Note:

Обработка файлов S3

pandas теперь использует s3fs для обработки соединений S3. Это не должно нарушать какой-либо код. Однако, поскольку s3fs не является обязательной зависимостью, вам нужно будет установить ее отдельно, как и boto в предыдущих версиях pandas. GH11915 .

yardstick17
источник
7
это определенно самый простой ответ сейчас, он использует s3fs за кулисами, поэтому вам нужно добавить его в свой файл requirements.txt
JD D
1
Мне нравится, что это просто, но похоже, что на самом деле это не работает, поскольку я продолжаю получать следующую ошибку NoCredentialsError: Unable to locate credentials. Какие-либо предложения?
CathyQian
1
Я могу подтвердить, что это не работает с pandas <= 0.23.4, поэтому обязательно обновитесь до pandas 0.24
Гуидо
1
Это ошибка, которую я вижу, когда пытаюсь использовать команду to_csv TypeError: аргумент write () 1 должен быть unicode, а не str
Raj
13
Я использую pandas 0.24.2 и получаю NotImplementedError: Text mode not supported, use mode='wb' and manage bytes. какие-либо предложения?
Биньямин Эвен
57

Мне нравится s3fs который позволяет использовать s3 (почти) как локальную файловую систему.

Ты можешь сделать это:

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fsподдерживает только rbи wbрежимы открытия файла, поэтому я и сделал это bytes_to_write.

michcio1234
источник
Большой! Как я могу получить URL-адрес файла, используя тот же модуль s3fs?
M.Zaman
Я искал URL-адрес, по которому я могу загрузить написанный файл, в любом случае я получаю его через S3FileSystem. Спасибо
M.Zaman
это то, что я использую; Спасибо. Мне любопытно, почему pd.read_csv (<s3path>) работает так, как ожидалось, но для написания мы должны использовать эту работу ... за исключением случая, когда я пишу непосредственно в корзину s3, в которой находится мой jupyter.
Рене
@ michcio1234 как я могу сделать то же самое в режиме добавления? Мне нужно добавить данные в существующий csv на s3
j '
@j ' s3fs, похоже, не поддерживает режим добавления.
michcio1234 06
43

Это более свежий ответ:

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

Проблема с StringIO в том, что он разъедает вашу память. С помощью этого метода вы передаете файл в s3, а не конвертируете его в строку, а затем записываете в s3. Хранение фрейма данных pandas и его строковой копии в памяти кажется очень неэффективным.

Если вы работаете в момент времени ec2, вы можете назначить ему роль IAM, чтобы разрешить запись в s3, поэтому вам не нужно передавать учетные данные напрямую. Однако вы также можете подключиться к корзине, передав учетные данные вS3FileSystem() функции. См. Документацию: https://s3fs.readthedocs.io/en/latest/

erncyp
источник
По какой-то причине, когда я сделал это, каждая строка была пропущена в выходном CSV
kjmerf
хмм. не уверен, почему это могло произойти. возможно, попробуйте с другим pandas df, чтобы увидеть, все еще возникает проблема? Если ваша версия pandas поддерживает это, попробуйте ответ @ amit-kushwaha, в котором вы передаете URL-адрес s3 напрямую to_csv(). кажется более чистой реализацией.
erncyp 04
@erncyp Кажется, у меня появляется ошибка: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied ... Я даже сделал сегмент ПУБЛИЧНОЕ ЧТЕНИЕ, и я добавил следующие действия под моим конкретным пользователем IAM учетной записи в политику "Action": [ "s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:GetObjectAcl", "s3:DeleteObject" ]
сегмента
похоже, у вас не хватает разрешений? Обязательно прикрепите разрешения на чтение и запись S3 к используемой вами роли IAM
erncyp
@erncyp У меня есть политика AdministratorAccess, привязанная к моему пользователю IAM, поэтому теоретически я должен уметь нормально читать / писать ... Как ни странно, я могу писать нормально, когда использую следующую функцию, которую я сделал, используя другого пользователя StackOverflow совет (кстати, точка с запятой - конец строки, так как я не знаю, как форматировать в разделе комментариев):def send_to_bucket(df, fn_out, bucketname): csv_buffer = StringIO(); df.to_csv(csv_buffer); s3_resource = boto3.resource('s3'); s3_resource.Object(bucketname, fn_out).put(Body=csv_buffer.getvalue());
ajoros
13

Если вы передадите Noneпервый аргумент, to_csv()данные будут возвращены в виде строки. Оттуда это простой шаг, чтобы загрузить его на S3 за один раз.

Также должна быть возможность передать StringIOобъект to_csv(), но с использованием строки будет проще.

mhawke
источник
Как будет проще? Как правильно это делать?
Эран Моше
@EranMoshe: в любом случае будет работать правильно, но, очевидно, проще перейти Noneк to_csv()возвращенной строке и использовать ее, чем создать StringIOобъект и затем прочитать данные обратно.
mhawke
Как ленивый программист я поступил именно так. И вы имели в виду, что программисту, который пишет меньше кода, проще:>
Эран Моше
3

Я обнаружил, что это можно сделать с помощью clientтакже, а не только resource.

from io import StringIO
import boto3
s3 = boto3.client("s3",\
                  region_name=region_name,\
                  aws_access_key_id=aws_access_key_id,\
                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')
Harry_pb
источник
2

Вы также можете использовать AWS Data Wrangler :

import awswrangler

session = awswrangler.Session()
session.pandas.to_csv(
    dataframe=df,
    path="s3://...",
)

Обратите внимание, что он будет разделен на несколько частей, поскольку загружает его параллельно.

Gabra
источник
0

поскольку вы используете boto3.client(), попробуйте:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')
jerrytim
источник
-1

Я нашел очень простое решение, которое, похоже, работает:

s3 = boto3.client("s3")

s3.put_object(
    Body=open("filename.csv").read(),
    Bucket="your-bucket",
    Key="your-key"
)

Надеюсь, это поможет !

Антуан Крайнц
источник
-5

Я прочитал csv с двумя столбцами из ведра s3 и содержимое файла csv, которое я поместил в pandas dataframe.

Пример:

config.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_pandas.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()
Джамир Хосимар Уаман Кампос
источник
4
пожалуйста, не просто публикуйте решение, добавьте его объяснение.
sjaustirni
Есть ли преимущество в создании такого сложного (для новичка в Python) решения?
Хавьер Лопес Томас,
1
Это читает файл из s3, вопрос был в том, как записать df в s3.
Дамиан Саттертуэйт-Филлипс