У меня есть один большой текстовый файл, в котором я хочу обработать каждую строку (выполнить некоторые операции) и сохранить их в базе данных. Поскольку одна простая программа занимает слишком много времени, я хочу, чтобы она выполнялась с помощью нескольких процессов или потоков. Каждый поток / процесс должен читать РАЗЛИЧНЫЕ данные (разные строки) из этого единственного файла и выполнять некоторые операции с их частями данных (строками) и помещать их в базу данных, чтобы в конечном итоге у меня были все обработанные данные и мои база данных выгружается с нужными мне данными.
Но я не могу понять, как к этому подойти.
python
multithreading
multiprocessing
пранавк
источник
источник
Ответы:
То, что вы ищете, - это шаблон "производитель / потребитель".
Базовый пример потоковой передачи
Вот базовый пример использования модуля потоковой передачи (вместо многопроцессорной обработки)
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()
Вы бы не стали делиться файловым объектом с потоками. Вы могли бы произвести для них работу, снабдив очередь строками данных. Затем каждый поток брал строку, обрабатывал ее и затем возвращал в очередь.
В модуль многопроцессорной обработки встроены некоторые более продвинутые средства для обмена данными, такие как списки и особый вид очереди . Есть компромиссы между использованием многопроцессорности и потоков, и это зависит от того, привязана ли ваша работа к процессору или к вводу-выводу.
Базовая многопроцессорная обработка. Пример пула
Вот действительно простой пример многопроцессорного пула
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results
Пул - это удобный объект, который управляет собственными процессами. Поскольку открытый файл может перебирать свои строки, вы можете передать его объекту
pool.map()
, который выполнит цикл и доставит строки в рабочую функцию. Карта блокирует и возвращает весь результат, когда он готов. Имейте в виду, что это слишком упрощенный пример, и чтоpool.map()
он будет читать весь ваш файл в память сразу перед отправкой работы. Помните об этом, если вы ожидаете иметь большие файлы. Существуют более продвинутые способы создания настройки производителя / потребителя.Ручной "пул" с пересортировкой лимита и строк
Это ручной пример Pool.map , но вместо того, чтобы использовать всю итерацию за один раз, вы можете установить размер очереди, чтобы вы загружали ее по частям так быстро, как она может обрабатывать. Я также добавил номера строк, чтобы вы могли отслеживать их и ссылаться на них, если хотите, позже.
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)
источник
(None,) * num_workers
создает кортеж из значений None, равный размеру числа рабочих. Это будут контрольные значения, которые сообщают каждому потоку о завершении работы, потому что больше нет работы. Этаitertools.chain
функция позволяет вам объединить несколько последовательностей в одну виртуальную последовательность, не копируя ничего. Итак, мы получаем, что сначала он перебирает строки в файле, а затем значения None.Вот действительно глупый пример, который я придумал:
import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ...
Сложная часть здесь состоит в том, чтобы убедиться, что мы разбили файл на символы новой строки, чтобы вы не пропустили ни одной строки (или прочитали только частичные строки). Затем каждый процесс считывает свою часть файла и возвращает объект, который может быть помещен в базу данных основным потоком. Конечно, вам может даже понадобиться выполнять эту часть по частям, чтобы вам не приходилось хранить всю информацию в памяти сразу. (это довольно легко сделать - просто разделите список «args» на X фрагментов и вызовите
pool.map(wrapper,chunk)
- см. здесь )источник
хорошо разбить один большой файл на несколько файлов меньшего размера и обработать каждый из них в отдельных потоках.
источник