Какой самый быстрый способ отправить 100 000 HTTP-запросов в Python?

287

Я открываю файл с 100 000 URL. Мне нужно отправить HTTP-запрос на каждый URL и распечатать код состояния. Я использую Python 2.6 и до сих пор смотрел на многие запутанные способы, которыми Python реализует многопоточность / параллелизм. Я даже посмотрел на библиотеку Python Concurrence , но не могу понять, как правильно написать эту программу. Кто-нибудь сталкивался с подобной проблемой? Я думаю, что в целом мне нужно знать, как выполнить тысячи задач в Python как можно быстрее - я полагаю, это означает «одновременно».

IgorGanapolsky
источник
47
Убедитесь, что вы выполняете только запрос HEAD (чтобы не загружать весь документ). См .: stackoverflow.com/questions/107405/…
Tarnay Kálmán
5
Отличная мысль, Кальми. Если все, что хочет Игорь, это статус запроса, то эти 100 000 запросов будут выполняться намного, намного, намного быстрее. Гораздо быстрее
Адам Кроссленд
1
Вам не нужны темы для этого; наиболее эффективный способ - использовать асинхронную библиотеку, такую ​​как Twisted.
jemfinch
3
Вот примеры кода на основе Gevent, Twisted и Asyncio (протестировано на 1000000 запросов)
jfs
4
@ TarnayKálmán позволяет requests.getи requests.head(т.е. запрос страницы против запроса головы) возвращать разные коды статуса, так что это не лучший совет
AlexG

Ответы:

200

Twistedless решение:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Это немного быстрее, чем скрученное решение и использует меньше ресурсов процессора.

Тарнай Кальман
источник
10
@ Калми, почему вы устанавливаете очередь в concurrent*2?
Марсель Уилсон
8
Не забудьте закрыть соединение conn.close() . Открытие слишком большого количества http-соединений может в какой-то момент остановить ваш скрипт и съесть память.
Амир Аднан
4
@hyh, Queueмодуль был переименован queueв Python 3. Это код Python 2.
Тарнай Кальман
3
Насколько быстрее вы можете идти, если хотите каждый раз общаться с тем же сервером, сохраняя соединение? Может ли это быть сделано между потоками или с одним постоянным соединением на поток?
mdurant
2
@mptevsion, если вы используете CPython, вы можете (например) просто заменить «print status, url» на «my_global_list.append ((status, url))». (Большинство операций со списками) неявно поточно-ориентированы в CPython (и некоторых других реализациях Python) благодаря GIL, так что это безопасно.
Тарнай Кальман
54

Решение с использованием асинхронной сетевой библиотеки торнадо

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
Мгер
источник
7
Этот код использует неблокирующий сетевой ввод-вывод и не имеет никаких ограничений. Он может масштабироваться до десятков тысяч открытых соединений. Он будет работать в одном потоке, но будет быстрее, чем любое решение для потоков. Checkout неблокирующая I / O en.wikipedia.org/wiki/Asynchronous_I/O
Мгер
1
Можете ли вы объяснить, что здесь происходит с глобальной переменной I? Какая-то проверка ошибок?
LittleBobbyTables
4
Это счетчик для определения того, когда выходить из `` ioloop` - когда закончите.
Майкл Дорнер
1
@AndrewScottEvans предполагал, что вы используете Python 2.7 и прокси
Dejell
5
@Guy Avraham Удачи в получении помощи по плану DDoS.
Уолтер
51

Все изменилось с 2010 года, когда он был опубликован, и я не пробовал все остальные ответы, но попробовал несколько, и я нашел, что это работает лучше всего для меня, используя python3.6.

Мне удалось получить около 150 уникальных доменов в секунду, работающих на AWS.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
Глен Томпсон
источник
1
Я спрашиваю только потому, что не знаю, но можно ли заменить этот фьючерсный контент на async / await?
TankorSmash
1
Может, но я нашел выше, чтобы работать лучше. Вы можете использовать aiohttp, но он не является частью стандартной библиотеки и сильно меняется. Это работает, но я просто не нашел, чтобы это работало также. Я получаю более высокий уровень ошибок, когда использую его, и на всю жизнь я не могу заставить его работать так же, как и параллельные фьючерсы, хотя в теории кажется, что он должен работать лучше, см .: stackoverflow.com/questions/45800857/… если у вас получится хорошо, напишите свой ответ, чтобы я мог проверить его.
Глен Томпсон
1
Это клевая штука, но я думаю, что это намного чище, чтобы положить time1 = time.time()вверху цикла for и time2 = time.time()сразу после цикла for.
Мэтт М.
Я проверил ваш фрагмент, как-то он выполняется дважды. Я делаю что-то неправильно? Или это должно быть запущено дважды? Если это последний случай, вы также можете помочь мне понять, как это срабатывает дважды?
Ронни
1
Это не должно бежать дважды. Не уверен, почему вы это видите.
Глен Томпсон
40

Темы абсолютно не ответ здесь. Они обеспечат узкие места как процесса, так и ядра, а также ограничения пропускной способности, которые неприемлемы, если общая цель - «самый быстрый путь».

Немного twistedи его асинхронный HTTPклиент даст вам гораздо лучшие результаты.

ironfroggy
источник
ironfroggy: я склоняюсь к вашим чувствам. Я пытался реализовать свое решение с помощью потоков и очередей (для автоматических мьютексов), но можете ли вы представить, сколько времени потребуется, чтобы заполнить очередь 100 000 вещей ?? Я все еще играю с различными вариантами и предложениями от всех в этой теме, и, возможно, Twisted будет хорошим решением.
Игорь Ганапольский
2
Вы можете избежать заполнения очереди 100 тысячами вещей. Просто обрабатывайте элементы по одному из вашего ввода, а затем запускайте поток для обработки запроса, соответствующего каждому элементу. (Как я опишу ниже, используйте поток запуска для запуска потоков HTTP-запросов, когда число потоков ниже некоторого порогового значения. Заставьте потоки записать результаты в URL-адрес, определяющий сложное соответствие, или добавьте кортежи в список.)
Эрик Гарнизон
ironfroggy: Кроме того, мне интересно, какие узкие места вы обнаружили с помощью потоков Python? А как потоки Python взаимодействуют с ядром ОС?
Эрик Гаррисон
Убедитесь, что вы установили реактор epoll; в противном случае вы будете использовать select / poll, и это будет очень медленно. Кроме того, если вы на самом деле попытаетесь одновременно открыть 100 000 подключений (при условии, что ваша программа написана таким образом, а URL-адреса расположены на разных серверах), вам нужно настроить свою ОС, чтобы она не закончилась файловых дескрипторов, эфемерных портов и т. д. (вероятно, проще просто убедиться, что у вас не более, скажем, 10 000 ожидающих соединений одновременно).
Марк Ноттингем
Эрикг: Вы рекомендовали отличную идею. Тем не менее, лучший результат, которого я смог достичь с 200 нитями, был ок. 6 минут Я уверен, что есть способы сделать это за меньшее время ... Марк Н.: Если я решил пойти по Twisted, тогда реактор Эполл, безусловно, полезен. Однако, если мой скрипт будет запускаться с нескольких машин, не потребует ли это установки Twisted на КАЖДУЮ машину? Я не знаю, смогу ли я убедить моего начальника пойти по этому пути ...
Игорь Ганапольский
21

Я знаю, что это старый вопрос, но в Python 3.7 вы можете сделать это, используя asyncioи aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

Вы можете прочитать больше об этом и увидеть пример здесь .

Мариус Стенеску
источник
Это похоже на C # async / await и Kotlin Coroutines?
Игорь Ганапольский
@IgorGanapolsky, да, это очень похоже на C # async / await. Я не знаком с Kotlin Coroutines.
Мариус
@sandyp, я не уверен, что это работает, но если вы хотите попробовать, вам придется использовать UnixConnector для aiohttp. Подробнее читайте здесь: docs.aiohttp.org/en/stable/client_reference.html#connectors .
Мариус
Спасибо @ MariusStănescu. Это именно то, что я использовал.
sandyp
+1 за показ asyncio.gather (* задачи). вот один такой фрагмент, который я использовал: urls= [fetch(construct_fetch_url(u),idx) for idx, u in enumerate(some_URI_list)] results = await asyncio.gather(*urls)
Ашвини Кумар
19

Используйте grequests , это комбинация запросов + модуль Gevent.

GRequests позволяет использовать запросы с Gevent, чтобы легко выполнять асинхронные HTTP-запросы.

Использование простое:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Создайте набор неотправленных запросов:

>>> rs = (grequests.get(u) for u in urls)

Отправьте их все одновременно:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
Акшай Пратап Сингх
источник
7
Gevent теперь поддерживает Python 3
Бенджамин Тоуег
15
grequests не является частью обычных запросов и, по-видимому, в основном не имеет смысла
Том
8

Хороший подход к решению этой проблемы заключается в том, чтобы сначала написать код, необходимый для получения одного результата, а затем включить многопоточный код для распараллеливания приложения.

В идеальном мире это просто означало бы одновременный запуск 100 000 потоков, которые выводят свои результаты в словарь или список для последующей обработки, но на практике вы ограничены в количестве параллельных HTTP-запросов, которые вы можете выполнить таким способом. Локально, у вас есть ограничения на количество сокетов, которые вы можете открывать одновременно, сколько потоков исполнения допускает ваш интерпретатор Python. Удаленно, вы можете быть ограничены в количестве одновременных подключений, если все запросы направлены к одному серверу или ко многим. Эти ограничения, вероятно, потребуют от вас написания сценария таким образом, чтобы опрашивать только небольшую часть URL-адресов одновременно (100, как упоминалось в другом постере, вероятно, является приличным размером пула потоков, хотя вы можете обнаружить, что вы может успешно развернуть еще много).

Вы можете использовать этот шаблон проектирования для решения вышеуказанной проблемы:

  1. Запустите поток, который запускает новые потоки запросов, пока число текущих запущенных потоков (вы можете отслеживать их с помощью threading.active_count () или путем помещения объектов потока в структуру данных) не будет> = вашего максимального количества одновременных запросов (скажем, 100) затем спит в течение короткого перерыва. Этот поток должен завершиться, когда больше нет URL-адресов для обработки. Таким образом, поток будет продолжать просыпаться, запускать новые потоки и спать до тех пор, пока вы не закончите.
  2. Пусть потоки запроса сохранят свои результаты в некоторой структуре данных для последующего поиска и вывода. Если структура, в которой вы сохраняете результаты, представляет собой listили dictв CPython, вы можете безопасно добавлять или вставлять уникальные элементы из ваших потоков без блокировок , но если вы записываете в файл или требуете более сложного взаимодействия данных между потоками, вам следует использовать блокировка взаимного исключения для защиты этого государства от коррупции .

Я хотел бы предложить вам использовать модуль потоков . Вы можете использовать его для запуска и отслеживания запущенных потоков. Поддержка потоков в Python отсутствует, но описание вашей проблемы предполагает, что ее вполне достаточно для ваших нужд.

И, наконец, если вы хотите , чтобы увидеть довольно просто приложение параллельного сетевого приложения , написанного в Python, проверить ssh.py . Это небольшая библиотека, которая использует потоки Python для распараллеливания многих SSH-соединений. Дизайн достаточно близок к вашим требованиям, поэтому вы можете найти его хорошим ресурсом.

Эрик Гаррисон
источник
1
Эрикг: было бы целесообразно добавить очередь в ваше уравнение (для блокировки взаимного исключения)? Я подозреваю, что GIL Python не предназначен для игры с тысячами потоков.
Игорь Ганапольский
Зачем вам нужна блокировка взаимного исключения, чтобы предотвратить генерацию слишком большого количества потоков? Я подозреваю, что неправильно понял термин. Вы можете отслеживать запущенные потоки в очереди потоков, удаляя их по завершении и добавляя больше к указанному пределу потока. Но в простом случае, таком как рассматриваемый, вы также можете просто наблюдать за количеством активных потоков в текущем процессе Python, подождать, пока он не упадет ниже порогового значения, и запустить больше потоков до порогового значения, как описано. Я думаю, вы могли бы считать это неявной блокировкой, но явные блокировки не требуются.
Эрик Гаррисон
erikg: разве несколько потоков не разделяют состояние? На странице 305 в книге О'Рейли «Системное администрирование Python для Unix и Linux» говорится: «... использование потоков без очередей делает его более сложным, чем реально могут справиться многие. Это гораздо лучшая идея - всегда использовать очереди модуль, если вам нужно использовать потоки. Почему? Потому что модуль очереди также устраняет необходимость явной защиты данных с помощью мьютексов, поскольку сама очередь уже внутренне защищена мьютексом. " Опять же, я приветствую вашу точку зрения на это.
Игорь Ганапольский
Игорь: Вы абсолютно правы, что вам следует использовать замок. Я отредактировал пост, чтобы отразить это. Тем не менее, практический опыт работы с Python показывает, что вам не нужно блокировать структуры данных, которые вы изменяете атомарно, из ваших потоков, например, list.append или путем добавления хеш-ключа. Я полагаю, что причина в GIL, который обеспечивает такие операции, как list.append, с определенной степенью атомарности. В настоящее время я выполняю тест, чтобы проверить это (используйте потоки 10k, чтобы добавить номера 0-9999 в список, проверьте, что все добавления работают). После почти 100 итераций тест не прошел.
Эрик Гаррисон
Игорь: мне задают еще один вопрос на эту тему: stackoverflow.com/questions/2740435/…
Эрик Гаррисон,
7

Если вы хотите добиться максимальной производительности, возможно, вы захотите использовать асинхронный ввод-вывод, а не потоки. Издержки, связанные с тысячами потоков ОС, нетривиальны, и переключение контекста в интерпретаторе Python добавляет еще больше. Работа с потоками, безусловно, сделает работу, но я подозреваю, что асинхронный маршрут обеспечит лучшую общую производительность.

В частности, я бы предложил асинхронный веб-клиент в библиотеке Twisted ( http://www.twistedmatrix.com ). У него, по общему признанию, крутая кривая обучения, но его довольно легко использовать, если вы хорошо разбираетесь в стиле асинхронного программирования Twisted.

Инструкции по использованию интерфейса асинхронного веб-клиента Twisted доступны по адресу:

http://twistedmatrix.com/documents/current/web/howto/client.html

Ракис
источник
Ракис: В настоящее время я изучаю асинхронный и неблокирующий ввод-вывод. Я должен изучить это лучше, прежде чем я это осуществлю. Один комментарий, который я хотел бы сделать в вашем посте, заключается в том, что невозможно (по крайней мере, в моем дистрибутиве Linux) создавать «тысячи потоков ОС». Существует максимальное количество потоков, которые Python позволит вам создать до прерывания программы. А в моем случае (на CentOS 5) максимальное количество потоков - 303.
Игорь Ганапольский
Это хорошо знать. Я никогда не пытался порождать в Python больше, чем несколько, но я бы ожидал, что смогу создать больше, чем это, прежде чем его взорвать.
Ракис
6

Решение:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Тарнай Кальман
источник
6
Использование Twisted в качестве пула потоков игнорирует большинство преимуществ, которые вы можете получить от него. Вместо этого вы должны использовать асинхронный HTTP-клиент.
Жан-Поль Кальдероне
1

Использование пула потоков - хороший вариант, и это будет довольно просто. К сожалению, в python нет стандартной библиотеки, которая делает пулы потоков очень простыми. Но вот достойная библиотека, с которой стоит начать: http://www.chrisarndt.de/projects/threadpool/

Пример кода с их сайта:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Надеюсь это поможет.

Кевин Виския
источник
Я предлагаю вам указать q_size для ThreadPool следующим образом: ThreadPool (poolsize, q_size = 1000), чтобы у вас не было 100000 объектов WorkRequest в памяти. «Если q_size> 0, размер очереди рабочих запросов ограничен, и пул потоков блокируется, когда очередь заполнена, и пытается добавить в нее больше рабочих запросов (см. putRequestМетод), если только вы не используете положительное timeoutзначение для putRequest».
Tarnay Kálmán
Пока что я пытаюсь реализовать решение для пула потоков - как было предложено. Однако я не понимаю список параметров в функции makeRequests. Что такое some_callable, list_of_args, обратный вызов? Возможно, если бы я увидел настоящий фрагмент кода, который бы мне помог. Я удивлен, что автор этой библиотеки не опубликовал никаких примеров.
Игорь Ганапольский
some_callable - ваша функция, в которой выполняется вся ваша работа (подключение к http-серверу). list_of_args - аргументы, которые будут переданы в some_callabe. обратный вызов - это функция, которая будет вызываться после завершения рабочего потока. Он принимает два аргумента: рабочий объект (на самом деле это не нужно беспокоить себя) и результаты, полученные рабочим.
Кевин Виския
1

Создайте epollобъект,
откройте множество клиентских TCP-сокетов,
настройте их буферы отправки так, чтобы они были немного больше заголовка запроса,
отправьте заголовок запроса - это должно быть немедленно, просто поместив в буфер, зарегистрируйте сокет в epollобъекте,
сделайте .pollна epollобъекте,
прочитайте сначала 3 байты из каждого сокета from .poll,
запишите их, sys.stdoutзатем \n(не очищайте), закройте клиентский сокет.

Ограничить количество открытых сокетов одновременно - обрабатывать ошибки при создании сокетов. Создайте новый сокет, только если другой закрыт.
Настройте пределы ОС.
Попробуйте разделить несколько (не много) процессов: это может помочь немного эффективнее использовать процессор.

Георгий Советов
источник
@IgorGanapolsky Должно быть. Я был бы удивлен в противном случае. Но это, безусловно, требует экспериментов.
Георгий Советов
0

В вашем случае многопоточность, вероятно, сработает, поскольку вы, скорее всего, будете тратить большую часть времени на ожидание ответа. В стандартной библиотеке есть полезные модули, такие как Queue, которые могут помочь.

Раньше я делал аналогичные вещи с параллельной загрузкой файлов, и это было достаточно для меня, но не в том масштабе, о котором вы говорите.

Если ваша задача была более привязана к процессору, вы можете посмотреть на многопроцессорный модуль, который позволит вам использовать больше процессоров / ядер / потоков (больше процессов, которые не будут блокировать друг друга, поскольку блокировка выполняется для каждого процесса)

Маттиас Нильссон
источник
Единственное, что я хотел бы упомянуть, это то, что порождение нескольких процессов может быть дороже, чем порождение нескольких потоков. Кроме того, нет явного увеличения производительности при отправке 100 000 HTTP-запросов с несколькими процессами против нескольких потоков.
Игорь Ганапольский
0

Подумайте об использовании Windmill , хотя Windmill, вероятно, не может сделать столько потоков.

Вы можете сделать это с помощью сценария Python, запущенного вручную, на 5 машинах, каждая из которых соединяется с исходящими портами 40000-60000, открывая 100 000 соединений портов.

Кроме того, это может помочь выполнить пример теста с приложением QA с хорошими потоками, таким как OpenSTA , чтобы понять, сколько может обрабатывать каждый сервер.

Также попробуйте изучить использование простого Perl с классом LWP :: ConnCache. Таким образом, вы, вероятно, получите больше производительности (больше подключений).

djangofan
источник
0

Этот скрученный асинхронный веб-клиент работает довольно быстро.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)
Robᵩ
источник
0

Я обнаружил, что использование tornadoпакета - самый быстрый и простой способ добиться этого:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
RDRR
источник
-2

Самый простой способ - использовать встроенную библиотеку потоков Python. Они не являются "реальными" / потоками ядра. У них есть проблемы (например, сериализация), но они достаточно хороши. Вы хотите, чтобы очередь и пул потоков. Один вариант здесь , но это тривиально, чтобы написать свой собственный. Вы не можете распараллелить все 100 000 вызовов, но вы можете запустить 100 (или около того) из них одновременно.

pestilence669
источник
7
Потоки Python вполне реальны, в отличие от, например, Ruby. Под капотом они реализованы как нативные потоки ОС, по крайней мере, в Unix / Linux и Windows. Может быть, вы имеете в виду GIL, но это не делает темы менее реальными ...
Эли Бендерский,
2
Eli прав насчет потоков Python, но утверждение Pestilence о том, что вы хотите использовать пул потоков, также верно. Последнее, что вы хотели бы сделать в этом случае, - это попытаться запустить отдельный поток для каждого из 100 000 запросов одновременно.
Адам Кроссленд
1
Игорь, вы не можете публиковать фрагменты кода в комментариях, но вы можете отредактировать свой вопрос и добавить их туда.
Адам Кроссленд
Мор: сколько очередей и потоков в очереди вы бы порекомендовали для моего решения?
Игорь Ганапольский
Кроме того, это связанная с
вводом