diff options
-rwxr-xr-x | bjoern/videoanalyse/LogParser.py | 280 |
1 files changed, 140 insertions, 140 deletions
diff --git a/bjoern/videoanalyse/LogParser.py b/bjoern/videoanalyse/LogParser.py index 1104e7f..c314de7 100755 --- a/bjoern/videoanalyse/LogParser.py +++ b/bjoern/videoanalyse/LogParser.py @@ -1,140 +1,140 @@ -# -*- coding: utf-8 -*-
-"""
-This script takes multiple .log data files from PCScreenWatcher and a Firefox history database (places.sqlite)
-inside a folder to extract each activity title, their url if applicable and usage timestamps into a pseudo-csv file.
-"""
-
-import os
-import re
-from datetime import datetime
-import csv
-import codecs
-from read_sqlite import get_url_from_sqlite
-from pathlib import Path
-
-
-# takes the log data string and returns a list of activity titles and their time windows
-def extract_activities(log_data):
- BROWSER_TITLE_SUFFIX = " - Mozilla Firefox"
- # regex which matches between squared brackets
- reg_titles = re.compile("(?<=\[).*?(?=\])")
- # regex for total/active time
- reg_time = re.compile(
- "(?<!\()Total: \d*\.*\d*m*\d*\.*\d*s*, Active: \d*\.*\d*m*\d*\.*\d*s*"
- )
- # regex which matches number + . and splits into different strings
- reg_split = re.compile("\d+\. ")
-
- extracted_data = {}
- for log_date in log_data:
- windows = reg_split.split(log_date)
- brackets = []
-
- del_flag = True
- # extract squared brackets per string
- for s in windows:
- if not del_flag:
- found_brackets = reg_titles.findall(s)
- if found_brackets:
- # Only use Browser titles
- if found_brackets[0].endswith(BROWSER_TITLE_SUFFIX):
- title = found_brackets[0].replace(BROWSER_TITLE_SUFFIX, "")
- brackets.append(title)
- enter_exit = s.split("Enter-Exit: ")[-1]
- timestamps = reg_titles.findall(enter_exit)
- for timestamp in timestamps:
- t_enter, t_exit = timestamp.split("-")
- if not title in extracted_data:
- extracted_data[title] = []
- enter_date = datetime.strptime(t_enter, "%H:%M:%S").time()
- exit_date = datetime.strptime(t_exit, "%H:%M:%S").time()
- extracted_data[title].append((enter_date, exit_date))
- if "Activies in each window" in s:
- del_flag = False
- if "Restricted Windows Summary:" in s:
- del_flag = True
-
- return extracted_data
-
-
-def get_log_data(data_path):
- files = os.listdir(data_path)
- log_files = []
- log_data = []
- for s in files:
- if s.endswith(".log"):
- log_files.append(os.path.join(data_path, s))
- for l in log_files:
- with codecs.open(l, "r", "utf-8") as reader:
- log_data.append(reader.read())
- return log_data
-
-
-def get_history_db(data_path):
- files = os.listdir(data_path)
- for s in files:
- if s.endswith(".sqlite"):
- history_db = os.path.join(data_path, s)
- return history_db
- return None
-
-
-def match_urls(history_db, log):
- for entry in log:
- url = get_url_from_sqlite(history_db, entry[2])
- entry.append(url)
- return log
-
-
-def generate_log(activities: dict):
- # For each start time in ascending order, make an entry with title and timestamp
- log = []
- while activities:
- first_title = list(activities.keys())[0]
- smallest_start_time = (first_title, 0)
- for title in activities.keys():
- for idx, timestamp in enumerate(activities[title]):
- if (
- timestamp[0]
- < activities[smallest_start_time[0]][smallest_start_time[1]][0]
- ):
- smallest_start_time = (title, idx)
- log.append(
- [
- activities[smallest_start_time[0]][smallest_start_time[1]][
- 0
- ].isoformat(),
- activities[smallest_start_time[0]][smallest_start_time[1]][
- 1
- ].isoformat(),
- smallest_start_time[0],
- ]
- )
- del activities[smallest_start_time[0]][smallest_start_time[1]]
- if not activities[smallest_start_time[0]]:
- del activities[smallest_start_time[0]]
- return log
-
-
-def write_logfile(vp_dir, log):
- path = Path(f"{vp_dir}/logs.csv")
- with open(path, "w") as csvfile:
- writer = csv.writer(csvfile, delimiter=",", quoting=csv.QUOTE_NONNUMERIC)
- writer.writerow(["Starttime", "Endtime", "Title", "URL"])
- for row in log:
- writer.writerow(row)
-
-
-def main():
- for vp_dir in [f.name for f in os.scandir() if f.is_dir()]:
- print(vp_dir)
- log = extract_activities(get_log_data(vp_dir))
- log = generate_log(log)
-
- history = get_history_db(vp_dir)
- log = match_urls(history, log)
- write_logfile(vp_dir, log)
-
-
-if __name__ == "__main__":
- main()
+# -*- coding: utf-8 -*- +""" +This script takes multiple .log data files from PCScreenWatcher and a Firefox history database (places.sqlite) +inside a folder to extract each activity title, their url if applicable and usage timestamps into a pseudo-csv file. +""" + +import os +import re +from datetime import datetime +import csv +import codecs +from read_sqlite import get_url_from_sqlite +from pathlib import Path + + +# takes the log data string and returns a list of activity titles and their time windows +def extract_activities(log_data): + BROWSER_TITLE_SUFFIX = " - Mozilla Firefox" + # regex which matches between squared brackets + reg_titles = re.compile("(?<=\[).*?(?=\])") + # regex for total/active time + reg_time = re.compile( + "(?<!\()Total: \d*\.*\d*m*\d*\.*\d*s*, Active: \d*\.*\d*m*\d*\.*\d*s*" + ) + # regex which matches number + . and splits into different strings + reg_split = re.compile("\d+\. ") + + extracted_data = {} + for log_date in log_data: + windows = reg_split.split(log_date) + brackets = [] + + del_flag = True + # extract squared brackets per string + for s in windows: + if not del_flag: + found_brackets = reg_titles.findall(s) + if found_brackets: + # Only use Browser titles + if found_brackets[0].endswith(BROWSER_TITLE_SUFFIX): + title = found_brackets[0].replace(BROWSER_TITLE_SUFFIX, "") + brackets.append(title) + enter_exit = s.split("Enter-Exit: ")[-1] + timestamps = reg_titles.findall(enter_exit) + for timestamp in timestamps: + t_enter, t_exit = timestamp.split("-") + if not title in extracted_data: + extracted_data[title] = [] + enter_date = datetime.strptime(t_enter, "%H:%M:%S").time() + exit_date = datetime.strptime(t_exit, "%H:%M:%S").time() + extracted_data[title].append((enter_date, exit_date)) + if "Activies in each window" in s: + del_flag = False + if "Restricted Windows Summary:" in s: + del_flag = True + + return extracted_data + + +def get_log_data(data_path): + files = os.listdir(data_path) + log_files = [] + log_data = [] + for s in files: + if s.endswith(".log"): + log_files.append(os.path.join(data_path, s)) + for l in log_files: + with codecs.open(l, "r", "utf-8") as reader: + log_data.append(reader.read()) + return log_data + + +def get_history_db(data_path): + files = os.listdir(data_path) + for s in files: + if s.endswith(".sqlite"): + history_db = os.path.join(data_path, s) + return history_db + return None + + +def match_urls(history_db, log): + for entry in log: + url = get_url_from_sqlite(history_db, entry[2]) + entry.append(url) + return log + + +def generate_log(activities: dict): + # For each start time in ascending order, make an entry with title and timestamp + log = [] + while activities: + first_title = list(activities.keys())[0] + smallest_start_time = (first_title, 0) + for title in activities.keys(): + for idx, timestamp in enumerate(activities[title]): + if ( + timestamp[0] + < activities[smallest_start_time[0]][smallest_start_time[1]][0] + ): + smallest_start_time = (title, idx) + log.append( + [ + activities[smallest_start_time[0]][smallest_start_time[1]][ + 0 + ].isoformat(), + activities[smallest_start_time[0]][smallest_start_time[1]][ + 1 + ].isoformat(), + smallest_start_time[0], + ] + ) + del activities[smallest_start_time[0]][smallest_start_time[1]] + if not activities[smallest_start_time[0]]: + del activities[smallest_start_time[0]] + return log + + +def write_logfile(vp_dir, log): + path = Path(f"{vp_dir}/logs.csv") + with open(path, "w") as csvfile: + writer = csv.writer(csvfile, delimiter=",", quoting=csv.QUOTE_NONNUMERIC) + writer.writerow(["Starttime", "Endtime", "Title", "URL"]) + for row in log: + writer.writerow(row) + + +def main(): + for vp_dir in [f.name for f in os.scandir() if f.is_dir()]: + print(vp_dir) + log = extract_activities(get_log_data(vp_dir)) + log = generate_log(log) + + history = get_history_db(vp_dir) + log = match_urls(history, log) + write_logfile(vp_dir, log) + + +if __name__ == "__main__": + main() |