This commit is contained in:
wea_ondara
2019-07-22 21:57:46 +02:00
parent 3d4b1f26ba
commit 661e3c810a
3 changed files with 126 additions and 18 deletions

52
mt.py
View File

@@ -1,5 +1,6 @@
from threading import Thread, Lock
import time
from math import ceil
class mt():
@@ -19,6 +20,7 @@ class mt():
self.__threads = []
self.__lock = Lock()
self.__results = []
self.__progress = 0
for i in range(self.__threadcount):
self.__results.append([])
self.__threads.append(None)
@@ -29,6 +31,7 @@ class mt():
if self.__running:
self.join()
self.__data = self.getresults()
self.__progress = 0
self.__running = True
self.__final = self.__getresultsmapfilter
self.__type = "filter"
@@ -45,7 +48,17 @@ class mt():
def __dofilter(self, i, list, cond):
now = self.__cms()
results = [l for l in list if cond(l)]
results = []
for j in range(ceil(len(list) / 1000)):
part = list[j * 1000: min((j + 1) * 1000, len(list))]
results += [l for l in part if cond(l)]
with self.__lock:
self.__progress += len(part)
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
# results = [l for l in list if cond(l)]
with self.__lock:
self.__results[i] = results
dur = self.__cms() - now
@@ -58,12 +71,13 @@ class mt():
if self.__running:
self.join()
self.__data = self.getresults()
self.__progress = 0
self.__running = True
self.__final = self.__getresultsmapfilter
self.__type = "map"
self.__comment = comment if comment is not None else ""
if comment is not None:
print(self.__comment + ": #" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
self.__starttime = self.__cms()
self.__endtime = None
for i in range(self.__threadcount):
@@ -74,7 +88,16 @@ class mt():
def __domap(self, i, list, func):
now = self.__cms()
results = [func(l) for l in list]
results = []
for j in range(ceil(len(list) / 1000)):
part = list[j * 1000: min((j + 1) * 1000, len(list))]
results += [func(l) for l in part]
with self.__lock:
self.__progress += len(part)
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
# results = [func(l) for l in list]
with self.__lock:
self.__results[i] = results
dur = self.__cms() - now
@@ -87,6 +110,7 @@ class mt():
if self.__running:
self.join()
self.__data = self.getresults()
self.__progress = 0
self.__running = True
self.__final = lambda: self.__getresultsreduce(aggregator, initval)
self.__type = "reduce"
@@ -104,8 +128,18 @@ class mt():
def __doreduce(self, i, list, reducer, initval):
now = self.__cms()
val = initval()
for j in range(len(list)):
val = reducer(val, list[j])
for j in range(ceil(len(list) / 1000)):
part = list[j * 1000: min((j + 1) * 1000, len(list))]
for k in range(len(part)):
val = reducer(val, part[k])
with self.__lock:
self.__progress += len(part)
if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
# for j in range(len(list)):
# val = reducer(val, list[j])
with self.__lock:
self.__results[i] = val
dur = self.__cms() - now
@@ -136,13 +170,15 @@ class mt():
self.__threads[i].join()
self.__threads[i] = None
if self.__endtime is None:
self.__endtime = self.__cms();
self.__endtime = self.__cms()
if self.__comment is not None:
dur = self.__endtime - self.__starttime
if self.__verbose:
print(self.__comment + ": #" + str(len(self.__data)) + (" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
print(self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
else:
print("\r" + self.__comment + ": #" + str(len(self.__data)) + (" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
return self
def close(self):