path: root/bjoern/videoanalyse/post_processing.py
blob: 94ce067c1add7365b56c617fbfeb7a160a24c6a3 (plain)
#!/usr/bin/env python3
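"""Post-processing of recognised urls.

Reads metrics.csv from the given directory, clusters the values of its url
column by Levenshtein distance (DBSCAN on a precomputed distance matrix) and
writes metrics_grps.csv with per-row group information: group index, the
longest and the most frequent url of the group, and the Levenshtein distance
of the row's url to each of them.
"""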

import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import Levenshtein
import csv
from sklearn.metrics import pairwise_distances
from sklearn.cluster import DBSCAN

argparser = argparse.ArgumentParser(description="Distance evaluation")
argparser.add_argument("vp_dir", help="Directory containing metrics.csv")

args = argparser.parse_args()

data_path = Path(args.vp_dir)

# Read metrics.csv
df = pd.read_csv(data_path / "metrics.csv")
df = df.fillna("")


# List of urls, one entry per row
urls = df["url"].tolist()


def group_urls(urls):
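    """Cluster similar urls.

    Each url is mapped to its index in the sorted array of unique urls; the
    pairwise Levenshtein distances between the urls are computed from those
    indices and fed to DBSCAN as a precomputed distance matrix.  Returns a
    list of url groups, one list of urls per DBSCAN label (noise included).
    """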
    unique_urls = np.unique(urls)

    # pairwise_distances passes rows of X (single-element float arrays) to the
    # metric, so the index is pulled out and cast back to int before indexing
    def levenshtein_from_idx(idx1, idx2):
        return Levenshtein.distance(
            unique_urls[int(idx1[0])], unique_urls[int(idx2[0])]
        )

    # map each url to its index in unique_urls, one single-element row per url
    X = np.searchsorted(unique_urls, urls).reshape(-1, 1)

    distance_matrix = pairwise_distances(
        X=X, Y=None, metric=levenshtein_from_idx, n_jobs=-1
    )
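    # distance_matrix is a symmetric (len(urls), len(urls)) array of
    # Levenshtein distances between all recognised urls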
    # TODO: eps and min_samples parameter
    db = DBSCAN(eps=10, min_samples=5, metric="precomputed").fit(distance_matrix)
    labels = db.labels_
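    # DBSCAN marks urls that fit no cluster with the noise label -1; those
    # end up collected into a group of their own below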
    zipped = zip(urls, labels)

    # grouping adapted from: https://www.geeksforgeeks.org/python-group-tuple-into-list-based-on-value/
    # collect the urls of every DBSCAN label in a list
    grouped_dict = {}
    for url, label in zipped:
        grouped_dict.setdefault(label, []).append(url)

    # one list of urls per cluster label
    url_groups = list(grouped_dict.values())

    return url_groups


url_groups = group_urls(urls)
print(f"Found {len(url_groups)} url groups")

# For every row, look up the group its url belongs to and append the group
# index as well as the longest / most frequent url of that group, together
# with the Levenshtein distance of the row's url to each of them.
with open(data_path / "metrics.csv", "r") as input_file, open(
    data_path / "metrics_grps.csv", "w", newline=""
) as output_file:
    csv_reader = csv.reader(input_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
    csv_writer = csv.writer(output_file, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
    header = next(csv_reader)
    header.extend(
        [
            "group_index",
            "longest",
            "longest-distance",
            "most_frequent",
            "most_frequent-distance",
        ]
    )
    csv_writer.writerow(header)
    # row[3] holds the url used for the group lookup, row[6] the url that the
    # distances are computed against (fixed column positions in metrics.csv)
    for row in csv_reader:
        for idx, grp in enumerate(url_groups):
            if row[3] in grp:
                row.append(idx)
                # longest url of the group and its distance to this row's url
                longest_in_grp = max(grp, key=len)
                row.append(longest_in_grp)
                row.append(Levenshtein.distance(row[6], longest_in_grp))
                # most frequent url of the group and its distance to this row's url
                most_frequent_in_grp = max(set(grp), key=grp.count)
                row.append(most_frequent_in_grp)
                row.append(Levenshtein.distance(row[6], most_frequent_in_grp))
                # a url belongs to exactly one group, so stop searching
                break
        csv_writer.writerow(row)