#!/usr/bin/env python3 from __future__ import annotations import pickle import random from collections import namedtuple from pprint import pprint import pandas as pd from psychopy import core, event, gui, visual import frensch_procedures DisplayVariable = namedtuple("DisplayVariable", ["name", "values"]) DisplayProcedure = namedtuple("DisplayProcedure", ["procedure", "solution"]) intro_text = """Vielen Dank dass Sie bei unserem Experiment zum menschlichen Lernen teilnehmen! In diesem Experiment arbeiten Sie in einem Labor, welches die Wasserqualität analysiert. Dafür bekommen Sie einige Wasserproben und müssen für jede Probe verschiedene Kennwerte ermitteln. Jede Wasserprobe besitzt bereits verschiedene gemessene Werte, wie der Algengehalt, welche für die Berechnungen benutzt werden. (Leertaste zum Fortfahren) """ intro2_text = """Im folgenden müssen sie verschiedene Rechenaufgaben lösen um die Kennwerte zu berechnen. Verwenden Sie die gewohnten Rechenregeln und geben sie Ihre Lösung bitte immer als zweistellige Zahl ein, und bestätigen mit Enter. Die Werte der Variablen (z.B. Algen) werden oben am Bildschirm angezeigt. Manche Variablen haben mehrere mögliche Werte; "Gifte_2" besagt z.B., dass der zweite Wert der Giftwerte zu verwenden ist. "_max/_min" besagt, dass der maximale/minimale Wert dieser Variable zu verwenden ist. Als letzte Berechnung müssen Sie den Gesamtwert der Wasserqualität aus ihren Ergebnissen berechnen. Nach jeder Aufgabe können Sie kurz pausieren. Drücken Sie die Leertaste um zu beginnen""" def experiment_shutdown(): WIN.close() core.quit() ORDER_CONDITIONS = ["fixed", "random", "blocked"] condition_dlg = gui.Dlg(title="Experiment Condition") condition_dlg.addText("Condition") condition_dlg.addField("condition") CONDITION = condition_dlg.show()[0] assert CONDITION in ORDER_CONDITIONS WIN = visual.Window((1920, 1080), fullscr=True, units="pix") MONITOR_FPS = 60 TRAIN_TRIALS = 75 TEST_TRIALS = 50 # TRAIN_TRIALS = 3 # TEST_TRIALS = 3 PROCEDURE_KEYS = ["1", "2", "3", "4", "5", "6", "overall"] # Cancel experiment anytime with Esc event.globalKeys.add(key="escape", func=experiment_shutdown, name="shutdown") def generate_variable_display(varx: list[DisplayVariable], x_positions: list[int]): assert len(varx) == len(x_positions) stims = [] def gen_value_stims(values, x, y, offset): for value in values: y -= offset value_stim = visual.TextBox2( WIN, pos=(x, y), text=value, # size=200, letterHeight=70, alignment="center", ) stims.append(value_stim) y = 500 offset = 100 for var, x_pos in zip(varx, x_positions): stim_var = visual.TextBox2( WIN, pos=[x_pos, y], text=var.name, # size=[1000, 1000], letterHeight=40, alignment="center", ) stims.append(stim_var) gen_value_stims(var.values, x_pos, y, offset) return stims def generate_procedure_display(procedure: DisplayProcedure, position): stim_procedure = visual.TextBox2( WIN, pos=position, text=procedure.procedure, size=[1000000, 1000], letterHeight=32, alignment="center", ) return stim_procedure def generate_all_watersamples(n): samples = [] for _ in range(n): samples.append(frensch_procedures.constrained_WaterSample()) return samples def run_blocked_trials(water_samples, procedure_keys): results = {} for proc_idx, proc in enumerate(procedure_keys): for sample_idx, sample in enumerate(water_samples): if sample_idx % 6 == 0: pause_after_trial.draw() WIN.flip() event.waitKeys(keyList=["space"]) cur_key = f"train_{sample_idx}" if not cur_key in results.keys(): results[cur_key] = {} results[cur_key]["procedure_order"] = tuple(procedure_keys) results[cur_key]["water_sample"] = sample.water_sample_dict() solid = DisplayVariable("Mineralien", [sample.solid]) algae = DisplayVariable("Algen", [sample.algae]) lime = DisplayVariable("Sandstein", sample.lime) toxin = DisplayVariable("Gifte", sample.toxin) x_positions = [-800, -400, 400, 800] stims = generate_variable_display([solid, algae, lime, toxin], x_positions) procedures = sample.procedure_dict() proc_x = -600 proc_y = -100 answ_x = 200 answ_y = -100 y_offset = 80 for prev in procedure_keys[:proc_idx]: if not prev: continue p = DisplayProcedure(procedures[prev][1], procedures[prev][0]) p = generate_procedure_display(p, (proc_x, proc_y)) stims.append(p) proc_y -= y_offset stim_answer_equals = visual.TextBox2( WIN, "=", letterHeight=50, pos=(answ_x - 100, answ_y), size=[150, 70], alignment="center", ) stims.append(stim_answer_equals) stim_answer_box = visual.TextBox2( WIN, results[f"train_{sample_idx}"][prev]["answer"], letterHeight=50, pos=(answ_x, answ_y), size=[150, 70], editable=True, fillColor="white", color="black", alignment="center", ) stims.append(stim_answer_box) answ_y -= y_offset p = DisplayProcedure(procedures[proc][1], procedures[proc][0]) p = generate_procedure_display(p, (proc_x, proc_y)) stims.append(p) proc_y -= y_offset stim_answer_equals = visual.TextBox2( WIN, "=", letterHeight=50, pos=(answ_x - 100, answ_y), size=[150, 70], alignment="center", ) stims.append(stim_answer_equals) print(sample_idx) stim_answer_box = visual.TextBox2( WIN, "", letterHeight=50, pos=(answ_x, answ_y), size=[150, 70], editable=True, fillColor="white", color="black", alignment="center", ) stims.append(stim_answer_box) answ_y -= y_offset not_finished = True answer = "not answered" start_time = core.monotonicClock.getTime() while not_finished: stim_answer_box.hasFocus = True for stim in stims: stim.draw() WIN.flip() answer = stim_answer_box.text if "\n" in answer: if answer[0].isdigit() and answer[1].isdigit(): not_finished = False else: stim_answer_box.text = answer[:-1] if len(answer) > 2: stim_answer_box.text = stim_answer_box.text[:2] answer_time = core.monotonicClock.getTime() - start_time answer = (answer.replace("\n", ""), answer_time) results[cur_key][proc] = {"answer": answer[0], "time": answer[1]} pause_new_proc.draw() WIN.flip() event.waitKeys(keyList=["space"]) return results def run_trial(water_sample, procedure_keys: list, condition): water_sample.print_all() if condition == "random": overall = procedure_keys.pop() random.shuffle(procedure_keys) procedure_keys.append(overall) solid = DisplayVariable("Mineralien", [water_sample.solid]) algae = DisplayVariable("Algen", [water_sample.algae]) lime = DisplayVariable("Sandstein", water_sample.lime) toxin = DisplayVariable("Gifte", water_sample.toxin) x_positions = [-800, -400, 400, 800] stims = generate_variable_display([solid, algae, lime, toxin], x_positions) procedures = water_sample.procedure_dict() answers = [] proc_x = -600 proc_y = -100 answ_x = 200 answ_y = -100 y_offset = 80 for proc in procedure_keys: p = DisplayProcedure(procedures[proc][1], procedures[proc][0]) p = generate_procedure_display(p, (proc_x, proc_y)) stims.append(p) proc_y -= y_offset stim_answer_equals = visual.TextBox2( WIN, "=", letterHeight=50, pos=(answ_x - 100, answ_y), size=[150, 70], alignment="center", ) stims.append(stim_answer_equals) stim_answer_box = visual.TextBox2( WIN, "", letterHeight=50, pos=(answ_x, answ_y), size=[150, 70], editable=True, fillColor="white", color="black", alignment="center", ) stims.append(stim_answer_box) answ_y -= y_offset not_finished = True answer = "not answered" start_time = core.monotonicClock.getTime() while not_finished: stim_answer_box.hasFocus = True for stim in stims: stim.draw() WIN.flip() answer = stim_answer_box.text if "\n" in answer: if answer[0].isdigit() and answer[1].isdigit(): not_finished = False else: stim_answer_box.text = answer[:-1] if len(answer) > 2: stim_answer_box.text = stim_answer_box.text[:2] answer_time = core.monotonicClock.getTime() - start_time answers.append((answer.replace("\n", ""), answer_time)) # event.waitKeys(keyList=["space"]) return tuple(answers), tuple(procedure_keys) pause = visual.TextBox2( WIN, """Drücken Sie die Leertaste um mit der nächsten Wasserprobe fortzufahren""", letterHeight=50, alignment="center", ) pause_new_proc = visual.TextBox2( WIN, """Drücken Sie die Leertaste um mit dem nächsten Kennwert fortzufahren""", letterHeight=50, alignment="center", ) pause_after_trial = visual.TextBox2( WIN, """Drücken Sie die Leertaste um fortzufahren""", letterHeight=50, alignment="center", ) intro = visual.TextBox2( WIN, intro_text, letterHeight=30, alignment="center", size=(100000, 100000) ) intro.draw() WIN.flip() event.waitKeys(keyList=["space"]) intro2 = visual.TextBox2( WIN, intro2_text, letterHeight=30, alignment="center", size=(100000, 100000) ) intro2.draw() WIN.flip() event.waitKeys(keyList=["space"]) train_procedures = PROCEDURE_KEYS[:-1] random.shuffle(train_procedures) transfer_procedure = train_procedures[-1] train_procedures = train_procedures[:-1] train_procedures.append(PROCEDURE_KEYS[-1]) all_samples = generate_all_watersamples(TRAIN_TRIALS + TEST_TRIALS) if CONDITION != "blocked": results = {} for i in range(TRAIN_TRIALS): print(train_procedures) answer, procedure_keys = run_trial(all_samples[i], train_procedures, CONDITION) answer_dict = {} answer_dict["procedure_order"] = procedure_keys answer_dict["water_sample"] = all_samples[i].water_sample_dict() for proc, key in zip(answer, procedure_keys): answer_dict[key] = {"answer": proc[0], "time": proc[1]} results[f"train_{i}"] = answer_dict pause.draw() WIN.flip() event.waitKeys(keyList=["space"]) else: results = run_blocked_trials(all_samples[:TRAIN_TRIALS], train_procedures) phase = visual.TextBox2( WIN, """Sie haben den ersten Teil geschafft! Der zweite Teil ist etwas kürzer als der erste Drücken Sie die Leertaste um anzufangen.""", letterHeight=40, alignment="center", ) phase.draw() WIN.flip() event.waitKeys(keyList=["space"]) train_procedures[2] = transfer_procedure for i in range(TEST_TRIALS): print(train_procedures) answer, procedure_keys = run_trial( all_samples[TRAIN_TRIALS + i], train_procedures, "fixed" ) print(procedure_keys) answer_dict = {} answer_dict["procedure_order"] = procedure_keys answer_dict["water_sample"] = all_samples[TRAIN_TRIALS + i].water_sample_dict() for proc, key in zip(answer, procedure_keys): answer_dict[key] = {"answer": proc[0], "time": proc[1]} results[f"test_{i}"] = answer_dict pause.draw() WIN.flip() event.waitKeys(keyList=["space"]) pprint(results) df = pd.DataFrame.from_dict(results, orient="index") df.to_csv("vp_results.csv") with open("vp.pkl", "wb") as file: pickle.dump(results, file) outro_text = "Das Experiment ist nun vorüber.\n\nVielen Dank für Ihre Teilnahme!" outro = visual.TextBox2( WIN, outro_text, letterHeight=40, alignment="center", size=(100000, 100000) ) outro.draw() WIN.flip() event.waitKeys(keyList=["space"])