Files
master/loader.py
wea_ondara 4df2de7970 wip
2020-05-03 12:01:38 +02:00

293 lines
9.6 KiB
Python

import html
import multiprocessing
import operator
import re
import time
import xml.etree.cElementTree as et
from collections import defaultdict
from datetime import datetime
from mt import mt
import gc
TAG_RE = re.compile(r'<[^>]+>')
TAG_CODE = re.compile(r'<code.+</code>')
TAG_MATH = re.compile(r'<span class="math-container".+</span>')
printnoln = lambda text: print(text, end='', flush=True)
rprint = lambda text: print('\r' + text)
def dmt(data, progressinterval=5000): return mt(1, data, False, progressinterval)
def dmp(data, progressinterval=5000): return mt(multiprocessing.cpu_count(), data, False, progressinterval, True)
def cms(): return int(round(time.time() * 1000))
class Cache:
CACHED_FOLDER = None
CACHED_USERS = None
CACHED_POSTS = None
CACHED_FIRSTCONTRIB = None
CACHED_SUMCONTRIB = None
def load(folder):
if folder == Cache.CACHED_FOLDER:
return Cache.CACHED_USERS, Cache.CACHED_POSTS, Cache.CACHED_FIRSTCONTRIB, Cache.CACHED_SUMCONTRIB
Cache.CACHED_FOLDER = None
users = readUsers(folder + "/Users.xml")
gc.collect()
posts = readPosts(folder + "/Posts.xml")
gc.collect()
# get first contribution to page:
firstcontrib = computefirstcontrib(posts)
sumcontrib = computesumcontrib(posts)
Cache.CACHED_USERS = users
Cache.CACHED_POSTS = posts
Cache.CACHED_FIRSTCONTRIB = firstcontrib
Cache.CACHED_SUMCONTRIB = sumcontrib
Cache.CACHED_FOLDER = folder
return users, posts, firstcontrib, sumcontrib
def readVotes(folder):
file = folder + "/Votes.xml"
prefix = "readVotes: "
printnoln(prefix + "reading xml file ...")
now = cms()
items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
votes = dmt(items).map(mapvote, prefix + "mapping votes").getresults()
print(prefix + "done")
items = None
gc.collect()
return votes
VOTE_TAGS = ['PostId', 'VoteTypeId', 'CreationDate']
VOTE_DTAGS = ['CreationDate']
VOTE_ITAGS = ['PostId', 'VoteTypeId']
def mapvote(item):
vote = {tag: getTag(item, tag) for tag in VOTE_TAGS}
for tag in VOTE_DTAGS:
if vote[tag] is not None:
vote[tag] = datetime.fromisoformat(vote[tag])
else:
print("map vote: tag " + tag + " is None: " + str(vote))
for tag in VOTE_ITAGS:
vote[tag] = int(vote[tag])
return vote
def computesumcontrib(posts):
x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
# x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
sumcontrib = defaultdict(int)
for id in x1:
sumcontrib[id] += 1
for y in x2:
for id in y:
sumcontrib[id] += 1
# for y in x3:
# for id in y:
# sumcontrib[id] += 1
return sumcontrib
def computefirstcontrib(posts):
x1 = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults()
x2 = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults()
firstcontrib = defaultdict(list)
for (id, date) in x1:
firstcontrib[id].append(date)
for y in x2:
for (id, date) in y:
firstcontrib[id].append(date)
firstcontrib = {id: min(ldate) for (id, ldate) in firstcontrib.items()}
return firstcontrib
USER_TAGS = ['Id', 'CreationDate']
USER_DTAGS = ['CreationDate']
USER_ITAGS = ['Id']
def mapuser(item):
user = {tag: getTag(item, tag) for tag in USER_TAGS}
for tag in USER_DTAGS:
if user[tag] is not None:
user[tag] = datetime.fromisoformat(user[tag])
else:
print("map user: tag " + tag + " is None: " + str(user))
for tag in USER_ITAGS:
user[tag] = int(user[tag])
return user
Q_TAGS = ['Id', 'CreationDate', 'OwnerUserId', 'OwnerDisplayName', 'Score']
Q_DTAGS = ['CreationDate']
Q_ITAGS = ['Id', 'OwnerUserId', 'Score']
Q_BODY = 'Body'
def mapQuestion(item):
question = {tag: getTag(item, tag) for tag in Q_TAGS}
for tag in Q_DTAGS:
question[tag] = datetime.fromisoformat(question[tag])
for tag in Q_ITAGS:
question[tag] = int(question[tag]) if question[tag] is not None else None
#question[Q_BODY] = removetags(html.unescape(question[Q_BODY]))
return question
A_TAGS = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId', 'Score']
A_DTAGS = ['CreationDate']
A_ITAGS = ['Id', 'ParentId', 'OwnerUserId', 'Score']
A_BODY = 'Body'
def mapAnswer(item):
answer = {tag: getTag(item, tag) for tag in A_TAGS}
for tag in A_DTAGS:
answer[tag] = datetime.fromisoformat(answer[tag])
for tag in A_ITAGS:
answer[tag] = int(answer[tag]) if answer[tag] is not None else None
answer[A_BODY] = removetags(html.unescape(answer[A_BODY]))
return answer
# def mapComment(item):
# tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
# datetags = ['CreationDate']
# comment = {tag: getTag(item, tag) for tag in tags}
# for tag in datetags:
# comment[tag] = datetime.fromisoformat(comment[tag])
# comment['Body'] = removetags(html.unescape(comment['Body']))
# return comment
def readUsers(file):
prefix = "readUsers: "
printnoln(prefix + "reading xml file ...")
now = cms()
items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
users = dmt(items).map(mapuser, prefix + "mapping users").getresults()
print(prefix + "done")
return users
def readPosts(file):
prefix = "readPosts: "
printnoln(prefix + "reading xml file ...")
now = cms()
items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
print(prefix + "#posts total: " + str(len(items)))
questions = readQuestions(items)
questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults())
answers = readAnswers(items)
answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())
# comments = readComments(items)
# filter answers
answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()
# filter comments
# comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()
# create question answer mapping
printnoln(prefix + "create qamapping ...")
qamapping = {id: [] for id in questionids}
for a in answers:
qamapping[a['ParentId']].append(a)
rprint(prefix + "create qamapping ... done")
questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()
# # create comment question comment answer mapping
# printnoln(prefix + "create qacmapping ...")
# qacmapping = {id: [] for id in questionids.union(answerids)}
# for c in comments:
# qacmapping[c['ParentId']].append(c)
# rprint(prefix + "create qacmapping ... done")
# answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
# questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()
# safety check
countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \
.reduce(operator.add, operator.add, lambda: 0, prefix + "sum answer count").getresults()
if countans != len(answers):
print(prefix + "countans != #answer: " + countans + " " + len(answers))
print(prefix + "done")
return questions
def readQuestions(items):
prefix = "readQuestions: "
questions = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "1", prefix + "filter out questions") \
.map(mapQuestion, prefix + "mapping questions") \
.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken questions").getresults()
print(prefix + "questions read: " + str(len(questions)))
return questions
def readAnswers(items):
prefix = "readAnswers: "
answers = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "2", prefix + "filter out answers") \
.map(mapAnswer, prefix + "mapping answers") \
.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken answers").getresults()
print(prefix + "answers read: " + str(len(answers)))
return answers
# def readComments(items):
# prefix = "readComments: "
# comments = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments") \
# .map(mapComment, prefix + "mapping comments") \
# .filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments").getresults()
#
# print(prefix + "comments read: " + str(len(comments)))
# return comments
def getTag(item, tag):
return item.attrib.get(tag) if tagExists(item, tag) else None
def tagExists(item, tag):
return tag in item.attrib.keys()
def setprop(dic, key, value):
dic[key] = value
return dic
def removetags(text):
return TAG_RE.sub('', TAG_MATH.sub('', TAG_CODE.sub('', text)))