From 791eb3ee3f0e3e9d5ba1225eb6feed41915e035b Mon Sep 17 00:00:00 2001
From: Niclas Dobbertin <niclas.dobbertin@stud.tu-darmstadt.de>
Date: Wed, 4 Oct 2023 12:34:02 +0200
Subject: change LogParser to unix line endings

---
 bjoern/videoanalyse/LogParser.py | 280 +++++++++++++++++++--------------------
 1 file changed, 140 insertions(+), 140 deletions(-)

diff --git a/bjoern/videoanalyse/LogParser.py b/bjoern/videoanalyse/LogParser.py
index 1104e7f..c314de7 100755
--- a/bjoern/videoanalyse/LogParser.py
+++ b/bjoern/videoanalyse/LogParser.py
@@ -1,140 +1,140 @@
-# -*- coding: utf-8 -*-
-"""
-This script takes multiple .log data files from PCScreenWatcher and a Firefox history database (places.sqlite)
-inside a folder to extract each activity title, their url if applicable and usage timestamps into a pseudo-csv file.
-"""
-
-import os
-import re
-from datetime import datetime
-import csv
-import codecs
-from read_sqlite import get_url_from_sqlite
-from pathlib import Path
-
-
-# takes the log data string and returns a list of activity titles and their time windows
-def extract_activities(log_data):
-    BROWSER_TITLE_SUFFIX = " - Mozilla Firefox"
-    # regex which matches between squared brackets
-    reg_titles = re.compile("(?<=\[).*?(?=\])")
-    # regex for total/active time
-    reg_time = re.compile(
-        "(?<!\()Total: \d*\.*\d*m*\d*\.*\d*s*, Active: \d*\.*\d*m*\d*\.*\d*s*"
-    )
-    # regex which matches number + . and splits into different strings
-    reg_split = re.compile("\d+\. ")
-
-    extracted_data = {}
-    for log_date in log_data:
-        windows = reg_split.split(log_date)
-        brackets = []
-
-        del_flag = True
-        # extract squared brackets per string
-        for s in windows:
-            if not del_flag:
-                found_brackets = reg_titles.findall(s)
-                if found_brackets:
-                    # Only use Browser titles
-                    if found_brackets[0].endswith(BROWSER_TITLE_SUFFIX):
-                        title = found_brackets[0].replace(BROWSER_TITLE_SUFFIX, "")
-                        brackets.append(title)
-                        enter_exit = s.split("Enter-Exit: ")[-1]
-                        timestamps = reg_titles.findall(enter_exit)
-                        for timestamp in timestamps:
-                            t_enter, t_exit = timestamp.split("-")
-                            if not title in extracted_data:
-                                extracted_data[title] = []
-                            enter_date = datetime.strptime(t_enter, "%H:%M:%S").time()
-                            exit_date = datetime.strptime(t_exit, "%H:%M:%S").time()
-                            extracted_data[title].append((enter_date, exit_date))
-            if "Activies in each window" in s:
-                del_flag = False
-            if "Restricted Windows Summary:" in s:
-                del_flag = True
-
-    return extracted_data
-
-
-def get_log_data(data_path):
-    files = os.listdir(data_path)
-    log_files = []
-    log_data = []
-    for s in files:
-        if s.endswith(".log"):
-            log_files.append(os.path.join(data_path, s))
-    for l in log_files:
-        with codecs.open(l, "r", "utf-8") as reader:
-            log_data.append(reader.read())
-    return log_data
-
-
-def get_history_db(data_path):
-    files = os.listdir(data_path)
-    for s in files:
-        if s.endswith(".sqlite"):
-            history_db = os.path.join(data_path, s)
-            return history_db
-    return None
-
-
-def match_urls(history_db, log):
-    for entry in log:
-        url = get_url_from_sqlite(history_db, entry[2])
-        entry.append(url)
-    return log
-
-
-def generate_log(activities: dict):
-    # For each start time in ascending order, make an entry with title and timestamp
-    log = []
-    while activities:
-        first_title = list(activities.keys())[0]
-        smallest_start_time = (first_title, 0)
-        for title in activities.keys():
-            for idx, timestamp in enumerate(activities[title]):
-                if (
-                    timestamp[0]
-                    < activities[smallest_start_time[0]][smallest_start_time[1]][0]
-                ):
-                    smallest_start_time = (title, idx)
-        log.append(
-            [
-                activities[smallest_start_time[0]][smallest_start_time[1]][
-                    0
-                ].isoformat(),
-                activities[smallest_start_time[0]][smallest_start_time[1]][
-                    1
-                ].isoformat(),
-                smallest_start_time[0],
-            ]
-        )
-        del activities[smallest_start_time[0]][smallest_start_time[1]]
-        if not activities[smallest_start_time[0]]:
-            del activities[smallest_start_time[0]]
-    return log
-
-
-def write_logfile(vp_dir, log):
-    path = Path(f"{vp_dir}/logs.csv")
-    with open(path, "w") as csvfile:
-        writer = csv.writer(csvfile, delimiter=",", quoting=csv.QUOTE_NONNUMERIC)
-        writer.writerow(["Starttime", "Endtime", "Title", "URL"])
-        for row in log:
-            writer.writerow(row)
-
-
-def main():
-    for vp_dir in [f.name for f in os.scandir() if f.is_dir()]:
-        print(vp_dir)
-        log = extract_activities(get_log_data(vp_dir))
-        log = generate_log(log)
-
-        history = get_history_db(vp_dir)
-        log = match_urls(history, log)
-        write_logfile(vp_dir, log)
-
-
-if __name__ == "__main__":
-    main()
+# -*- coding: utf-8 -*-
+"""
+This script takes multiple .log data files from PCScreenWatcher and a Firefox history database (places.sqlite)
+inside a folder to extract each activity title, their url if applicable and usage timestamps into a pseudo-csv file.
+"""
+
+import os
+import re
+from datetime import datetime
+import csv
+import codecs
+from read_sqlite import get_url_from_sqlite
+from pathlib import Path
+
+
+# takes the log data string and returns a list of activity titles and their time windows
+def extract_activities(log_data):
+    BROWSER_TITLE_SUFFIX = " - Mozilla Firefox"
+    # regex which matches between squared brackets
+    reg_titles = re.compile("(?<=\[).*?(?=\])")
+    # regex for total/active time
+    reg_time = re.compile(
+        "(?<!\()Total: \d*\.*\d*m*\d*\.*\d*s*, Active: \d*\.*\d*m*\d*\.*\d*s*"
+    )
+    # regex which matches number + . and splits into different strings
+    reg_split = re.compile("\d+\. ")
+
+    extracted_data = {}
+    for log_date in log_data:
+        windows = reg_split.split(log_date)
+        brackets = []
+
+        del_flag = True
+        # extract squared brackets per string
+        for s in windows:
+            if not del_flag:
+                found_brackets = reg_titles.findall(s)
+                if found_brackets:
+                    # Only use Browser titles
+                    if found_brackets[0].endswith(BROWSER_TITLE_SUFFIX):
+                        title = found_brackets[0].replace(BROWSER_TITLE_SUFFIX, "")
+                        brackets.append(title)
+                        enter_exit = s.split("Enter-Exit: ")[-1]
+                        timestamps = reg_titles.findall(enter_exit)
+                        for timestamp in timestamps:
+                            t_enter, t_exit = timestamp.split("-")
+                            if not title in extracted_data:
+                                extracted_data[title] = []
+                            enter_date = datetime.strptime(t_enter, "%H:%M:%S").time()
+                            exit_date = datetime.strptime(t_exit, "%H:%M:%S").time()
+                            extracted_data[title].append((enter_date, exit_date))
+            if "Activies in each window" in s:
+                del_flag = False
+            if "Restricted Windows Summary:" in s:
+                del_flag = True
+
+    return extracted_data
+
+
+def get_log_data(data_path):
+    files = os.listdir(data_path)
+    log_files = []
+    log_data = []
+    for s in files:
+        if s.endswith(".log"):
+            log_files.append(os.path.join(data_path, s))
+    for l in log_files:
+        with codecs.open(l, "r", "utf-8") as reader:
+            log_data.append(reader.read())
+    return log_data
+
+
+def get_history_db(data_path):
+    files = os.listdir(data_path)
+    for s in files:
+        if s.endswith(".sqlite"):
+            history_db = os.path.join(data_path, s)
+            return history_db
+    return None
+
+
+def match_urls(history_db, log):
+    for entry in log:
+        url = get_url_from_sqlite(history_db, entry[2])
+        entry.append(url)
+    return log
+
+
+def generate_log(activities: dict):
+    # For each start time in ascending order, make an entry with title and timestamp
+    log = []
+    while activities:
+        first_title = list(activities.keys())[0]
+        smallest_start_time = (first_title, 0)
+        for title in activities.keys():
+            for idx, timestamp in enumerate(activities[title]):
+                if (
+                    timestamp[0]
+                    < activities[smallest_start_time[0]][smallest_start_time[1]][0]
+                ):
+                    smallest_start_time = (title, idx)
+        log.append(
+            [
+                activities[smallest_start_time[0]][smallest_start_time[1]][
+                    0
+                ].isoformat(),
+                activities[smallest_start_time[0]][smallest_start_time[1]][
+                    1
+                ].isoformat(),
+                smallest_start_time[0],
+            ]
+        )
+        del activities[smallest_start_time[0]][smallest_start_time[1]]
+        if not activities[smallest_start_time[0]]:
+            del activities[smallest_start_time[0]]
+    return log
+
+
+def write_logfile(vp_dir, log):
+    path = Path(f"{vp_dir}/logs.csv")
+    with open(path, "w") as csvfile:
+        writer = csv.writer(csvfile, delimiter=",", quoting=csv.QUOTE_NONNUMERIC)
+        writer.writerow(["Starttime", "Endtime", "Title", "URL"])
+        for row in log:
+            writer.writerow(row)
+
+
+def main():
+    for vp_dir in [f.name for f in os.scandir() if f.is_dir()]:
+        print(vp_dir)
+        log = extract_activities(get_log_data(vp_dir))
+        log = generate_log(log)
+
+        history = get_history_db(vp_dir)
+        log = match_urls(history, log)
+        write_logfile(vp_dir, log)
+
+
+if __name__ == "__main__":
+    main()