#!/usr/bin/env python

## This routine performs gzip compression in parallel, via multiple
## jobs.  For now we only do compression; decompression would require
## hacking the zlib library.
## Usage:
##     npgzip file file file file
## or
##     cat file | npgzip > file.gz
## or
##     tar --use-compress-program npgzip -cvf dir.tar.gz dir/
## This program should have many more options.
## A version of this program that uses the zlib library directly
## is called npgzip2.  It 'works', but see it for problems.
## This program is in the public domain.

import threading, Queue, sys, subprocess, os

Chunksize = 3000000     # read/write in ~3 meg chunks

try:
    # Count the "processor" lines in /proc/cpuinfo to get the number
    # of CPUs on Linux.
    nthread = len([1 for line in open('/proc/cpuinfo')
                   if line.startswith("processor")])
except:
    nthread = 4

class dtm:
    # A class to hold one chunk of our data.
    def __init__(self, n, rawdt):
        self.n = n              # chunk number
        self.rawdt = rawdt      # uncompressed data
        self.compdt = ""        # compressed data

class funcdataThread(threading.Thread):
    # A thread that waits for a queue to give it a function and some
    # arguments.  It then calls the function on the arguments.  Repeat.
    def __init__(self, funcdataq):
        threading.Thread.__init__(self)     # initialize threadiness
        self.setDaemon(True)                # set the thread to die at exit()
        self.funcdataq = funcdataq          # the queue from which we read
    def run(self):
        while True:
            fundat = self.funcdataq.get()   # get the function and the arguments
            fundat[0](*fundat[1])           # call the function on the arguments

class workerThread(threading.Thread):
    # These threads perform the actual job of compressing the data.
    def __init__(self):
        threading.Thread.__init__(self)     # initialize threadiness
        self.setDaemon(True)                # set the thread to die at exit()
        self.writeq = Queue.Queue()         # queue feeding data to a helper thread
        # We need a separate thread to write the data into gzip while
        # we read from gzip; writing a whole chunk from this thread
        # could deadlock with both pipes full.
        funcdataThread(self.writeq).start() # start a helper writing thread
    def run(self):
        while True:
            x = todoq.get()                 # get a chunk-of-data object from the queue
            # Start a gzip job for this chunk.
            pp = subprocess.Popen(["gzip", "-8", "-"], stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE, close_fds=True)
            # Here we have our funcdataThread write the data to the process.
            self.writeq.put([pp.stdin.write, [x.rawdt]])
            self.writeq.put([pp.stdin.close, []])   # make the thread close the stream
            while True:
                compdt = pp.stdout.read(Chunksize / 3)  # read compressed data
                if not compdt:
                    break                   # stop when all data is read
                x.compdt += compdt
            pp.stdout.close()               # close output
            pp.wait()                       # reap the gzip process
            doneq.put(x)                    # put the chunk in the done queue

def readinp(inp, nchunkread):
    data = inp.read(Chunksize)              # read a chunk of data from the file
    if data:
        nchunkread += 1
        todoq.put(dtm(nchunkread, data))    # put the chunk in the queue to be processed
    return nchunkread
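## The scheme below relies on a property of the gzip format: complete
## gzip streams ("members") may be concatenated, and the result is
## itself a valid gzip stream that decompresses to the concatenated
## data.  A minimal sketch of that property using only the standard
## library (this helper is illustrative only; it is not part of the
## original program and is never called):
def _concat_demo():
    import gzip as gzmod, StringIO
    def gz(s):                              # compress one chunk on its own
        buf = StringIO.StringIO()
        f = gzmod.GzipFile(fileobj=buf, mode='wb')
        f.write(s)
        f.close()
        return buf.getvalue()
    joined = gz("hello ") + gz("world")     # two independent gzip members
    f = gzmod.GzipFile(fileobj=StringIO.StringIO(joined))
    assert f.read() == "hello world"        # reads across both members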
def xcompress(inp, outp):
    # Read from a file, put chunks in a queue, and write the compressed
    # chunks back out in their original order as they come back.
    nchunkread = 0
    for i in range(int(nthread * 1.5)):     # prime the todo queue with
        nchunkread = readinp(inp, nchunkread)   # ~1.5 chunks per worker
    nnext = 1           # number of the next chunk we will write out
    donelist = []
    donedata = []
    while nnext <= nchunkread:
        x = doneq.get()                     # get the next processed chunk off the queue
        donelist.append(x.n)                # list of finished chunk numbers
        donedata.append(x.compdt)           # put the data chunk in a matching list
        while nnext in donelist:            # while the chunk we need has arrived:
            ind = donelist.index(nnext)     # its index in our lists
            outp.write(donedata.pop(ind))   # write to output and remove
            donelist.pop(ind)               # remove the chunk number from the list
            nnext += 1
        # Attempt to read some more data and put it on the todo queue.
        nchunkread = readinp(inp, nchunkread)

todoq = Queue.Queue()   # queue of data to be compressed
doneq = Queue.Queue()   # queue of data that has been compressed

for i in range(nthread):
    workerThread().start()                  # start the worker threads

def main():
    if len(sys.argv) > 1:
        for filename in sys.argv[1:]:
            if filename != "-":
                outputname = filename + '.gz'
                inp = open(filename, 'rb')
                outp = open(outputname, 'wb')
            else:
                inp = sys.stdin
                outp = sys.stdout
            sys.stderr.write(" " + filename + '\r')
            xcompress(inp, outp)
            if filename != "-":             # don't close or unlink stdin/stdout
                inp.close()
                outp.close()
                os.remove(filename)
    else:
        xcompress(sys.stdin, sys.stdout)

if __name__ == '__main__':
    main()