from threading import Thread, Lock
import time


class mt:
    """Run map / filter / reduce stages over a sequence using worker threads.

    The input is cut into ``threads`` contiguous slices and every stage
    processes each slice in its own thread.  Stages can be chained fluently
    (``mt(4, data).filter(f).map(g).getresults()``): starting a new stage
    first waits for the running one and uses its results as the new input.

    ``data`` must support ``len()`` and slicing.  The value produced by
    ``reduce`` is whatever ``aggregator`` returns, so a stage can only be
    chained after ``reduce`` if that value is itself a sliceable sequence.

    An exception raised inside a worker is captured and re-raised by
    ``join()`` (and therefore by ``getresults()``, ``close()`` and the next
    chained stage) instead of being lost in the worker thread.
    """

    def __init__(self, threads, data, verbose=False):
        """Prepare a pipeline over *data*; no thread is started yet.

        Args:
            threads: Number of worker threads (and slices) per stage.
            data: Sequence to process; needs ``len()`` and slicing.
            verbose: If true, every worker prints its own duration.

        Raises:
            ValueError: If ``threads`` is smaller than 1 (no worker would
                run and every element would be silently dropped).
        """
        if threads < 1:
            raise ValueError("threads must be at least 1")

        self.__running = False   # becomes True once the first stage started
        self.__closed = False
        self.__data = data
        self.__verbose = verbose

        # State of the current stage.
        self.__final = None      # callable that merges the per-thread results
        self.__comment = None
        self.__announce = False  # True only if the caller passed a comment
        self.__starttime = None
        self.__endtime = None
        self.__type = None

        # Thread bookkeeping: one slot per worker.
        self.__threadcount = threads
        self.__lock = Lock()
        self.__threads = [None] * threads
        self.__results = [[] for _ in range(threads)]
        self.__errors = [None] * threads

    def filter(self, cond, comment=None):
        """Start a stage keeping the elements for which ``cond(x)`` is true.

        Args:
            cond: Predicate applied to every element.
            comment: Optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: If the instance has been closed.
        """
        def work(part):
            return [item for item in part if cond(item)]

        return self.__start("filter", work, self.__getresultsmapfilter,
                            comment)

    def map(self, func, comment=None):
        """Start a stage replacing every element ``x`` by ``func(x)``.

        Args:
            func: Function applied to every element.
            comment: Optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: If the instance has been closed.
        """
        def work(part):
            return [func(item) for item in part]

        return self.__start("map", work, self.__getresultsmapfilter, comment)

    def reduce(self, reducer, aggregator, initval, comment=None):
        """Start a stage folding the data into a single value.

        Every worker folds its slice with ``reducer`` starting from
        ``initval()``; ``getresults()`` then folds the per-worker values with
        ``aggregator``, again starting from ``initval()``.

        Args:
            reducer: ``reducer(acc, element) -> acc`` used inside a worker.
            aggregator: ``aggregator(acc, partial) -> acc`` used to merge
                the per-worker values.
            initval: Zero-argument factory of the neutral start value.
            comment: Optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: If the instance has been closed.
        """
        def work(part):
            acc = initval()
            for item in part:
                acc = reducer(acc, item)
            return acc

        def final():
            return self.__getresultsreduce(aggregator, initval)

        return self.__start("reduce", work, final, comment)

    def __start(self, kind, work, final, comment):
        """Shared start-up of a stage: chain, announce, slice, spawn."""
        if self.__closed:
            raise RuntimeError("Already closed")
        if self.__running:
            # Chaining: the finished previous stage provides the new input.
            self.__data = self.getresults()
        self.__running = True
        self.__final = final
        self.__type = kind
        self.__announce = comment is not None
        self.__comment = comment if comment is not None else ""
        if self.__announce:
            print(self.__comment + ": #" + str(len(self.__data)) + " ...",
                  end='\n' if self.__verbose else '', flush=True)
        self.__starttime = self.__cms()
        self.__endtime = None

        count = self.__threadcount
        total = len(self.__data)
        # Clear every slot first so a failing worker can never leave the
        # previous stage's partition behind.
        for i in range(count):
            self.__results[i] = []
            self.__errors[i] = None
        for i in range(count):
            part = self.__data[i * total // count:(i + 1) * total // count]
            worker = Thread(target=self.__run, args=(kind, i, part, work))
            self.__threads[i] = worker
            worker.start()
        return self

    def __run(self, kind, i, part, work):
        """Thread body: process one slice and store its result or error."""
        began = self.__cms()
        try:
            outcome = work(part)
        except Exception as exc:
            # Keep the error for join(); the thread itself must not die
            # silently with the caller none the wiser.
            with self.__lock:
                self.__errors[i] = exc
            return
        with self.__lock:
            self.__results[i] = outcome
        dur = self.__cms() - began
        if self.__verbose:
            print(self.__comment + ": Thread " + str(i) + ": " + kind
                  + " took " + str(dur) + "ms")

    def getresults(self):
        """Wait for the current stage and return its merged result.

        Returns:
            The concatenated list for ``map``/``filter``, the aggregated
            value for ``reduce``, or the unmodified input data if no stage
            has been started.

        Raises:
            RuntimeError: If the instance has been closed.
            Exception: Whatever a worker of the current stage raised.
        """
        self.join()
        if self.__final is None:
            # No stage was ever started: the "result" is the input itself.
            return self.__data
        return self.__final()

    def __getresultsmapfilter(self):
        """Concatenate the per-thread lists in slice order."""
        merged = []
        for part in self.__results:
            merged += part
        return merged

    def __getresultsreduce(self, aggregator, initval):
        """Fold the per-thread partial values with ``aggregator``."""
        acc = initval()
        for partial in self.__results:
            acc = aggregator(acc, partial)
        return acc

    def join(self):
        """Block until every worker of the current stage has finished.

        The timing summary is printed once per stage, and only if the stage
        was started with a comment.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: If the instance has been closed.
            Exception: The first error raised by a worker of the current
                stage (raised again on every call until a new stage starts).
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        for i, worker in enumerate(self.__threads):
            if worker is not None:
                worker.join()
                self.__threads[i] = None
        if self.__running and self.__endtime is None:
            # First join after the stage finished: take the time and report.
            self.__endtime = self.__cms()
            if self.__announce:
                self.__report()
        for err in self.__errors:
            if err is not None:
                raise err
        return self

    def __report(self):
        """Print the one-line summary of the stage that just finished."""
        dur = self.__endtime - self.__starttime
        line = self.__comment + ": #" + str(len(self.__data))
        if self.__type == "filter":
            kept = sum(len(part) for part in self.__results)
            line += " -> #" + str(kept)
        line += " ... took " + str(dur) + "ms"
        # Non-verbose mode left the cursor on the announcement line, so the
        # summary overwrites it via a carriage return.
        print(line if self.__verbose else "\r" + line)

    def close(self):
        """Wait for the current stage and forbid any further use.

        Raises:
            RuntimeError: If the instance has already been closed.
            Exception: A pending worker error; the instance is closed
                regardless.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        try:
            self.join()
        finally:
            self.__closed = True

    def __cms(self):
        """Return a millisecond timestamp suitable for measuring durations."""
        # Monotonic clock: immune to wall-clock adjustments while timing.
        return int(round(time.monotonic() * 1000))