198 lines
8.0 KiB
Python
198 lines
8.0 KiB
Python
import time
|
|
from math import ceil
|
|
from threading import Thread, Lock
|
|
|
|
|
|
class mt():
    """Chainable multi-threaded filter / map / reduce over a sliceable sequence.

    The input is split into ``threads`` contiguous slices and every slice is
    processed by its own ``threading.Thread``.  Stages can be chained::

        mt(4, data).filter(cond).map(func).getresults()

    Starting a new stage first waits for the previous one and uses its result
    as the new input.  ``data`` (and every intermediate result that is fed
    into a further stage) must support ``len()`` and slicing.

    If a worker callable raises, the first such exception is stored and
    re-raised from ``join()`` / ``getresults()`` (and therefore also when the
    next stage is chained) instead of silently yielding partial results.
    """

    def __init__(self, threads, data, verbose=False, progressinterval=1000):
        """Prepare a pipeline; no thread is started yet.

        :param threads: number of worker threads / slices (at least 1).
        :param data: input sequence; must support ``len()`` and slicing.
        :param verbose: if true, print a timing line per worker thread and
            keep the progress output on separate lines.
        :param progressinterval: number of items a worker processes between
            two progress updates (at least 1).
        :raises ValueError: if ``threads`` or ``progressinterval`` is < 1.
        """
        # Values < 1 would silently process nothing (empty ranges) or divide
        # by zero inside the workers, so reject them up front.
        if threads < 1:
            raise ValueError("threads must be at least 1")
        if progressinterval < 1:
            raise ValueError("progressinterval must be at least 1")

        self.__running = False
        self.__closed = False
        self.__data = data
        self.__verbose = verbose

        # State of the current stage; filled in when a stage is started.
        self.__final = None      # callable assembling the stage's result
        self.__comment = None    # label used for progress output
        self.__starttime = None
        self.__endtime = None
        self.__type = None       # "filter", "map" or "reduce"
        self.__error = None      # first exception raised by a worker

        # Thread bookkeeping.
        self.__threadcount = threads
        self.__threads = [None] * threads
        self.__lock = Lock()
        self.__results = [[] for _ in range(threads)]
        self.__progress = 0
        self.__progressinterval = progressinterval

    def filter(self, cond, comment=None):
        """Start a stage keeping the items for which ``cond(item)`` is truthy.

        :param cond: predicate called once per item from the worker threads.
        :param comment: optional label; when given, progress is printed.
        :returns: ``self`` so that stages can be chained.
        :raises RuntimeError: if the pipeline has been closed.
        """
        return self.__start("filter", self.__dofilter, (cond,),
                            self.__getresultsmapfilter, comment)

    def __dofilter(self, i, items, cond):
        """Worker for ``filter``: collect the items of slice ``i`` passing ``cond``."""
        def keep(acc, part):
            acc += [item for item in part if cond(item)]
            return acc

        self.__runworker(i, items, "filter", list, keep)

    def map(self, func, comment=None):
        """Start a stage replacing every item by ``func(item)``.

        :param func: callable applied once per item from the worker threads.
        :param comment: optional label; when given, progress is printed.
        :returns: ``self`` so that stages can be chained.
        :raises RuntimeError: if the pipeline has been closed.
        """
        return self.__start("map", self.__domap, (func,),
                            self.__getresultsmapfilter, comment)

    def __domap(self, i, items, func):
        """Worker for ``map``: apply ``func`` to every item of slice ``i``."""
        def transform(acc, part):
            acc += [func(item) for item in part]
            return acc

        self.__runworker(i, items, "map", list, transform)

    def reduce(self, reducer, aggregator, initval, comment=None):
        """Start a stage folding the data into a single value.

        Every worker starts from ``initval()`` and folds its slice with
        ``reducer(acc, item)``.  ``getresults()`` then starts from a fresh
        ``initval()`` and combines the per-thread values, in slice order,
        with ``aggregator(acc, thread_value)``.

        A stage chained after ``reduce`` receives the reduced value as its
        data, so that value must itself support ``len()`` and slicing.

        :param reducer: ``reducer(acc, item) -> acc``, run in the workers.
        :param aggregator: ``aggregator(acc, thread_value) -> acc``.
        :param initval: zero-argument callable producing the start value.
        :param comment: optional label; when given, progress is printed.
        :returns: ``self`` so that stages can be chained.
        :raises RuntimeError: if the pipeline has been closed.
        """
        return self.__start("reduce", self.__doreduce, (reducer, initval),
                            lambda: self.__getresultsreduce(aggregator, initval),
                            comment)

    def __doreduce(self, i, items, reducer, initval):
        """Worker for ``reduce``: fold slice ``i`` starting from ``initval()``."""
        def fold(acc, part):
            for item in part:
                acc = reducer(acc, item)
            return acc

        self.__runworker(i, items, "reduce", initval, fold)

    def __start(self, kind, worker, workerargs, final, comment):
        """Finish the previous stage (if any) and launch the threads of a new one."""
        if self.__closed:
            raise RuntimeError("Already closed")
        if self.__running:
            # Must happen before __final is replaced: the previous stage's
            # result becomes the input of this one.
            self.__data = self.getresults()

        self.__progress = 0
        self.__error = None
        # Fresh slots so a failed worker can never expose an earlier stage's output.
        self.__results = [[] for _ in range(self.__threadcount)]
        self.__running = True
        self.__final = final
        self.__type = kind
        self.__comment = comment

        total = len(self.__data)
        if comment is not None:
            print("{}: 0/{} ...".format(comment, total),
                  end='\n' if self.__verbose else '', flush=True)
        self.__starttime = self.__cms()
        self.__endtime = None

        count = self.__threadcount
        for i in range(count):
            # Contiguous, near-equal slices; together they cover the data exactly once.
            part = self.__data[i * total // count:(i + 1) * total // count]
            thread = Thread(target=worker, args=(i, part) + tuple(workerargs))
            self.__threads[i] = thread
            thread.start()
        return self

    def __runworker(self, i, items, kind, init, step):
        """Process ``items`` in progress-sized chunks and store the result in slot ``i``.

        ``init()`` yields the start accumulator and ``step(acc, part)`` returns
        the accumulator after handling one chunk.  An exception raised by
        either is recorded (first one wins) and the slot is left empty.
        """
        started = self.__cms()
        interval = self.__progressinterval
        try:
            acc = init()
            for offset in range(0, len(items), interval):
                part = items[offset:offset + interval]
                acc = step(acc, part)
                with self.__lock:
                    self.__progress += len(part)
                    if self.__comment is not None:
                        print("\r{}: {}/{} ...".format(
                            self.__comment, self.__progress, len(self.__data)),
                            end='', flush=True)
        except Exception as exc:
            with self.__lock:
                if self.__error is None:
                    self.__error = exc
            return

        with self.__lock:
            self.__results[i] = acc
        dur = self.__cms() - started
        if self.__verbose:
            prefix = "" if self.__comment is None else "{}: ".format(self.__comment)
            print("{}Thread {}: {} took {}ms".format(prefix, i, kind, dur))

    def getresults(self):
        """Wait for the current stage and return its result.

        :returns: a list for ``filter`` / ``map``, the aggregated value for
            ``reduce``, or the unchanged input data if no stage was started.
        :raises RuntimeError: if the pipeline has been closed.
        :raises Exception: the first exception raised inside a worker.
        """
        self.join()
        if self.__final is None:
            return self.__data
        return self.__final()

    def __getresultsmapfilter(self):
        """Concatenate the per-thread result lists in slice order."""
        return [item for part in self.__results for item in part]

    def __getresultsreduce(self, aggregator, initval):
        """Combine the per-thread reduce values, starting from ``initval()``."""
        val = initval()
        for partial in self.__results:
            val = aggregator(val, partial)
        return val

    def join(self):
        """Wait for all worker threads of the current stage.

        On the first call after a labelled stage a summary line with the
        elapsed time is printed.

        :returns: ``self``.
        :raises RuntimeError: if the pipeline has been closed.
        :raises Exception: the first exception raised inside a worker; it is
            raised again on every later call until a new stage is started.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        for i, thread in enumerate(self.__threads):
            if thread is not None:
                thread.join()
                self.__threads[i] = None

        if self.__endtime is None:
            self.__endtime = self.__cms()
            if self.__comment is not None:
                dur = self.__endtime - self.__starttime
                kept = ""
                if self.__type == "filter":
                    kept = " -> #{}".format(sum(len(r) for r in self.__results))
                line = "{}: {}/{}{} ... took {}ms".format(
                    self.__comment, self.__progress, len(self.__data), kept, dur)
                # Non-verbose mode overwrites the in-place progress line.
                print(line if self.__verbose else "\r" + line)

        if self.__error is not None:
            raise self.__error
        return self

    def close(self):
        """Wait for running threads and mark the pipeline as closed.

        The pipeline is closed even if a worker failed; in that case the
        worker's exception is raised after closing.

        :raises RuntimeError: if the pipeline has already been closed.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        try:
            self.join()
        finally:
            self.__closed = True

    def __cms(self):
        """Return a millisecond timestamp that is only meaningful for differences."""
        # Monotonic clock: elapsed times cannot be skewed by system clock changes.
        return int(round(time.monotonic() * 1000))
|