Parallel processing of large XML files in Python


I have several large XML files to parse (extracting a subset of the data and writing it to file). Since there are lots of files, and lots of records per file, I'm attempting to parallelize the work.

To start, I have a generator that pulls records from the files (this works fine):

    import gzip

    def reader(files):
        n = 0
        for fname in files:
            chunk = ''
            with gzip.open(fname, "r") as f:
                for line in f:
                    line = line.strip()
                    # accumulate lines from the opening <rec ...> tag onward
                    if '<rec' in line or ('<rec' in chunk and line != '</rec>'):
                        chunk += line
                        continue
                    # a bare closing tag completes the record; yield it as one string
                    if line == '</rec>':
                        chunk += line
                        n += 1
                        yield chunk
                        chunk = ''
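To make it concrete: the generator assumes each gzipped file holds records wrapped in <rec ...> ... </rec> tags, with the closing tag on its own line, and it yields each complete block as one string. A quick sanity check looks something like this (hypothetical usage only, nothing here beyond what the generator already does):

    records = reader(files)

    # peek at the first reconstructed record: it should be a single string
    # running from '<rec' through '</rec>'
    first = next(records)
    print(first[:200])

    # the remaining records can then be iterated as usual
    print(sum(1 for _ in records) + 1, "records total")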

And a process function (the details aren't relevant here; it works fine):

    def process(chunk, fields='all'):
        paper = etree.fromstring(chunk)
        #
        # extract the info we want from the xml
        #
        return result  # result is a string
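(For concreteness only, a stripped-down stand-in for process might look like the following; the lxml import and the field names here are hypothetical, and my real extraction logic is more involved:)

    from lxml import etree  # hypothetical choice; xml.etree.ElementTree would also work

    def process(chunk, fields='all'):
        paper = etree.fromstring(chunk)
        # pull out a couple of made-up fields and join them into one output line
        title = paper.findtext('title', default='')
        year = paper.findtext('year', default='')
        return '\t'.join([title, year])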

Now, of course, the naive, non-parallel way is as simple as:

    records = reader(files)
    with open(output_filepath, 'w') as fout:
        for record in records:
            result = process(record)
            fout.write(result + '\n')

But I want to parallelize this. I first considered a simple map-based approach, with each process handling one of the files, but the files are of radically different sizes (and some are very big), so I think that would be a pretty inefficient use of parallelization. A sketch of what I mean by the map-based approach is below, followed by my current queue-based approach.
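For illustration, the one-file-per-task approach I decided against would be something like this (a minimal sketch; process_file is a hypothetical helper and the pool size is arbitrary):

    import multiprocessing as mp

    def process_file(fname):
        # hypothetical helper: run the existing reader()/process() over a
        # single file and return all of its processed records
        return [process(rec) for rec in reader([fname])]

    if __name__ == '__main__':
        with mp.Pool(processes=4) as pool, open(output_filepath, 'w') as fout:
            # one task per file, so one huge file ties up a single worker
            # while the others go idle -- the imbalance described above
            for lines in pool.imap(process_file, filelist):
                for line in lines:
                    fout.write(line + '\n')

So instead, this is my current approach: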

    import multiprocessing as mp

    def feed(queue, records):
        # push every raw record onto the worker queue, then a sentinel
        for rec in records:
            queue.put(rec)
        queue.put(None)

    def calc(queuein, queueout):
        # consume raw records until the sentinel, pushing results downstream
        while True:
            rec = queuein.get(block=True)
            if rec is None:
                queueout.put('__done__')
                break
            result = process(rec)
            queueout.put(result)

    def write(queue, file_handle):
        # single writer: drain results until the '__done__' marker
        records_logged = 0
        while True:
            result = queue.get()
            if result == '__done__':
                logger.info("{} --> records logged ({})".format(file_handle.name, records_logged))
                break
            elif result is not None:
                file_handle.write(result + '\n')
                file_handle.flush()
                records_logged += 1
                if records_logged % 1000 == 0:
                    logger.info("{} --> {} records complete".format(file_handle.name, records_logged))

    nthreads = N  # number of worker processes

    records = reader(filelist)

    workerqueue = mp.Queue()
    writerqueue = mp.Queue()
    feedproc = mp.Process(target=feed, args=(workerqueue, records))
    calcproc = [mp.Process(target=calc, args=(workerqueue, writerqueue)) for i in range(nthreads)]
    writproc = mp.Process(target=write, args=(writerqueue, handle))  # handle: already-open output file

    feedproc.start()
    for p in calcproc:
        p.start()
    writproc.start()

    feedproc.join()
    for p in calcproc:
        p.join()
    writproc.join()

    feedproc.terminate()
    writproc.terminate()
    for p in calcproc:
        p.terminate()

    workerqueue.close()
    writerqueue.close()

Now, this works in the sense that everything gets written to the file, but it then hangs when trying to join the processes at the end, and I'm not sure why. So, my main question is: what am I doing wrong here such that my worker processes aren't correctly terminating, or signaling that they're done?

I think I could solve the problem the "easy" way by adding timeouts to the calls to join, but (a) that seems like a rather inelegant solution here, since there are clear completion conditions for the task (i.e. once every record in the file has been processed, we're done), and (b) I'm worried it could introduce problems (e.g. if I make the timeout too short, couldn't it terminate things before everything has been processed? And of course making it too long is just wasting time...). A sketch of what I mean by the timeout workaround is below.
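(For reference, the timeout workaround I'm talking about would just be something like the following; the 60 seconds is an arbitrary guess, which is exactly the problem:)

    # the "easy" workaround: don't block forever on join
    feedproc.join(timeout=60)
    for p in calcproc:
        p.join(timeout=60)
    writproc.join(timeout=60)

    # whatever is still alive after the timeout gets killed outright
    for p in [feedproc, writproc] + calcproc:
        if p.is_alive():
            p.terminate()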

I'm willing to consider a totally different approach to parallelizing this if anyone has ideas (the queue seemed like a good choice since the files are big and just reading and generating the raw records takes time).

Bonus question: I'm aware that this approach in no way guarantees that the output I'm writing to file is in the same order as the original data. This is not a huge deal (sorting the reduced/processed data won't be too unwieldy), but maintaining order would be nice. Extra gratitude if someone has a solution that ensures the original order is preserved.

