Parallel processing of large XML files in Python
I have several large XML files to parse (extracting a subset of the data from each and writing it out to a file). There are lots of files, and lots of records per file, so I'm attempting to parallelize the work.
To start, I have a generator that pulls records from the files (this part works fine):
import gzip

def reader(files):
    n = 0
    for fname in files:
        chunk = ''
        with gzip.open(fname, "r") as f:
            for line in f:
                line = line.strip()
                # accumulate lines until the closing </rec> tag
                if '<rec' in line or ('<rec' in chunk and line != '</rec>'):
                    chunk += line
                    continue
                if line == '</rec>':
                    chunk += line
                    n += 1
                    yield chunk
                    chunk = ''
And a process function (the details aren't relevant here; it also works fine):

def process(chunk, fields='all'):
    paper = etree.fromstring(chunk)
    # ...
    # extract the info we want from the XML
    # ...
    return result  # result is a string
Now, of course, the naive, non-parallel way to do this is as simple as:

records = reader(files)
with open(output_filepath, 'w') as fout:
    for record in records:
        result = process(record)
        fout.write(result + '\n')
But I want to parallelize this. I first considered a simple map-based approach, with each process handling one of the files, but the files are of radically different sizes (and some are very big), so that seemed like a pretty inefficient use of the parallelization, I think.
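Just to illustrate what I mean by the map-based approach, the idea I considered and rejected was roughly the following (process_one_file is a hypothetical helper, not something in my code):

from multiprocessing import Pool

def process_one_file(fname):
    # run the whole read/process pipeline for a single file
    return [process(rec) for rec in reader([fname])]

pool = Pool(nthreads)
# one task per file, so the biggest files dominate the runtime
results_per_file = pool.map(process_one_file, filelist)
pool.close()
pool.join()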
My current approach is the following:

import multiprocessing as mp
import logging

logger = logging.getLogger(__name__)

def feed(queue, records):
    # push every record from the generator onto the worker queue
    for rec in records:
        queue.put(rec)
    queue.put(None)

def calc(queuein, queueout):
    # pull records off the worker queue, process them, push results to the writer queue
    while True:
        rec = queuein.get(block=True)
        if rec is None:
            queueout.put('__done__')
            break
        result = process(rec)
        queueout.put(result)

def write(queue, file_handle):
    # single writer process: drain the result queue into the output file
    records_logged = 0
    while True:
        result = queue.get()
        if result == '__done__':
            logger.info("{} --> all records logged ({})".format(file_handle.name, records_logged))
            break
        elif result is not None:
            file_handle.write(result + '\n')
            file_handle.flush()
            records_logged += 1
            if records_logged % 1000 == 0:
                logger.info("{} --> {} records complete".format(file_handle.name, records_logged))

nthreads = N  # number of worker processes
records = reader(filelist)
workerqueue = mp.Queue()
writerqueue = mp.Queue()

feedproc = mp.Process(target=feed, args=(workerqueue, records))
calcproc = [mp.Process(target=calc, args=(workerqueue, writerqueue)) for _ in range(nthreads)]
writproc = mp.Process(target=write, args=(writerqueue, handle))  # handle is the open output file

feedproc.start()
for p in calcproc:
    p.start()
writproc.start()

feedproc.join()
for p in calcproc:
    p.join()
writproc.join()

feedproc.terminate()
writproc.terminate()
for p in calcproc:
    p.terminate()

workerqueue.close()
writerqueue.close()
Now, this works in the sense that everything gets written to the file, but it hangs when trying to join the processes at the end, and I'm not sure why. So my main question is: what am I doing wrong here such that my worker processes aren't correctly terminating, or signaling that they're done?
I think I could solve this problem the "easy" way by adding timeouts to the calls to join, but (a) that seems like a rather inelegant solution here, since there are clear completion conditions for the task (i.e. once every record in the file has been processed, we're done), and (b) I'm worried it could introduce new problems (e.g. if I make the timeout too short, couldn't it terminate things before everything has been processed? And of course making it too long is just wasting time...).
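For concreteness, the timeout workaround I have in mind is roughly this (60 seconds is an arbitrary placeholder, which is exactly the problem):

feedproc.join(timeout=60)          # returns after at most 60 seconds, even if still running
for p in calcproc:
    p.join(timeout=60)
writproc.join(timeout=60)

# anything still alive at this point would have to be killed by force
for p in [feedproc, writproc] + calcproc:
    if p.is_alive():
        p.terminate()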
I'm also willing to consider a totally different approach to parallelizing this if anyone has ideas (the queue seemed like a good choice since the files are big and just reading and generating the raw records takes time).
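For example, one alternative I've wondered about (just an untested sketch, and the chunksize is a guess) is to skip the explicit queues entirely, feed the generator into a worker pool, and let the parent process do the writing:

from multiprocessing import Pool

pool = Pool(nthreads)
with open(output_filepath, 'w') as fout:
    # imap_unordered consumes the generator lazily and yields results as workers finish
    for result in pool.imap_unordered(process, reader(filelist), chunksize=100):
        fout.write(result + '\n')
pool.close()
pool.join()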
Bonus question: I'm aware that this approach in no way guarantees that the output I'm writing is in the same order as the original data. This is not a huge deal (sorting the reduced/processed data won't be too unwieldy), but maintaining the order would be nice. Extra gratitude if anyone has a solution that ensures the original order is preserved.
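If the pool sketch above is workable, I believe swapping imap_unordered for imap would also take care of the ordering, since imap returns results in input order:

for result in pool.imap(process, reader(filelist), chunksize=100):  # imap preserves input order
    fout.write(result + '\n')

That would come at the cost of slow records holding up the output, though, and I'd still be curious how to preserve order in the queue-based version.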