wip
This commit is contained in:
46
mt.py
46
mt.py
@@ -1,10 +1,10 @@
|
||||
from threading import Thread, Lock
|
||||
import time
|
||||
from math import ceil
|
||||
from threading import Thread, Lock
|
||||
|
||||
|
||||
class mt():
|
||||
def __init__(self, threads, data, verbose=False):
|
||||
def __init__(self, threads, data, verbose=False, progressinterval=1000):
|
||||
self.__running = False
|
||||
self.__closed = False
|
||||
self.__data = data
|
||||
@@ -21,6 +21,7 @@ class mt():
|
||||
self.__lock = Lock()
|
||||
self.__results = []
|
||||
self.__progress = 0
|
||||
self.__progressinterval = progressinterval
|
||||
for i in range(self.__threadcount):
|
||||
self.__results.append([])
|
||||
self.__threads.append(None)
|
||||
@@ -35,9 +36,9 @@ class mt():
|
||||
self.__running = True
|
||||
self.__final = self.__getresultsmapfilter
|
||||
self.__type = "filter"
|
||||
self.__comment = comment if comment is not None else ""
|
||||
self.__comment = comment
|
||||
if comment is not None:
|
||||
print(self.__comment + ": #" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
|
||||
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
|
||||
self.__starttime = self.__cms()
|
||||
self.__endtime = None
|
||||
for i in range(self.__threadcount):
|
||||
@@ -50,20 +51,22 @@ class mt():
|
||||
now = self.__cms()
|
||||
|
||||
results = []
|
||||
for j in range(ceil(len(list) / 1000)):
|
||||
part = list[j * 1000: min((j + 1) * 1000, len(list))]
|
||||
for j in range(ceil(len(list) / self.__progressinterval)):
|
||||
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
|
||||
results += [l for l in part if cond(l)]
|
||||
with self.__lock:
|
||||
self.__progress += len(part)
|
||||
if self.__comment is not None:
|
||||
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
|
||||
|
||||
# results = [l for l in list if cond(l)]
|
||||
with self.__lock:
|
||||
self.__results[i] = results
|
||||
dur = self.__cms() - now
|
||||
if self.__verbose:
|
||||
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
if self.__comment is not None:
|
||||
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
else:
|
||||
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
|
||||
def map(self, func, comment=None):
|
||||
if self.__closed:
|
||||
@@ -75,7 +78,7 @@ class mt():
|
||||
self.__running = True
|
||||
self.__final = self.__getresultsmapfilter
|
||||
self.__type = "map"
|
||||
self.__comment = comment if comment is not None else ""
|
||||
self.__comment = comment
|
||||
if comment is not None:
|
||||
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
|
||||
self.__starttime = self.__cms()
|
||||
@@ -89,20 +92,22 @@ class mt():
|
||||
def __domap(self, i, list, func):
|
||||
now = self.__cms()
|
||||
results = []
|
||||
for j in range(ceil(len(list) / 1000)):
|
||||
part = list[j * 1000: min((j + 1) * 1000, len(list))]
|
||||
for j in range(ceil(len(list) / self.__progressinterval)):
|
||||
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
|
||||
results += [func(l) for l in part]
|
||||
with self.__lock:
|
||||
self.__progress += len(part)
|
||||
if self.__comment is not None:
|
||||
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
|
||||
|
||||
# results = [func(l) for l in list]
|
||||
with self.__lock:
|
||||
self.__results[i] = results
|
||||
dur = self.__cms() - now
|
||||
if self.__verbose:
|
||||
print(self.__comment + ": Thread " + str(i) + ": map took " + str(dur) + "ms")
|
||||
if self.__comment is not None:
|
||||
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
else:
|
||||
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
|
||||
def reduce(self, reducer, aggregator, initval, comment=None):
|
||||
if self.__closed:
|
||||
@@ -114,9 +119,9 @@ class mt():
|
||||
self.__running = True
|
||||
self.__final = lambda: self.__getresultsreduce(aggregator, initval)
|
||||
self.__type = "reduce"
|
||||
self.__comment = comment if comment is not None else ""
|
||||
self.__comment = comment
|
||||
if comment is not None:
|
||||
print(self.__comment + ": #" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
|
||||
print(self.__comment + ": 0/" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
|
||||
self.__starttime = self.__cms()
|
||||
self.__endtime = None
|
||||
for i in range(self.__threadcount):
|
||||
@@ -129,8 +134,8 @@ class mt():
|
||||
now = self.__cms()
|
||||
val = initval()
|
||||
|
||||
for j in range(ceil(len(list) / 1000)):
|
||||
part = list[j * 1000: min((j + 1) * 1000, len(list))]
|
||||
for j in range(ceil(len(list) / self.__progressinterval)):
|
||||
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
|
||||
for k in range(len(part)):
|
||||
val = reducer(val, part[k])
|
||||
with self.__lock:
|
||||
@@ -138,13 +143,14 @@ class mt():
|
||||
if self.__comment is not None:
|
||||
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
|
||||
|
||||
# for j in range(len(list)):
|
||||
# val = reducer(val, list[j])
|
||||
with self.__lock:
|
||||
self.__results[i] = val
|
||||
dur = self.__cms() - now
|
||||
if self.__verbose:
|
||||
print(self.__comment + ": Thread " + str(i) + ": reduce took " + str(dur) + "ms")
|
||||
if self.__comment is not None:
|
||||
print(self.__comment + ": Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
else:
|
||||
print("Thread " + str(i) + ": filter took " + str(dur) + "ms")
|
||||
|
||||
def getresults(self):
|
||||
self.join()
|
||||
|
||||
Reference in New Issue
Block a user