Diffstat (limited to 'bjoern/videoanalyse')
-rw-r--r--  bjoern/videoanalyse/utils.py  125
1 file changed, 125 insertions(+), 0 deletions(-)
diff --git a/bjoern/videoanalyse/utils.py b/bjoern/videoanalyse/utils.py
new file mode 100644
index 0000000..e060a89
--- /dev/null
+++ b/bjoern/videoanalyse/utils.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
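+"""Helpers for the video analysis pipeline: merge OCR'd URLs with the
+recorded URL log, compare them via Levenshtein distance, and cluster
+similar URLs with DBSCAN."""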
+
+import csv
+from datetime import datetime, timedelta
+
+import numpy as np
+import pandas as pd
+from Levenshtein import distance as levendist
+from sklearn.cluster import DBSCAN
+from sklearn.metrics import pairwise_distances
+
+
+def combine_ocr_logs(video_path, ocr_path, log_path):
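+    """Merge OCR results with the URL log of the same recording.
+
+    The recording start time is parsed from the video filename; each OCR
+    row's relative start_time is converted to a time of day and matched
+    against the log's start/end intervals to attach the logged URL in a
+    new "log_url" column. Returns the augmented OCR dataframe.
+    """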
+    date_format = "%Y-%m-%d %H-%M-%S"
+    video_date = datetime.strptime(video_path.stem, date_format)
+    print(video_date)
+    # video_delta = timedelta(hours=video_date.hour, minutes=video_date.minute, seconds=video_date.second)
+
+    def add_video_time_to_start(x, video_date):
+        start = timedelta(seconds=int(round(x)))
+        return (start + video_date).time().isoformat()
+
+    # analysis = pd.read_csv(vp_path / "analysis_results.csv")
+    analysis = pd.read_csv(ocr_path)
+    analysis["Starttime"] = analysis["start_time"].apply(
+        add_video_time_to_start, args=(video_date,)
+    )
+
+    # logs = pd.read_csv(vp_path / f"{vp_path.name}.csv")
+    logs = pd.read_csv(log_path)
+
+    def get_log_url(start_time):
+        start_time = datetime.strptime(start_time, "%H:%M:%S")
+
+        for _, row in logs.iterrows():
+            # use explicit positional access; plain row[0] relies on the
+            # deprecated positional fallback of Series.__getitem__
+            log_start = datetime.strptime(row.iloc[0], "%H:%M:%S")
+            log_end = datetime.strptime(row.iloc[1], "%H:%M:%S")
+            if log_start <= start_time <= log_end:
+                return row.iloc[3]
+        return 0
+
+    analysis["log_url"] = analysis.apply(lambda row: get_log_url(row.Starttime), axis=1)
+
+    # analysis.to_csv(vp_path / "merged.csv", quoting=csv.QUOTE_NONNUMERIC)
+    return analysis
+
+
+def calc_levenshtein_distance(df):
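+    """Add a "levenshtein-distance" column comparing each row's url to its log_url."""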
+ df["levenshtein-distance"] = df.apply(
+ lambda row: levendist(str(row.url), str(row.log_url)), axis=1
+ )
+ return df
+
+
+def group_urls(urls):
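+    """Cluster similar urls via DBSCAN on pairwise Levenshtein distances.
+
+    Returns a list of groups; each group contains the input urls (duplicates
+    included) assigned to the same cluster. DBSCAN noise points (label -1)
+    end up together in one group.
+    """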
+    unique_urls = np.unique(urls)
+
+    def levenshtein_from_idx(idx1, idx2):
+        # pairwise_distances hands over 1-element rows of X; index explicitly
+        # so NumPy does not have to cast a size-1 array to a scalar (deprecated)
+        return levendist(unique_urls[int(idx1[0])], unique_urls[int(idx2[0])])
+
+    # map every url (duplicates included) to its index in the sorted unique_urls
+    X = np.searchsorted(unique_urls, [[x] for x in urls])
+
+    distance_matrix = pairwise_distances(
+        X=X, Y=None, metric=levenshtein_from_idx, n_jobs=-1
+    )
+    # TODO: eps and min_samples parameter
+    db = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix)
+    labels = db.labels_
+
+    # group urls by their cluster label; groups keep the order of first appearance
+    grouped_dict = {}
+    for url, label in zip(urls, labels):
+        grouped_dict.setdefault(label, []).append(url)
+
+    url_groups = list(grouped_dict.values())
+
+    return url_groups
+
+
+# TODO: use df instead of csv reader
+# TODO: return df instead of writing to file
+def write_grouped_metrics(df, url_groups, data_path):
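+    """Read metrics.csv, append per-url group columns, and write metrics_grps.csv.
+
+    The df argument is currently unused (see the TODOs above).
+    """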
+    # for every row check which group its url belongs to and add a column with the group index
+    # also add columns with the longest / most frequent url in the group
+    with open(data_path / "metrics.csv", "r") as input_file, open(
+        data_path / "metrics_grps.csv", "w", newline=""
+    ) as output_file:
+        csv_reader = csv.reader(input_file, quotechar='"')
+        csv_writer = csv.writer(
+            output_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC
+        )
+        header = next(csv_reader)
+        header.extend(
+            [
+                "group_index",
+                "longest",
+                "longest-distance",
+                "most_frequent",
+                "most_frequent-distance",
+            ]
+        )
+        csv_writer.writerow(header)
+        for row in csv_reader:
+            for idx, grp in enumerate(url_groups):
+                if row[2] in grp:
+                    row.append(str(idx))
+                    longest_in_grp = max(grp, key=len)
+                    row.append(longest_in_grp)
+                    row.append(levendist(row[5], longest_in_grp))
+                    most_frequent_in_grp = max(set(grp), key=grp.count)
+                    row.append(str(most_frequent_in_grp))
+                    row.append(levendist(row[5], most_frequent_in_grp))
+            csv_writer.writerow(row)