import html import multiprocessing import operator import re import time import xml.etree.cElementTree as et from collections import defaultdict from datetime import datetime from mt import mt import gc TAG_RE = re.compile(r'<[^>]+>') TAG_CODE = re.compile(r'') printnoln = lambda text: print(text, end='', flush=True) rprint = lambda text: print('\r' + text) def dmt(data, progressinterval=5000): return mt(1, data, False, progressinterval) def dmp(data, progressinterval=5000): return mt(multiprocessing.cpu_count(), data, False, progressinterval, True) def cms(): return int(round(time.time() * 1000)) class Cache: CACHED_FOLDER = None CACHED_USERS = None CACHED_POSTS = None CACHED_FIRSTCONTRIB = None CACHED_SUMCONTRIB = None def load(folder): if folder == Cache.CACHED_FOLDER: return Cache.CACHED_USERS, Cache.CACHED_POSTS, Cache.CACHED_FIRSTCONTRIB, Cache.CACHED_SUMCONTRIB Cache.CACHED_FOLDER = None users = readUsers(folder + "/Users.xml") gc.collect() posts = readPosts(folder + "/Posts.xml") gc.collect() # get first contribution to page: firstcontrib = computefirstcontrib(posts) sumcontrib = computesumcontrib(posts) Cache.CACHED_USERS = users Cache.CACHED_POSTS = posts Cache.CACHED_FIRSTCONTRIB = firstcontrib Cache.CACHED_SUMCONTRIB = sumcontrib Cache.CACHED_FOLDER = folder return users, posts, firstcontrib, sumcontrib def readVotes(folder): file = folder + "/Votes.xml" prefix = "readVotes: " printnoln(prefix + "reading xml file ...") now = cms() items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"] rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms") votes = dmt(items).map(mapvote, prefix + "mapping votes").getresults() print(prefix + "done") return votes VOTE_TAGS = ['PostId', 'VoteTypeId', 'CreationDate'] VOTE_DTAGS = ['CreationDate'] VOTE_ITAGS = ['PostId', 'VoteTypeId'] def mapvote(item): vote = {tag: getTag(item, tag) for tag in VOTE_TAGS} for tag in VOTE_DTAGS: if vote[tag] is not None: vote[tag] = datetime.fromisoformat(vote[tag]) else: print("map vote: tag " + tag + " is None: " + str(vote)) for tag in VOTE_ITAGS: vote[tag] = int(vote[tag]) return vote def computesumcontrib(posts): x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults() x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults() # x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults() sumcontrib = defaultdict(int) for id in x1: sumcontrib[id] += 1 for y in x2: for id in y: sumcontrib[id] += 1 # for y in x3: # for id in y: # sumcontrib[id] += 1 return sumcontrib def computefirstcontrib(posts): x1 = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults() x2 = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults() firstcontrib = defaultdict(list) for (id, date) in x1: firstcontrib[id].append(date) for y in x2: for (id, date) in y: firstcontrib[id].append(date) firstcontrib = {id: min(ldate) for (id, ldate) in firstcontrib.items()} return firstcontrib USER_TAGS = ['Id', 'CreationDate'] USER_DTAGS = ['CreationDate'] USER_ITAGS = ['Id'] def mapuser(item): user = {tag: getTag(item, tag) for tag in USER_TAGS} for tag in USER_DTAGS: if user[tag] is not None: user[tag] = datetime.fromisoformat(user[tag]) else: print("map user: tag " + tag + " is None: " + str(user)) for tag in USER_ITAGS: user[tag] = int(user[tag]) return user Q_TAGS = ['Id', 'CreationDate', 'Body', 'Title', 'OwnerUserId', 'OwnerDisplayName', 'Score'] Q_DTAGS = ['CreationDate'] Q_ITAGS = ['Id', 'OwnerUserId', 'Score'] Q_BODY = 'Body' def mapQuestion(item): question = {tag: getTag(item, tag) for tag in Q_TAGS} for tag in Q_DTAGS: question[tag] = datetime.fromisoformat(question[tag]) for tag in Q_ITAGS: question[tag] = int(question[tag]) if question[tag] is not None else None question[Q_BODY] = removetags(html.unescape(question[Q_BODY])) return question A_TAGS = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId', 'Score'] A_DTAGS = ['CreationDate'] A_ITAGS = ['Id', 'ParentId', 'OwnerUserId', 'Score'] def mapAnswer(item): answer = {tag: getTag(item, tag) for tag in A_TAGS} for tag in A_DTAGS: answer[tag] = datetime.fromisoformat(answer[tag]) for tag in A_ITAGS: answer[tag] = int(answer[tag]) if answer[tag] is not None else None answer['Body'] = removetags(html.unescape(answer['Body'])) return answer # def mapComment(item): # tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId'] # datetags = ['CreationDate'] # comment = {tag: getTag(item, tag) for tag in tags} # for tag in datetags: # comment[tag] = datetime.fromisoformat(comment[tag]) # comment['Body'] = removetags(html.unescape(comment['Body'])) # return comment def readUsers(file): prefix = "readUsers: " printnoln(prefix + "reading xml file ...") now = cms() items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"] rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms") users = dmt(items).map(mapuser, prefix + "mapping users").getresults() print(prefix + "done") return users def readPosts(file): prefix = "readPosts: " printnoln(prefix + "reading xml file ...") now = cms() items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"] rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms") print(prefix + "#posts total: " + str(len(items))) questions = readQuestions(items) questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults()) answers = readAnswers(items) answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults()) # comments = readComments(items) # filter answers answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults() # filter comments # comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults() # create question answer mapping printnoln(prefix + "create qamapping ...") qamapping = {id: [] for id in questionids} for a in answers: qamapping[a['ParentId']].append(a) rprint(prefix + "create qamapping ... done") questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults() # # create comment question comment answer mapping # printnoln(prefix + "create qacmapping ...") # qacmapping = {id: [] for id in questionids.union(answerids)} # for c in comments: # qacmapping[c['ParentId']].append(c) # rprint(prefix + "create qacmapping ... done") # answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults() # questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults() # safety check countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \ .reduce(operator.add, operator.add, lambda: 0, prefix + "sum answer count").getresults() if countans != len(answers): print(prefix + "countans != #answer: " + countans + " " + len(answers)) print(prefix + "done") return questions def readQuestions(items): prefix = "readQuestions: " questions = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "1", prefix + "filter out questions") \ .map(mapQuestion, prefix + "mapping questions") \ .filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken questions").getresults() print(prefix + "questions read: " + str(len(questions))) return questions def readAnswers(items): prefix = "readAnswers: " answers = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "2", prefix + "filter out answers") \ .map(mapAnswer, prefix + "mapping answers") \ .filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken answers").getresults() print(prefix + "answers read: " + str(len(answers))) return answers # def readComments(items): # prefix = "readComments: " # comments = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments") \ # .map(mapComment, prefix + "mapping comments") \ # .filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments").getresults() # # print(prefix + "comments read: " + str(len(comments))) # return comments def getTag(item, tag): return item.attrib.get(tag) if tagExists(item, tag) else None def tagExists(item, tag): return tag in item.attrib.keys() def setprop(dic, key, value): dic[key] = value return dic def removetags(text): return TAG_RE.sub('', TAG_CODE.sub('', text))