import asyncio import json import logging from pyshock import PiShock from datapuller import ( LiveData, MapData, ) logging.basicConfig(level=logging.INFO) playing = False combo_level = None shocker = None failed = False events = None class Events: combo_A = 100 combo_B = 300 combo_A_break = ['vibrate', 20, 0.5] combo_B_break = ['shock', 5, 0.3] fail = ['shock', 10, 1] def __init__(self, config): self.combo_A = config['comboA'] self.combo_B = config['comboB'] self.combo_A_break = config['comboABreak'] self.combo_B_break = config['comboBBreak'] self.fail = config['fail'] def update_combo(message): combo = message['Combo'] if combo > events.combo_A: combo_level = 'A' if combo > events.combo_B: combo_level = 'B' def break_combo(message): if combo_level == 'A': logger.info('Broke combo A!') shocker.act(*events.combo_A_break) if combo_level == 'B': logger.info('Broke combo B!') shocker.act(*events.combo_B_break) combo_level = None def map_update(message): if map_failed and not message['LevelFailed']: map_failed = false elif not map_failed and message['LevelFailed']: logger.info('Failed the map!') shocker.act(*events.fail) map_failed = true combo_level = None async def main(username, api_key, code): logging.info('Initializing...') shocker = PiShock(username, api_key, code, name='ZapSaber') shocker.beep() livedata = LiveData() mapdata = MapData() livedata.on('ScoreChange', update_combo) livedata.on('NoteMissed', break_combo) mapdata.on_update(map_update) await livedata.listen() await mapdata.listen() shocker.vibrate() if __name__ == '__main__': with open('config.json') as config_file: config = json.load(config_file) events = Events(config) asyncio.run(main(config['username'], config['apiKey'], config['code']))