2015年8月24日月曜日

【python3】kivyで心理学実験 - Attentional Blink【kivy】

有料のMATLABになんとなく依存したくないなあと思い最近pythonに手を出してるんですが、半月のめり込みやっとこさそれっぽいプログラムが書けるようになりました。

といってもお馴染みpsychopyではないです(あれは書籍いっぱいあるしそのうち覚えればいいかなぁ)。
普通に考えればpsychopy使えばいいんですが、新しいもの好きが仇と(?)なって、kivyというマルチタッチアプリ制作向けのライブラリを使うことにしてみました。
(本当はpygameも使ってたんですが、なんかpsychtoolに似てて代わり映えないなぁ、と思って一通りいじってから止めちゃいました)

ということでベーシックなAttentional Blinkの実験を。
実験条件はラグのみ。ターゲットは数字。妨害刺激はアルファベット。

今回使ったファイル置き場はこちら

↓実験の様子(T1は何?T2は何?に対してキーボードで回答)



↓出力された結果

nPreT1は、T1が呈示されるまでにいくつの妨害刺激が呈示されたか。



<settings.py>

# -*- coding: utf-8 -*-

# subject name
subj = 's1'

# stimuli list
tstim = (0, 2, 3, 4, 5, 6, 7, 8, 9)
dstim = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'N', 'P', 'R', 'T', 'U', 'V', 'X', 'Y', 'Z')
resp_keys = {'numpad0': 0, 'numpad1': 1, 'numpad2': 2, 'numpad3': 3, 'numpad4': 4,
    'numpad5': 5, 'numpad6': 6, 'numpad7': 7, 'numpad8': 8, 'numpad9': 9}

# settings of presentation
update_rate = 1/60
soa = update_rate*6
isi = update_rate*2

npret1 = list(range(3,7)) # distractor before T1
stream_length = 20

# experiment's condition
nrep = 3
lag = (1,2,3,4,5) # T2's delay from T1

tstimがターゲット刺激で、dstimが妨害刺激。
resp_keysの'numpad0'ってのはテンキーのキーコードの名前。
update_rateは画面の更新頻度。
npret1はT1呈示前にいくら刺激が提示されるかについての設定。

<attentionalblinksample.kv>

#: kivy 1.9.0

<BackGround>:
    canvas:
        Color:
            rgb: (.5, .5, .5)
        Rectangle:
            size: self.size

<FixationCross>:
    canvas:
        Color:
            rgba: [0, 0, 0, self.transparency]
        Rectangle
            pos: self.center_x - 1, self.center_y - 15
            size: 2,30
        Rectangle:
            pos: self.center_x - 15, self.center_y - 1
            size: 30,2

<StimuliStream>:
    Label:
        text: self.parent.stimulus
        color: [0, 0, 0, self.parent.transparency]
        font_size: 50
        center: self.parent.center

<MessageLabel>:
    Label:
        text: self.parent.message
        color: [0, 0, 0, self.parent.transparency]
        font_size: 30
        center: self.parent.center

<Root>:
    bg: bgval
    fc: fcval
    ss: ssval
    ml: mlval

    BackGround:
        id: bgval

    FixationCross:
        id: fcval

    StimuliStream:
        id: ssval

    MessageLabel:
        id: mlval

maiy.pyに書かれたtransparency(透明度)を後々操作することによって、各ウィジェットの表示・非表示を切り替えていく。
transparencyを0にするとそのウィジェットは透明になる。

<main.py>

# -*- coding: utf-8 -*-

import kivy
kivy.require('1.9.0')

from kivy.app import App
from kivy.clock import Clock
from kivy.core.window import Window
from kivy.properties import NumericProperty, StringProperty, ObjectProperty
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label
from kivy.uix.widget import Widget
import sys
import numpy as np
import pandas as pd
import settings as sf

# preparation before experiment
ntrial = sf.nrep * len(sf.lag)
lags = np.tile(sf.lag,sf.nrep)
np.random.shuffle(lags)

# setting Widget
class BackGround(Widget):
    pass

class FixationCross(Widget):
    transparency = NumericProperty(0)
    pass

class StimuliStream(Widget):
    stimulus = StringProperty('')
    transparency = NumericProperty(0)

    def make_stream(self):
        self.stream = np.random.choice(sf.dstim, sf.stream_length)
        for i in np.arange(self.stream.size)[1:]:
            while self.stream[i] == self.stream[i-1]:
                self.stream[i] = np.random.choice(sf.dstim)

class MessageLabel(Widget):
    messages = ('Ready?', 'What is T1?', 'What is T2?', 'Thank you!')
    message = StringProperty(messages[0])
    transparency = NumericProperty(1)

class Root(FloatLayout):
    bg = ObjectProperty(None)
    fc = ObjectProperty(None)
    ss = ObjectProperty(None)
    ml = ObjectProperty(None)

    timer = 0
    trial_counter = 0
    states = 'PREPARATION'
    lag = np.array([])
    npret1 = np.array([])
    t1 = np.array([])
    t2 = np.array([])
    resp1 = np.array([])
    resp2 = np.array([])

    def __init__(self, **kwargs):
        super(Root, self).__init__(**kwargs)
        self._keyboard = Window.request_keyboard(self._keyboard_closed, self)
        self._keyboard.bind(on_key_down=self._on_keyboard_down)

    def _keyboard_closed(self):
        print('My keyboard have been closed!')
        self._keyboard.unbind(on_key_down=self._on_keyboard_down)
        self._keyboard = None

    def _on_keyboard_down(self, keyboard, keycode, text, modifiers):
        if keycode[1] == 'escape':
            sys.exit()

        elif self.states == 'RESPONSE1_WAITING':
            if keycode[1] in sf.resp_keys.keys():
                self.resp1 = np.append(self.resp1, sf.resp_keys[keycode[1]])
                self.ml.message = self.ml.messages[2]
                self.states = 'RESPONSE2_WAITING'

        elif self.states == 'RESPONSE2_WAITING':
            if keycode[1] in sf.resp_keys.keys():
                self.resp2 = np.append(self.resp2, sf.resp_keys[keycode[1]])
                self.ml.message = self.ml.messages[0]
                self.states = 'PREPARATION'
                self.trial_counter += 1
                if self.trial_counter == ntrial:
                    self.ml.message = self.ml.messages[3]
                    self.csv_export()
                    self.states = 'FINISHED'

        elif self.states == 'SPACE_KEY_WAITING':
            if keycode[1] == 'spacebar':
                self.timer = 0
                self.setting_center_of_display(1)
                self.setting_value()
                self.states = 'BEFORE_TRIAL'

        elif self.states == 'FINISHED':
            if keycode[1] == 'spacebar':
                sys.exit()

    def setting_value(self):
        self.lag = np.append(self.lag, lags[self.trial_counter])
        self.npret1 = np.append(self.npret1, np.random.choice(sf.npret1))
        self.t1 = np.append(self.t1, np.random.choice(sf.tstim))
        self.t2 = np.append(self.t2, np.random.choice(sf.tstim))
        while self.t1[self.trial_counter] == self.t2[self.trial_counter]:
            self.t2[self.trial_counter] = np.random.choice(sf.tstim)

    def setting_center_of_display(self, index):
        if index == 0:
            self.fc.transparency = 0
            self.ss.transparency = 0
            self.ml.transparency = 0
        elif index == 1:
            self.fc.transparency = 1
            self.ss.transparency = 0
            self.ml.transparency = 0
        elif index == 2:
            self.fc.transparency = 0
            self.ss.transparency = 1
            self.ml.transparency = 0
        elif index == 3:
            self.fc.transparency = 0
            self.ss.transparency = 0
            self.ml.transparency = 1

    def csv_export(self):
        data = {'subj': np.tile(sf.subj,ntrial),
                'lag': self.lag,
                'nPreT1': self.npret1,
                'T1': self.t1,
                'T2': self.t2,
                'resp1': self.resp1,
                'resp2': self.resp2}
        frame = pd.DataFrame(data,
                columns = ['subj', 'lag', 'nPreT1', 'T1', 'T2', 'resp1', 'resp2'])
        frame.to_csv('result.csv', index = False)

    def update(self, dt):
        self.timer = self.timer + dt
        if self.states == 'PREPARATION':
            self.stream_counter = 0
            self.ss.make_stream()
            self.ss.stimulus = self.ss.stream[0]
            self.states = 'SPACE_KEY_WAITING'

        elif self.states == 'BEFORE_TRIAL':
            if self.timer >= 0.5:
                self.timer = 0
                self.setting_center_of_display(2)
                self.states = 'PRESENTING'
            elif self.timer >= 0.5 - sf.isi:
                self.setting_center_of_display(0)

        elif self.states == 'PRESENTING':
            if self.stream_counter == sf.stream_length -1:
                if self.timer >= sf.soa + 0.5:
                    self.setting_center_of_display(3)
                    self.ml.message = self.ml.messages[1]
                    self.states = 'RESPONSE1_WAITING'
                elif self.timer >= sf.soa - sf.isi:
                    self.setting_center_of_display(0)
            else:
                if self.timer >= sf.soa:
                    self.stream_counter += 1
                    if self.stream_counter == self.npret1[self.trial_counter] + 1:
                        self.ss.stimulus = np.str(np.int(self.t1[self.trial_counter]))
                    elif self.stream_counter == self.npret1[self.trial_counter] + self.lag[self.trial_counter] + 1:
                        self.ss.stimulus = np.str(np.int(self.t2[self.trial_counter]))
                    else:
                        self.ss.stimulus = self.ss.stream[self.stream_counter]
                    self.timer = 0
                    self.setting_center_of_display(2)
                elif self.timer >= sf.soa - sf.isi:
                    self.setting_center_of_display(0)

class AttentionalBlinkSampleApp(App):

    def build(self):
        self.root = Root()
        Clock.schedule_interval(self.root.update, sf.update_rate)
        return self.root

if __name__ == '__main__':
    AttentionalBlinkSampleApp().run()


試行が規定の回数(ラグ水準 × 繰り返し数)実行されると、その結果が同一フォルダ内にresult.csvとして保存される。
pandasを使うとこのcsvへの出力が楽でいい感じ。というかpandasがまんまRで笑ってしまった。

0 件のコメント:

コメントを投稿