import multiprocessing as mp
import threading as th
import time
from math import ceil


class mt():
    """Chainable filter/map/reduce over a sequence, split across workers.

    The input is cut into ``threads`` contiguous slices; each slice is handled
    by one ``threading.Thread`` (default) or one ``multiprocessing.Process``
    (``useprocesses=True``).  Calling ``filter``/``map``/``reduce`` while a
    previous operation is pending first waits for it and feeds its result in
    as the new input, so calls can be chained::

        mt(4, data).filter(pred).map(func).getresults()
    """

    def __init__(self, threads, data, verbose=False, progressinterval=1000, useprocesses=False):
        """Prepare the worker slots; no work is started yet.

        Args:
            threads: number of workers (and of data slices).
            data: sliceable sequence to process.
            verbose: if true, print per-worker timings and put progress
                output on separate lines.
            progressinterval: number of items a worker handles between two
                progress updates; must be at least 1.
            useprocesses: run the workers as forked processes instead of
                threads.

        Raises:
            ValueError: if ``progressinterval`` is smaller than 1.
        """
        # A non-positive interval would either divide by zero inside the
        # workers or silently yield empty results, so reject it up front.
        if progressinterval < 1:
            raise ValueError("progressinterval must be >= 1")

        self.__running = False
        self.__closed = False
        self.__data = data
        self.__verbose = verbose
        self.__useprocesses = useprocesses

        # placeholders, filled in when an operation is started
        self.__final = None
        self.__comment = None
        self.__starttime = None
        self.__endtime = None
        self.__type = None

        # worker bookkeeping
        self.__threadcount = threads
        self.__threads = []
        if useprocesses:
            mp.set_start_method('fork', True)
            self.__manager = mp.Manager()
            self.__results = self.__manager.list()
            self.__progress = mp.Value("i", 0)
            self.__lock = mp.Lock()
            self.__keepalive = self.__manager.list([0])
        else:
            self.__results = []
            self.__progress = 0
            self.__lock = th.Lock()
            self.__keepalive = [0]
        self.__progressinterval = progressinterval
        for _ in range(self.__threadcount):
            self.__results.append([])
            self.__threads.append(None)

    # ------------------------------------------------------------------
    # public operations
    # ------------------------------------------------------------------

    def filter(self, cond, comment=None):
        """Start keeping only the items for which ``cond(item)`` is true.

        Args:
            cond: predicate called once per item.
            comment: optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: if the instance has been closed.
        """
        return self.__start("filter", self.__dofilter, (cond,),
                            self.__getresultsmapfilter, comment)

    def map(self, func, comment=None):
        """Start replacing every item by ``func(item)``.

        Args:
            func: function called once per item.
            comment: optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: if the instance has been closed.
        """
        return self.__start("map", self.__domap, (func,),
                            self.__getresultsmapfilter, comment)

    def reduce(self, reducer, aggregator, initval, comment=None):
        """Start folding the data into a single value.

        Each worker folds its slice with ``reducer`` starting from
        ``initval()``; ``getresults`` then combines the per-worker values
        with ``aggregator``, again starting from ``initval()``.

        Args:
            reducer: ``reducer(acc, item) -> acc`` used inside a worker.
            aggregator: ``aggregator(acc, worker_value) -> acc`` used to
                merge the per-worker values.
            initval: zero-argument callable producing a fresh start value.
            comment: optional label; when given, progress is printed.

        Returns:
            ``self``, to allow chaining.

        Raises:
            RuntimeError: if the instance has been closed.
        """
        return self.__start("reduce", self.__doreduce, (reducer, initval),
                            lambda: self.__getresultsreduce(aggregator, initval),
                            comment)

    def getresults(self):
        """Wait for the pending operation and return its combined result."""
        self.join()
        return self.__final()

    def join(self):
        """Wait for all workers of the current operation to finish.

        When the operation was started with a comment, a summary line is
        printed the first time the operation is joined.

        Returns:
            ``self``.

        Raises:
            RuntimeError: if the instance has been closed.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        for i in range(self.__threadcount):
            if self.__threads[i] is not None:
                self.__threads[i].join()
                self.__threads[i] = None
        # __endtime is reset to None whenever an operation starts, so this
        # branch runs once per operation and the summary is not repeated by
        # later join()/getresults() calls.
        if self.__endtime is None:
            self.__endtime = self.__cms()
            if self.__comment is not None:
                dur = self.__endtime - self.__starttime
                kept = ""
                if self.__type == "filter":
                    kept = " -> #" + str(sum(len(part) for part in self.__results))
                line = (self.__comment + ": " + str(self.__get_progress()) + "/"
                        + str(len(self.__data)) + kept + " ... took " + str(dur) + "ms")
                # Without verbose output the progress line is still open, so
                # overwrite it in place.
                print(line if self.__verbose else "\r" + line)
        return self

    def close(self):
        """Wait for pending work and mark the instance as unusable.

        In process mode the manager process created in ``__init__`` is shut
        down as well, so fetch results before closing.

        Raises:
            RuntimeError: if the instance has already been closed.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        self.join()
        self.__closed = True
        if self.__useprocesses:
            self.__manager.shutdown()

    # ------------------------------------------------------------------
    # shared start-up / worker plumbing
    # ------------------------------------------------------------------

    def __start(self, kind, worker, extra_args, final, comment):
        """Launch ``worker`` once per data slice; shared by filter/map/reduce."""
        if self.__closed:
            raise RuntimeError("Already closed")
        if self.__running:
            # Chained call: the previous result becomes the new input.  This
            # must happen before __final is replaced below.
            self.join()
            self.__data = self.getresults()
        self.__reset_progress()
        self.__running = True
        self.__final = final
        self.__type = kind
        self.__comment = comment
        total = len(self.__data)
        if comment is not None:
            print(self.__comment + ": 0/" + str(total) + " ...",
                  end='\n' if self.__verbose else '', flush=True)
        self.__starttime = self.__cms()
        self.__endtime = None
        spawn = mp.Process if self.__useprocesses else th.Thread
        count = self.__threadcount
        for i in range(count):
            part = self.__data[i * total // count:(i + 1) * total // count]
            self.__threads[i] = spawn(target=worker, args=(i, part) + tuple(extra_args))
            self.__threads[i].start()
        return self

    def __chunks(self, items):
        """Yield consecutive slices of ``items``, ``progressinterval`` long."""
        step = self.__progressinterval
        for j in range(ceil(len(items) / step)):
            yield items[j * step:(j + 1) * step]

    def __report(self, count):
        """Add ``count`` processed items to the progress and print it."""
        with self.__lock:
            self.__inc_progress(count)
            if self.__comment is not None:
                print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/"
                      + str(len(self.__data)) + " ...", end='', flush=True)
        # Written after every chunk; nothing in this class reads it back.
        self.__keepalive[0] = 0

    def __store(self, i, result, started, kind):
        """Publish worker ``i``'s result and, if verbose, print its timing."""
        with self.__lock:
            self.__results[i] = result
        dur = self.__cms() - started
        if self.__verbose:
            prefix = self.__comment + ": " if self.__comment is not None else ""
            print(prefix + "Thread " + str(i) + ": " + kind + " took " + str(dur) + "ms")

    def __dofilter(self, i, items, cond):
        """Worker body for ``filter``: keep the items of ``items`` matching ``cond``."""
        started = self.__cms()
        kept = []
        for part in self.__chunks(items):
            kept += [item for item in part if cond(item)]
            self.__report(len(part))
        self.__store(i, kept, started, "filter")

    def __domap(self, i, items, func):
        """Worker body for ``map``: apply ``func`` to every item of ``items``."""
        started = self.__cms()
        mapped = []
        for part in self.__chunks(items):
            mapped += [func(item) for item in part]
            self.__report(len(part))
        self.__store(i, mapped, started, "map")

    def __doreduce(self, i, items, reducer, initval):
        """Worker body for ``reduce``: fold ``items`` with ``reducer``."""
        started = self.__cms()
        val = initval()
        for part in self.__chunks(items):
            for item in part:
                val = reducer(val, item)
            self.__report(len(part))
        self.__store(i, val, started, "reduce")

    # ------------------------------------------------------------------
    # result assembly
    # ------------------------------------------------------------------

    def __getresultsmapfilter(self):
        """Concatenate the per-worker lists in worker order."""
        res = []
        for i in range(self.__threadcount):
            res += self.__results[i]
        return res

    def __getresultsreduce(self, aggregator, initval):
        """Merge the per-worker values with ``aggregator``."""
        val = initval()
        for j in range(self.__threadcount):
            val = aggregator(val, self.__results[j])
        return val

    # ------------------------------------------------------------------
    # small utilities
    # ------------------------------------------------------------------

    def __cms(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(round(time.time() * 1000))

    def __reset_progress(self):
        """Set the processed-item counter back to zero."""
        if self.__useprocesses:
            self.__progress.value = 0
        else:
            self.__progress = 0

    def __inc_progress(self, val):
        """Add ``val`` to the processed-item counter (caller holds the lock)."""
        if self.__useprocesses:
            self.__progress.value += val
        else:
            self.__progress += val

    def __get_progress(self):
        """Return the number of items processed so far."""
        if self.__useprocesses:
            return self.__progress.value
        else:
            return self.__progress