First working branch

This commit is contained in:
Madison Scott-Clary 2022-11-24 15:07:25 -08:00
parent b939420c65
commit ab8f7df848
7 changed files with 74 additions and 71 deletions

0
.gitignore vendored Normal file → Executable file
View File

12
README.md Normal file → Executable file
View File

@ -15,14 +15,14 @@ Configuration is done through a JSON file named `config.json` in the same direct
"code": "<your shocker's code>",
"comboA": 100,
"comboB": 300,
"comboABreak": ["vibrate", 20, 1.0],
"comboBBreak": ["shock": 5, 1.0],
"fail": ["shock", 10, 2.0]
"comboABreak": ["vibrate", 1.0, 20],
"comboBBreak": ["shock", 1.0, 5],
"fail": ["shock", 2.0, 10]
}
```
The values for combo breaks and fail is an array.
1. `vibrate`, `shock`, and `beep`.
2. Intensity
3. Duration
1. `vibrate`, `shock`, and `beep` (if action is `beep`, intensity is ignored)
2. Duration
3. Intensity

16
datapuller.py Normal file → Executable file
View File

@ -21,14 +21,24 @@ class BaseData:
async with websockets.connect(
'ws://{}:{}{}'.format(self.host, self.port, self.path)) as websocket:
async for message in websocket:
await self._handle(message)
logger.debug(message)
if message:
self._handle(json.loads(message))
except Exception as e:
logger.critical(e)
raise e
class LiveData(BaseData):
TRIGGERS = enum.Enum('Triggers',
['Unknown', 'TimerElapsed', 'NoteMissed', 'EnergyChange', 'ScoreChange'])
TRIGGERS_INVERSE = [
TRIGGERS.Unknown,
TRIGGERS.TimerElapsed,
TRIGGERS.NoteMissed,
TRIGGERS.EnergyChange,
TRIGGERS.ScoreChange,
]
def __init__(self, host: str = '127.0.0.1', port: int = 2946,
path: str = '/BSDataPuller/LiveData'):
@ -41,7 +51,9 @@ class LiveData(BaseData):
self.events[event].append(function)
def _handle(self, message: str):
for event in events[TRIGGERS(message['EventTrigger']).name]:
invert = self.TRIGGERS_INVERSE[message['EventTrigger']].name
if invert in self.events:
for event in self.events[invert]:
event(message)

4
poetry.lock generated Normal file → Executable file
View File

@ -66,8 +66,8 @@ python-versions = ">=3.7"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "5a734eed883538e1b3f3ec6d75d33bfcf056ecd3a28311b0e975c847bb268ce2"
python-versions = "^3.9"
content-hash = "a1bceaf7911989ee719f409e1eb231de0d8e0d8ea8faebf994892b0f12f651cd"
[metadata.files]
certifi = [

2
pyproject.toml Normal file → Executable file
View File

@ -7,7 +7,7 @@ license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
python = "^3.9"
requests = "^2.28.1"
[tool.poetry.group.dev.dependencies]

13
pyshock.py Normal file → Executable file
View File

@ -1,7 +1,6 @@
from enum import Enum
import json
import logging
from threading import Timer
import requests
@ -15,16 +14,13 @@ class PiShock:
ACTIONS = Enum('ACTIONS', ['shock', 'vibrate', 'beep'])
def __init__(self, username: str, api_key: str, code: str, name: str = '',
max_intensity: int = 50, max_duration: int = 5, throttle: int = 5):
max_intensity: int = 50, max_duration: int = 5):
self.username = username
self.api_key = api_key
self.code = code
self.name = name if name else self.NAME
self.max_intensity = max_intensity
self.max_duration = max_duration
self.throttle = throttle
self.throttled = False
self.throttle_timer = None
def shock(self, duration: int = 1, intensity: int = 5):
self._act(self.ACTIONS.shock, duration, intensity)
@ -42,9 +38,6 @@ class PiShock:
logger.warning('Action not found: {}', e)
def _act(self, op: str, duration: int = 1, intensity: int = 5):
if op == self.ACTIONS.shock and self.throttled:
logger.error('Request throttled')
return
if duration > self.max_duration:
logger.error('%d is greater than maximum allowed duration, %d',
duration, self.max_duration)
@ -71,7 +64,3 @@ class PiShock:
logger.debug(res.text)
if res.status_code != 200 or res.text != 'Operation Succeeded.':
logger.warning('Error communicating with shocker: %s', res.text)
if op == self.ACTIONS.shock:
self.throttled = True
self.throttle_timer = Timer(self.throttle, lambda: self.throttled == False)

96
zapsaber.py Normal file → Executable file
View File

@ -10,72 +10,75 @@ from datapuller import (
logging.basicConfig(level=logging.INFO)
playing = False
combo_level = None
shocker = None
failed = False
events = None
logger = logging.getLogger(__name__)
class Events:
combo_A = 100
combo_B = 300
combo_A_break = ['vibrate', 20, 0.5]
combo_B_break = ['shock', 5, 0.3]
combo_A_break = ['vibrate', 0.5, 20]
combo_B_break = ['shock', 0.3, 5]
fail = ['shock', 10, 1]
fail = ['shock', 1, 10]
def __init__(self, config):
playing = False
combo_level = None
map_failed = False
def __init__(self, config, shocker):
self.combo_A = config['comboA']
self.combo_B = config['comboB']
self.combo_A_break = config['comboABreak']
self.combo_B_break = config['comboBBreak']
self.fail = config['fail']
self.shocker = shocker
def update_combo(message):
def update_combo(self, message):
combo = message['Combo']
if combo > events.combo_A:
combo_level = 'A'
if combo > events.combo_B:
combo_level = 'B'
def break_combo(message):
if combo_level == 'A':
logger.info('Broke combo A!')
shocker.act(*events.combo_A_break)
if combo_level == 'B':
logger.info('Broke combo B!')
shocker.act(*events.combo_B_break)
combo_level = None
def map_update(message):
if map_failed and not message['LevelFailed']:
map_failed = false
elif not map_failed and message['LevelFailed']:
health = message['PlayerHealth']
if not self.map_failed and health == 0:
logger.info('Failed the map!')
shocker.act(*events.fail)
map_failed = true
combo_level = None
self.shocker.act(*self.fail)
self.map_failed = True
self.combo_level = None
if combo > self.combo_A:
self.combo_level = 'A'
if combo > self.combo_B:
self.combo_level = 'B'
def break_combo(self, message):
if self.combo_level == 'A':
logger.info('Broke combo A!')
self.shocker.act(*self.combo_A_break)
if self.combo_level == 'B':
logger.info('Broke combo B!')
self.shocker.act(*self.combo_B_break)
self.combo_level = None
def tick(self, message):
logger.info('Tick')
if self.playing and message['TimeElapsed'] == 0:
logger.info('Stopping map...')
self.map_failed = False
self.combo = 0
self.playing = False
elif not self.playing and message['TimeElapsed'] != 0:
logger.info('Starting map...')
self.playing = True
async def main(username, api_key, code):
logging.info('Initializing...')
shocker = PiShock(username, api_key, code, name='ZapSaber')
shocker.beep()
async def main(config):
logger.info('Initializing...')
shocker = PiShock(config['username'], config['apiKey'], config['code'], name='ZapSaber')
#shocker.beep()
events = Events(config, shocker)
livedata = LiveData()
mapdata = MapData()
livedata.on('ScoreChange', update_combo)
livedata.on('NoteMissed', break_combo)
mapdata.on_update(map_update)
livedata.on('ScoreChange', events.update_combo)
livedata.on('NoteMissed', events.break_combo)
livedata.on('TimerElapsed', events.tick)
await livedata.listen()
await mapdata.listen()
@ -85,5 +88,4 @@ async def main(username, api_key, code):
if __name__ == '__main__':
with open('config.json') as config_file:
config = json.load(config_file)
events = Events(config)
asyncio.run(main(config['username'], config['apiKey'], config['code']))
asyncio.run(main(config))