Files
master/mt.py
wea_ondara 60d966a497 wip
2020-01-25 13:51:16 +01:00

239 lines
9.4 KiB
Python

import multiprocessing as mp
import threading as th
import time
from math import ceil
class mt():
def __init__(self, threads, data, verbose=False, progressinterval=1000, useprocesses=False):
self.__running = False
self.__closed = False
self.__data = data
self.__verbose = verbose
self.__useprocesses = useprocesses
# dummy
self.__final = None
self.__comment = None
self.__starttime = None
self.__endtime = None
self.__type = None
# thread things
self.__threadcount = threads
self.__threads = []
if useprocesses:
mp.set_start_method('fork', True)
manager = mp.Manager()
self.__results = manager.list()
self.__progress = mp.Value("i", 0)
self.__lock = mp.Lock()
self.__keepalive = manager.list([0])
else:
self.__results = []
self.__progress = 0
self.__lock = th.Lock()
self.__keepalive = [0]
self.__progressinterval = progressinterval
for i in range(self.__threadcount):
self.__results.append([])
self.__threads.append(None)
def filter(self, cond, comment=None):
if self.__closed:
raise RuntimeError("Already closed")
if self.__running:
self.join()
self.__data = self.getresults()
self.__reset_progress()
self.__running = True
self.__final = self.__getresultsmapfilter
self.__type = "filter"
self.__comment = comment
if comment is not None:
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
self.__starttime = self.__cms()
self.__endtime = None
for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__dofilter, args=(i, part, cond))
else:
self.__threads[i] = th.Thread(target=self.__dofilter, args=(i, part, cond))
self.__threads[i].start()
return self
def __dofilter(self, i, list, cond):
now = self.__cms()
results = []
for j in range(ceil(len(list) / self.__progressinterval)):
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
results += [l for l in part if cond(l)]
with self.__lock:
self.__inc_progress(len(part))
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
self.__keepalive[0] = 0
with self.__lock:
self.__results[i] = results
dur = self.__cms() - now
if self.__verbose:
if self.__comment is not None:
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
else:
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
def map(self, func, comment=None):
if self.__closed:
raise RuntimeError("Already closed")
if self.__running:
self.join()
self.__data = self.getresults()
self.__reset_progress()
self.__running = True
self.__final = self.__getresultsmapfilter
self.__type = "map"
self.__comment = comment
if comment is not None:
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
self.__starttime = self.__cms()
self.__endtime = None
for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__domap, args=(i, part, func))
else:
self.__threads[i] = th.Thread(target=self.__domap, args=(i, part, func))
self.__threads[i].start()
return self
def __domap(self, i, list, func):
now = self.__cms()
results = []
for j in range(ceil(len(list) / self.__progressinterval)):
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
results += [func(l) for l in part]
with self.__lock:
self.__inc_progress(len(part))
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
self.__keepalive[0] = 0
with self.__lock:
self.__results[i] = results
dur = self.__cms() - now
if self.__verbose:
if self.__comment is not None:
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
else:
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
def reduce(self, reducer, aggregator, initval, comment=None):
if self.__closed:
raise RuntimeError("Already closed")
if self.__running:
self.join()
self.__data = self.getresults()
self.__reset_progress()
self.__running = True
self.__final = lambda: self.__getresultsreduce(aggregator, initval)
self.__type = "reduce"
self.__comment = comment
if comment is not None:
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
self.__starttime = self.__cms()
self.__endtime = None
for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__doreduce, args=(i, part, reducer, initval))
else:
self.__threads[i] = th.Thread(target=self.__doreduce, args=(i, part, reducer, initval))
self.__threads[i].start()
return self
def __doreduce(self, i, list, reducer, initval):
now = self.__cms()
val = initval()
for j in range(ceil(len(list) / self.__progressinterval)):
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
for k in range(len(part)):
val = reducer(val, part[k])
with self.__lock:
self.__inc_progress(len(part))
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
self.__keepalive[0] = 0
with self.__lock:
self.__results[i] = val
dur = self.__cms() - now
if self.__verbose:
if self.__comment is not None:
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
else:
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
def getresults(self):
self.join()
return self.__final()
def __getresultsmapfilter(self):
res = []
for i in range(self.__threadcount):
res += self.__results[i]
return res
def __getresultsreduce(self, aggregator, initval):
val = initval()
for j in range(self.__threadcount):
val = aggregator(val, self.__results[j])
return val
def join(self):
if self.__closed:
raise RuntimeError("Already closed")
for i in range(self.__threadcount):
if self.__threads[i] is not None:
self.__threads[i].join()
self.__threads[i] = None
if self.__endtime is None:
self.__endtime = self.__cms()
if self.__comment is not None:
dur = self.__endtime - self.__starttime
if self.__verbose:
print(self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
else:
print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
return self
def close(self):
if self.__closed:
raise RuntimeError("Already closed")
self.join()
self.__closed = True
def __cms(self):
return int(round(time.time() * 1000))
def __reset_progress(self):
if self.__useprocesses:
self.__progress.value = 0
else:
self.__progress = 0
def __inc_progress(self, val):
if self.__useprocesses:
self.__progress.value += val
else:
self.__progress += val
def __get_progress(self):
if self.__useprocesses:
return self.__progress.value
else:
return self.__progress