Как я могу скрести быстрее

16

Работа здесь , чтобы очистить АНИ сайт , который начинается с https://xxx.xxx.xxx/xxx/1.jsonк https://xxx.xxx.xxx/xxx/1417749.jsonи записать его точно MongoDB. Для этого у меня есть следующий код:

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min = 1
max = 1417749
for n in range(min, max):
    response = requests.get("https:/xx.xxx.xxx/{}.json".format(str(n)))
    if response.status_code == 200:
        parsed = json.loads(response.text)
        inserted = com.insert_one(parsed)
        write_log.write(str(n) + "\t" + str(inserted) + "\n")
        print(str(n) + "\t" + str(inserted) + "\n")
write_log.close()

Но это занимает много времени, чтобы сделать задачу. Вопрос в том, как я могу ускорить этот процесс.

Тек Натх
источник
Вы сначала пытались оценить, сколько времени занимает обработка одного JSON? Предполагая, что для каждой записи требуется 300 мс, вы можете обработать все эти записи последовательно примерно за 5 дней.
Tuxdna

Ответы:

5

Asyncio также является решением, если вы не хотите использовать многопоточность

import time
import pymongo
import json
import asyncio
from aiohttp import ClientSession


async def get_url(url, session):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()


async def create_task(sem, url, session):
    async with sem:
        response = await get_url(url, session)
        if response:
            parsed = json.loads(response)
            n = url.rsplit('/', 1)[1]
            inserted = com.insert_one(parsed)
            write_log.write(str(n) + "\t" + str(inserted) + "\n")
            print(str(n) + "\t" + str(inserted) + "\n")


async def run(minimum, maximum):
    url = 'https:/xx.xxx.xxx/{}.json'
    tasks = []
    sem = asyncio.Semaphore(1000)   # Maximize the concurrent sessions to 1000, stay below the max open sockets allowed
    async with ClientSession() as session:
        for n in range(minimum, maximum):
            task = asyncio.ensure_future(create_task(sem, url.format(n), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses


client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min_item = 1
max_item = 100

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(min_item, max_item))
loop.run_until_complete(future)
write_log.close()
Frans
источник
1
Использование async работало быстрее, чем многопоточность.
Тек Нат
Спасибо за ответ. Интересный результат.
Франс
10

Есть несколько вещей, которые вы могли бы сделать:

  1. Повторное использование соединения. В соответствии с нижеприведенным тестом он примерно в 3 раза быстрее
  2. Вы можете обрабатывать несколько процессов параллельно

Параллельный код отсюда

from threading import Thread
from Queue import Queue
q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Сроки из этого вопроса для многоразового подключения

>>> timeit.timeit('_ = requests.get("https://www.wikipedia.org")', 'import requests', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
...
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
52.74904417991638
>>> timeit.timeit('_ = session.get("https://www.wikipedia.org")', 'import requests; session = requests.Session()', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
15.770191192626953
keiv.fly
источник
6

Вы можете улучшить свой код по двум аспектам:

  • Использование a Session, чтобы соединение не перестраивалось при каждом запросе и оставалось открытым;

  • Использование параллелизма в вашем коде с asyncio;

Посмотрите здесь https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

albestro
источник
2
Можете ли вы добавить больше деталей?
Tek Nath
4

То, что вы, вероятно, ищете, это асинхронная очистка. Я бы порекомендовал вам создать несколько пакетов URL-адресов, то есть 5 URL-адресов (постарайтесь не перебивать веб-сайт), и очищать их асинхронно. Если вы не знаете много об async, поищите в Google asyncio. Я надеюсь, что смогу помочь вам :)

Т Пайпер
источник
1
Можете ли вы добавить еще некоторые детали.
Tek Nath
3

Попробуйте разделить запросы на части и использовать операцию массовой записи MongoDB.

  • сгруппировать запросы (100 запросов на группу)
  • Итерация по группам
  • Используйте асинхронную модель запроса для извлечения данных (URL в группе)
  • Обновите БД после завершения группы (операция массовой записи)

Это может сэкономить много времени следующими способами: * Задержка записи MongoDB * Задержка синхронного сетевого вызова.

Но не увеличивайте количество параллельных запросов (размер чанка), это увеличит сетевую нагрузку на сервер, и сервер может подумать, что это DDoS-атака.

  1. https://api.mongodb.com/python/current/examples/bulk.html
thuva4
источник
1
Можете ли вы помочь с кодом для группировки запросов и групповой выборки
Tek Nath
3

Предполагая, что вы не будете заблокированы API и что нет ограничений по скорости, этот код должен сделать процесс в 50 раз быстрее (возможно, больше, потому что все запросы теперь отправляются с использованием одного и того же сеанса).

import pymongo
import threading

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
logs=[]

number_of_json_objects=1417750
number_of_threads=50

session=requests.session()

def scrap_write_log(session,start,end):
    for n in range(start, end):
        response = session.get("https:/xx.xxx.xxx/{}.json".format(n))
        if response.status_code == 200:
            try:
                logs.append(str(n) + "\t" + str(com.insert_one(json.loads(response.text))) + "\n")
                print(str(n) + "\t" + str(inserted) + "\n")
            except:
                logs.append(str(n) + "\t" + "Failed to insert" + "\n")
                print(str(n) + "\t" + "Failed to insert" + "\n")

thread_ranges=[[x,x+number_of_json_objects//number_of_threads] for x in range(0,number_of_json_objects,number_of_json_objects//number_of_threads)]

threads=[threading.Thread(target=scrap_write_log, args=(session,start_and_end[0],start_and_end[1])) for start_and_end in thread_ranges]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

with open("logging.log", "a") as f:
    for line in logs:
        f.write(line)
Ибрагим Дар
источник
2

У меня случился такой же вопрос много лет назад. Меня никогда не устраивают основанные на Python ответы, которые довольно медленные или слишком сложные. После того, как я переключаюсь на другие зрелые инструменты, скорость становится высокой, и я никогда не возвращаюсь.

В последнее время я использую такие шаги для ускорения процесса следующим образом.

  1. генерировать кучу URL в TXT
  2. использовать aria2c -x16 -d ~/Downloads -i /path/to/urls.txtдля загрузки этих файлов
  3. разбирать локально

Это самый быстрый процесс, который я когда-либо делал.

Что касается очистки веб-страниц, я даже загружаю необходимый * .html вместо того, чтобы посещать страницу по одному, что на самом деле не имеет значения. Когда вы нажимаете на страницу, с помощью таких инструментов Python, как requestsили scrapyили urllib, она все равно кэшируется и загружает весь веб-контент для вас.

анонимный
источник
1

Сначала создайте список всех ссылок, потому что все они одинаковые, просто измените его.

list_of_links=[]
for i in range(1,1417749):
    list_of_links.append("https:/xx.xxx.xxx/{}.json".format(str(i)))

t_no=2
for i in range(0, len(list_of_links), t_no):
    all_t = []
    twenty_links = list_of_links[i:i + t_no]
    for link in twenty_links:
        obj_new = Demo(link,)
        t = threading.Thread(target=obj_new.get_json)
        t.start()
        all_t.append(t)
    for t in all_t:
        t.join()

class Demo:
    def __init__(self, url):
        self.json_url = url

def get_json(self):
    try:
       your logic
    except Exception as e:
       print(e)

Просто увеличивая или уменьшая t_no, вы можете изменить ни одного из потоков.

Мобин Альхассан
источник