diff options
Diffstat (limited to 'bjoern')
-rw-r--r-- | bjoern/videoanalyse/utils.py | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/bjoern/videoanalyse/utils.py b/bjoern/videoanalyse/utils.py new file mode 100644 index 0000000..e060a89 --- /dev/null +++ b/bjoern/videoanalyse/utils.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 + +import csv +from datetime import datetime, timedelta + +import numpy as np +import pandas as pd +from Levenshtein import distance as levendist +from sklearn.cluster import DBSCAN +from sklearn.metrics import pairwise_distances + + +def combine_ocr_logs(video_path, ocr_path, log_path): + date_format = "%Y-%m-%d %H-%M-%S" + video_date = datetime.strptime(video_path.stem, date_format) + print(video_date) + # video_delta = timedelta(hours=video_date.hour, minutes=video_date.minute, seconds=video_date.second) + + def add_video_time_to_start(x, video_date): + start = timedelta(seconds=int(round(x))) + return (start + video_date).time().isoformat() + + # analysis = pd.read_csv(vp_path / "analysis_results.csv") + analysis = pd.read_csv(ocr_path) + analysis["Starttime"] = analysis["start_time"].apply( + add_video_time_to_start, args=(video_date,) + ) + + # logs = pd.read_csv(vp_path / f"{vp_path.name}.csv") + logs = pd.read_csv(log_path) + + def get_log_url(start_time): + start_time = datetime.strptime(start_time, "%H:%M:%S") + + for _, row in logs.iterrows(): + log_start = datetime.strptime(row[0], "%H:%M:%S") + log_end = datetime.strptime(row[1], "%H:%M:%S") + if start_time >= log_start and start_time <= log_end: + return row[3] + return 0 + + analysis["log_url"] = analysis.apply(lambda row: get_log_url(row.Starttime), axis=1) + + # analysis.to_csv(vp_path / "merged.csv", quoting=csv.QUOTE_NONNUMERIC) + return analysis + + +def calc_levenshtein_distance(df): + df["levenshtein-distance"] = df.apply( + lambda row: levendist(str(row.url), str(row.log_url)), axis=1 + ) + return df + + +def group_urls(urls): + unique_urls = np.unique(urls) + + # TODO: casting deprecation np + def levenshtein_from_idx(idx1, idx2): + return levendist(unique_urls[int(idx1)], unique_urls[int(idx2)]) + + X = np.searchsorted(unique_urls, list([[x] for x in urls])) + + distance_matrix = pairwise_distances( + X=X, Y=None, metric=levenshtein_from_idx, n_jobs=-1 + ) + # TODO: eps and min_samples parameter + db = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix) + labels = db.labels_ + zipped = zip(urls, labels) + + # grouping solution from: https://www.geeksforgeeks.org/python-group-tuple-into-list-based-on-value/ + # create an empty dictionary to store the grouped tuples + grouped_dict = {} + + # loop through the tuples in the list + for tup in zipped: + # get the second element of the tuple + key = tup[1] + # if the key is not already in the dictionary, add it with an empty list as value + if key not in grouped_dict: + grouped_dict[key] = [] + # append the current tuple to the list corresponding to the key in the dictionary + grouped_dict[key].append(tup[0]) + + # convert the dictionary values to lists and store in res + url_groups = [v for _, v in grouped_dict.items()] + + return url_groups + + +# TODO: use df instead of csv reader +# TODO: return df instead of writing to file +def write_grouped_metrics(df, url_groups, data_path): + # # for every row check which group its url belongs to and add a column with group indices + # # also add columns with longest/most frequent url in group + with open(data_path / "metrics.csv", "r") as input_file, open( + data_path / "metrics_grps.csv", "w", newline="" + ) as output_file: + csv_reader = csv.reader(input_file, quotechar='"') + csv_writer = csv.writer( + output_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC + ) + header = next(csv_reader) + header.extend( + [ + "group_index", + "longest", + "longest-distance", + "most_frequent", + "most_frequent-distance", + ] + ) + csv_writer.writerow(header) + for row in csv_reader: + for idx, grp in enumerate(url_groups): + if row[2] in grp: + row.append(str(idx)) + longest_in_grp = max(grp, key=len) + row.append(longest_in_grp) + row.append(levendist(row[5], longest_in_grp)) + most_frequent_in_grp = max(set(grp), key=grp.count) + row.append(str(most_frequent_in_grp)) + row.append(levendist(row[5], most_frequent_in_grp)) + csv_writer.writerow(row) |