From ddab7c6cc5ba7e785aadb224f294284b0564acd6 Mon Sep 17 00:00:00 2001 From: Niclas Dobbertin Date: Mon, 2 Oct 2023 19:11:24 +0200 Subject: refactor post processing into single script --- bjoern/videoanalyse/combine_ocr-logs.py | 52 ---------------- bjoern/videoanalyse/eval.py | 29 --------- bjoern/videoanalyse/post_processing.py | 103 ++++---------------------------- 3 files changed, 13 insertions(+), 171 deletions(-) delete mode 100644 bjoern/videoanalyse/combine_ocr-logs.py delete mode 100644 bjoern/videoanalyse/eval.py (limited to 'bjoern') diff --git a/bjoern/videoanalyse/combine_ocr-logs.py b/bjoern/videoanalyse/combine_ocr-logs.py deleted file mode 100644 index 1d99629..0000000 --- a/bjoern/videoanalyse/combine_ocr-logs.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -from pathlib import Path -from datetime import datetime, timedelta -import pandas as pd -import csv - -argparser = argparse.ArgumentParser( - description="Combines results of OCR analysis with log files" -) -argparser.add_argument( - "vp_dir", help="Directory containing analysis_results.csv and VPCODE.csv" -) - -args = argparser.parse_args() - -vp_path = Path(args.vp_dir) - -video_path = next(vp_path.glob("*.mkv")) -date_format = "%Y-%m-%d %H-%M-%S" -video_date = datetime.strptime(video_path.stem, date_format) -print(video_date) -# video_delta = timedelta(hours=video_date.hour, minutes=video_date.minute, seconds=video_date.second) - -def add_video_time_to_start(x, video_date): - start = timedelta(seconds=int(round(x))) - return (start + video_date).time().isoformat() - -analysis = pd.read_csv(vp_path / "analysis_results.csv") -analysis["Starttime"] = analysis["start_time"].apply(add_video_time_to_start, args=(video_date,)) - -logs = pd.read_csv(vp_path / f"{vp_path.name}.csv") - -def get_log_url(start_time): - start_time = datetime.strptime(start_time, "%H:%M:%S") - - for _, row in logs.iterrows(): - log_start = datetime.strptime(row[0], "%H:%M:%S") - log_end = datetime.strptime(row[1], "%H:%M:%S") - if start_time >= log_start and start_time <= log_end: - return row[3] - return 0 - - - -analysis["log_url"] = analysis.apply( - lambda row: get_log_url(row.Starttime), axis=1 - ) - - -analysis.to_csv(vp_path / "merged.csv", quoting=csv.QUOTE_NONNUMERIC) diff --git a/bjoern/videoanalyse/eval.py b/bjoern/videoanalyse/eval.py deleted file mode 100644 index a917b7a..0000000 --- a/bjoern/videoanalyse/eval.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -from pathlib import Path -import pandas as pd -import csv -from Levenshtein import distance as levendist - - -argparser = argparse.ArgumentParser( - description="Distance evaluation" -) -argparser.add_argument( - "vp_dir", help="Directory containing merged.csv" -) - -args = argparser.parse_args() - -vp_path = Path(args.vp_dir) - -df = pd.read_csv(vp_path / "merged.csv") - -df["levenshtein-distance"] = df.apply( - lambda row: levendist(str(row.url), str(row.log_url)), axis=1 -) - - - -df.to_csv(vp_path / "metrics.csv", quoting=csv.QUOTE_NONNUMERIC) diff --git a/bjoern/videoanalyse/post_processing.py b/bjoern/videoanalyse/post_processing.py index 94ce067..a8d37c4 100644 --- a/bjoern/videoanalyse/post_processing.py +++ b/bjoern/videoanalyse/post_processing.py @@ -2,105 +2,28 @@ import argparse from pathlib import Path -import numpy as np -import pandas as pd -import Levenshtein -import csv -from itertools import groupby -from operator import itemgetter -from sklearn.metrics import pairwise_distances -from sklearn.cluster import DBSCAN from pprint import pprint -argparser = argparse.ArgumentParser(description="Distance evaluation") -argparser.add_argument("vp_dir", help="Directory containing metrics.csv") +import utils + +argparser = argparse.ArgumentParser(description="OCR-Logfile evaluation") +argparser.add_argument("vp_dir", help="VP Directory") args = argparser.parse_args() data_path = Path(args.vp_dir) +video_path = next(data_path.glob("*.mkv")) +ocr_path = data_path / "analysis_results.csv" +log_path = data_path / f"{data_path.stem}.csv" -# Read results.csv -# with open(data_path / "metrics.csv", "r") as csvfile: -# reader = csv.reader(csvfile, quotechar='"') -# print(next(reader)) -# - - -df = pd.read_csv(data_path / "metrics.csv") -df = df.fillna("") - - -# List with only urls -all_urls = list(df["url"].values) -urls = list(df["url"].values) - - -def group_urls(urls): - unique_urls = np.unique(urls) - - # TODO: casting deprecation np - def levenshtein_from_idx(idx1, idx2): - return Levenshtein.distance(unique_urls[int(idx1)], unique_urls[int(idx2)]) - - X = np.searchsorted(unique_urls, list([[x] for x in urls])) - - distance_matrix = pairwise_distances( - X=X, Y=None, metric=levenshtein_from_idx, n_jobs=-1 - ) - # TODO: eps and min_samples parameter - db = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix) - labels = db.labels_ - zipped = zip(urls, labels) - - # grouping solution from: https://www.geeksforgeeks.org/python-group-tuple-into-list-based-on-value/ - # create an empty dictionary to store the grouped tuples - grouped_dict = {} - - # loop through the tuples in the list - for tup in zipped: - # get the second element of the tuple - key = tup[1] - # if the key is not already in the dictionary, add it with an empty list as value - if key not in grouped_dict: - grouped_dict[key] = [] - # append the current tuple to the list corresponding to the key in the dictionary - grouped_dict[key].append(tup[0]) - - # convert the dictionary values to lists and store in res - url_groups = [v for _, v in grouped_dict.items()] +df = utils.combine_ocr_logs(video_path, ocr_path, log_path) +df = df.fillna('') - return url_groups +df = utils.calc_levenshtein_distance(df) -url_groups = group_urls(urls) +url_groups = utils.group_urls(list(df["url"].values)) pprint(len(url_groups)) -# # for every row check which group its url belongs to and add a column with group indices -# # also add columns with longest/most frequent url in group -with open(data_path / "metrics.csv", "r") as input_file, open( - data_path / "metrics_grps.csv", "w", newline="" -) as output_file: - csv_reader = csv.reader(input_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC) - csv_writer = csv.writer(output_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC) - header = next(csv_reader) - header.extend( - [ - "group_index", - "longest", - "longest-distance", - "most_frequent", - "most_frequent-distance", - ] - ) - csv_writer.writerow(header) - for row in csv_reader: - for idx, grp in enumerate(url_groups): - if row[3] in grp: - row.append(idx) - longest_in_grp = max(grp, key=len) - row.append(longest_in_grp) - row.append(Levenshtein.distance(row[6], longest_in_grp)) - most_frequent_in_grp = max(set(grp), key=grp.count) - row.append(most_frequent_in_grp) - row.append(Levenshtein.distance(row[6], most_frequent_in_grp)) - csv_writer.writerow(row) +df.to_csv(f"{data_path}/metrics.csv") +utils.write_grouped_metrics(df, url_groups, data_path) -- cgit v1.2.3