author     Niclas Dobbertin <niclas.dobbertin@stud.tu-darmstadt.de>  2023-10-02 19:11:24 +0200
committer  Niclas Dobbertin <niclas.dobbertin@stud.tu-darmstadt.de>  2023-10-02 19:11:24 +0200
commit     ddab7c6cc5ba7e785aadb224f294284b0564acd6 (patch)
tree       11e2eab853ee35a9d621c5d350cde9cfc9f74393
parent     7266e2787bfa661d490be9c4463e707e6ffe1715 (diff)
refactor post processing into single script
-rw-r--r--  bjoern/videoanalyse/combine_ocr-logs.py   52
-rw-r--r--  bjoern/videoanalyse/eval.py                29
-rw-r--r--  bjoern/videoanalyse/post_processing.py    103
3 files changed, 13 insertions, 171 deletions
diff --git a/bjoern/videoanalyse/combine_ocr-logs.py b/bjoern/videoanalyse/combine_ocr-logs.py
deleted file mode 100644
index 1d99629..0000000
--- a/bjoern/videoanalyse/combine_ocr-logs.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-from pathlib import Path
-from datetime import datetime, timedelta
-import pandas as pd
-import csv
-
-argparser = argparse.ArgumentParser(
- description="Combines results of OCR analysis with log files"
-)
-argparser.add_argument(
- "vp_dir", help="Directory containing analysis_results.csv and VPCODE.csv"
-)
-
-args = argparser.parse_args()
-
-vp_path = Path(args.vp_dir)
-
-video_path = next(vp_path.glob("*.mkv"))
-date_format = "%Y-%m-%d %H-%M-%S"
-video_date = datetime.strptime(video_path.stem, date_format)
-print(video_date)
-# video_delta = timedelta(hours=video_date.hour, minutes=video_date.minute, seconds=video_date.second)
-
-def add_video_time_to_start(x, video_date):
- start = timedelta(seconds=int(round(x)))
- return (start + video_date).time().isoformat()
-
-analysis = pd.read_csv(vp_path / "analysis_results.csv")
-analysis["Starttime"] = analysis["start_time"].apply(add_video_time_to_start, args=(video_date,))
-
-logs = pd.read_csv(vp_path / f"{vp_path.name}.csv")
-
-def get_log_url(start_time):
- start_time = datetime.strptime(start_time, "%H:%M:%S")
-
- for _, row in logs.iterrows():
- log_start = datetime.strptime(row[0], "%H:%M:%S")
- log_end = datetime.strptime(row[1], "%H:%M:%S")
- if start_time >= log_start and start_time <= log_end:
- return row[3]
- return 0
-
-
-
-analysis["log_url"] = analysis.apply(
- lambda row: get_log_url(row.Starttime), axis=1
- )
-
-
-analysis.to_csv(vp_path / "merged.csv", quoting=csv.QUOTE_NONNUMERIC)
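The new post_processing.py below calls utils.combine_ocr_logs(video_path, ocr_path, log_path); the utils module itself is not part of this diff. The following is a minimal sketch of how that helper could reproduce the behaviour of the deleted combine_ocr-logs.py (the function name and arguments are taken from the new call site, the body is an assumption):

# Sketch of utils.combine_ocr_logs, reconstructed from the deleted
# combine_ocr-logs.py; not the actual utils module from this commit.
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd


def combine_ocr_logs(video_path: Path, ocr_path: Path, log_path: Path) -> pd.DataFrame:
    # The recording's filename encodes its wall-clock start time,
    # e.g. "2023-09-28 14-03-07.mkv".
    video_date = datetime.strptime(video_path.stem, "%Y-%m-%d %H-%M-%S")

    analysis = pd.read_csv(ocr_path)
    logs = pd.read_csv(log_path)

    def to_wall_clock(offset_seconds: float) -> str:
        # OCR rows store the offset into the video in seconds.
        start = timedelta(seconds=int(round(offset_seconds)))
        return (video_date + start).time().isoformat()

    analysis["Starttime"] = analysis["start_time"].apply(to_wall_clock)

    def get_log_url(start_time: str):
        # Return the logged URL whose [start, end] interval contains start_time.
        t = datetime.strptime(start_time, "%H:%M:%S")
        for _, row in logs.iterrows():
            log_start = datetime.strptime(row.iloc[0], "%H:%M:%S")
            log_end = datetime.strptime(row.iloc[1], "%H:%M:%S")
            if log_start <= t <= log_end:
                return row.iloc[3]
        return 0

    analysis["log_url"] = analysis["Starttime"].apply(get_log_url)
    return analysis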
diff --git a/bjoern/videoanalyse/eval.py b/bjoern/videoanalyse/eval.py
deleted file mode 100644
index a917b7a..0000000
--- a/bjoern/videoanalyse/eval.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-from pathlib import Path
-import pandas as pd
-import csv
-from Levenshtein import distance as levendist
-
-
-argparser = argparse.ArgumentParser(
- description="Distance evaluation"
-)
-argparser.add_argument(
- "vp_dir", help="Directory containing merged.csv"
-)
-
-args = argparser.parse_args()
-
-vp_path = Path(args.vp_dir)
-
-df = pd.read_csv(vp_path / "merged.csv")
-
-df["levenshtein-distance"] = df.apply(
- lambda row: levendist(str(row.url), str(row.log_url)), axis=1
-)
-
-
-
-df.to_csv(vp_path / "metrics.csv", quoting=csv.QUOTE_NONNUMERIC)
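Likewise, utils.calc_levenshtein_distance(df) presumably covers what the deleted eval.py did: one Levenshtein distance per row between the OCR'd URL and the logged URL. A sketch under that assumption:

# Sketch of utils.calc_levenshtein_distance, mirroring the deleted eval.py;
# the real helper in utils.py may differ.
import pandas as pd
from Levenshtein import distance as levendist


def calc_levenshtein_distance(df: pd.DataFrame) -> pd.DataFrame:
    # Distance between the URL read via OCR and the URL from the browser log.
    df["levenshtein-distance"] = df.apply(
        lambda row: levendist(str(row.url), str(row.log_url)), axis=1
    )
    return df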
diff --git a/bjoern/videoanalyse/post_processing.py b/bjoern/videoanalyse/post_processing.py
index 94ce067..a8d37c4 100644
--- a/bjoern/videoanalyse/post_processing.py
+++ b/bjoern/videoanalyse/post_processing.py
@@ -2,105 +2,28 @@
import argparse
from pathlib import Path
-import numpy as np
-import pandas as pd
-import Levenshtein
-import csv
-from itertools import groupby
-from operator import itemgetter
-from sklearn.metrics import pairwise_distances
-from sklearn.cluster import DBSCAN
from pprint import pprint
-argparser = argparse.ArgumentParser(description="Distance evaluation")
-argparser.add_argument("vp_dir", help="Directory containing metrics.csv")
+import utils
+
+argparser = argparse.ArgumentParser(description="OCR-Logfile evaluation")
+argparser.add_argument("vp_dir", help="VP Directory")
args = argparser.parse_args()
data_path = Path(args.vp_dir)
+video_path = next(data_path.glob("*.mkv"))
+ocr_path = data_path / "analysis_results.csv"
+log_path = data_path / f"{data_path.stem}.csv"
-# Read results.csv
-# with open(data_path / "metrics.csv", "r") as csvfile:
-# reader = csv.reader(csvfile, quotechar='"')
-# print(next(reader))
-#
-
-
-df = pd.read_csv(data_path / "metrics.csv")
-df = df.fillna("")
-
-
-# List with only urls
-all_urls = list(df["url"].values)
-urls = list(df["url"].values)
-
-
-def group_urls(urls):
- unique_urls = np.unique(urls)
-
- # TODO: casting deprecation np
- def levenshtein_from_idx(idx1, idx2):
- return Levenshtein.distance(unique_urls[int(idx1)], unique_urls[int(idx2)])
-
- X = np.searchsorted(unique_urls, list([[x] for x in urls]))
-
- distance_matrix = pairwise_distances(
- X=X, Y=None, metric=levenshtein_from_idx, n_jobs=-1
- )
- # TODO: eps and min_samples parameter
- db = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix)
- labels = db.labels_
- zipped = zip(urls, labels)
-
- # grouping solution from: https://www.geeksforgeeks.org/python-group-tuple-into-list-based-on-value/
- # create an empty dictionary to store the grouped tuples
- grouped_dict = {}
-
- # loop through the tuples in the list
- for tup in zipped:
- # get the second element of the tuple
- key = tup[1]
- # if the key is not already in the dictionary, add it with an empty list as value
- if key not in grouped_dict:
- grouped_dict[key] = []
- # append the current tuple to the list corresponding to the key in the dictionary
- grouped_dict[key].append(tup[0])
-
- # convert the dictionary values to lists and store in res
- url_groups = [v for _, v in grouped_dict.items()]
+df = utils.combine_ocr_logs(video_path, ocr_path, log_path)
+df = df.fillna('')
- return url_groups
+df = utils.calc_levenshtein_distance(df)
-url_groups = group_urls(urls)
+url_groups = utils.group_urls(list(df["url"].values))
pprint(len(url_groups))
-# # for every row check which group its url belongs to and add a column with group indices
-# # also add columns with longest/most frequent url in group
-with open(data_path / "metrics.csv", "r") as input_file, open(
- data_path / "metrics_grps.csv", "w", newline=""
-) as output_file:
- csv_reader = csv.reader(input_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
- csv_writer = csv.writer(output_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
- header = next(csv_reader)
- header.extend(
- [
- "group_index",
- "longest",
- "longest-distance",
- "most_frequent",
- "most_frequent-distance",
- ]
- )
- csv_writer.writerow(header)
- for row in csv_reader:
- for idx, grp in enumerate(url_groups):
- if row[3] in grp:
- row.append(idx)
- longest_in_grp = max(grp, key=len)
- row.append(longest_in_grp)
- row.append(Levenshtein.distance(row[6], longest_in_grp))
- most_frequent_in_grp = max(set(grp), key=grp.count)
- row.append(most_frequent_in_grp)
- row.append(Levenshtein.distance(row[6], most_frequent_in_grp))
- csv_writer.writerow(row)
+df.to_csv(f"{data_path}/metrics.csv")
+utils.write_grouped_metrics(df, url_groups, data_path)
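The remaining helpers, utils.group_urls and utils.write_grouped_metrics, are also not included in this diff. Judging from the code removed from post_processing.py above, group_urls clusters the OCR'd URLs by pairwise Levenshtein distance with DBSCAN, and write_grouped_metrics adds per-group columns before writing metrics_grps.csv. A sketch of both, keeping the old eps/min_samples values as assumptions:

# Sketch of utils.group_urls / utils.write_grouped_metrics, reconstructed from
# the code deleted above; parameter choices (eps=10, min_samples=5) are carried
# over from the old script and are assumptions for the new utils module.
from pathlib import Path

import Levenshtein
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.metrics import pairwise_distances


def group_urls(urls):
    # Cluster URLs by pairwise Levenshtein distance; each cluster collects
    # (possibly misread) variants of the same URL.
    unique_urls = np.unique(urls)

    def levenshtein_from_idx(idx1, idx2):
        return Levenshtein.distance(unique_urls[int(idx1[0])], unique_urls[int(idx2[0])])

    X = np.searchsorted(unique_urls, urls).reshape(-1, 1)
    distance_matrix = pairwise_distances(X, metric=levenshtein_from_idx, n_jobs=-1)
    labels = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix).labels_

    groups = {}
    for url, label in zip(urls, labels):
        groups.setdefault(label, []).append(url)
    return list(groups.values())


def write_grouped_metrics(df, url_groups, data_path: Path):
    # For every row, record its group index plus the longest and the most
    # frequent URL in that group and their distances to the OCR'd URL.
    rows = []
    for _, row in df.iterrows():
        for idx, grp in enumerate(url_groups):
            if row["url"] in grp:
                longest = max(grp, key=len)
                most_frequent = max(set(grp), key=grp.count)
                rows.append([
                    idx,
                    longest, Levenshtein.distance(row["url"], longest),
                    most_frequent, Levenshtein.distance(row["url"], most_frequent),
                ])
                break
    cols = ["group_index", "longest", "longest-distance",
            "most_frequent", "most_frequent-distance"]
    out = pd.concat(
        [df.reset_index(drop=True), pd.DataFrame(rows, columns=cols)], axis=1
    )
    out.to_csv(data_path / "metrics_grps.csv", index=False)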