zapsaber/datapuller.py

109 lines
3.0 KiB
Python
Executable File

import asyncio
import enum
import json
import logging
import websockets
logger = logging.getLogger(__name__)
class BaseData:
"""
BaseData listens to a websocket specified by one of its implementations.
"""
def __init__(self, host: str = '127.0.0.1', port: int = 2946,
path: str = '/BSDataPuller'):
self.host = host
self.port = port
self.path = path
async def listen(self):
"""
listen opens the websocket connection the appropriate endpoint and reacts to
incoming frames.
"""
try:
async with websockets.connect(
'ws://{}:{}{}'.format(self.host, self.port, self.path)) as websocket:
async for message in websocket:
logger.debug(message)
if message:
self._handle(json.loads(message))
except Exception as e:
logger.critical(e)
raise e
class LiveData(BaseData):
"""
LiveData listens to the LiveData BSDataPuller endpoint and fires events based on
incoming frames.
"""
TRIGGERS = enum.Enum('Triggers',
['Unknown', 'TimerElapsed', 'NoteMissed', 'EnergyChange', 'ScoreChange'])
TRIGGERS_INVERSE = [
TRIGGERS.Unknown,
TRIGGERS.TimerElapsed,
TRIGGERS.NoteMissed,
TRIGGERS.EnergyChange,
TRIGGERS.ScoreChange,
]
def __init__(self, host: str = '127.0.0.1', port: int = 2946,
path: str = '/BSDataPuller/LiveData'):
super().__init__(host, port, path)
self.events = {}
def on(self, event: str, function):
"""
on attaches a function to a trigger. When a frame related to that trigger is
received, all functions attached will be executed with the frame as an
argument.
"""
if event not in self.events:
self.events[event] = []
self.events[event].append(function)
def _handle(self, message: str):
"""
_handle determines what kind of message has been received and fires the
appropriate events.
"""
invert = self.TRIGGERS_INVERSE[message['EventTrigger']].name
if invert in self.events:
for event in self.events[invert]:
event(message)
class MapData(BaseData):
"""
MapData listens to the MapData BSDataPuller endpoint and fires events on update.
There are no specific triggers for this endpoint, so only one function is allowed
as an event.
"""
def __init__(self, host: str = '127.0.0.1', port: int = 2946,
path: str = '/BSDataPuller/MapData'):
super().__init__(host, port, path)
def on_update(self, function):
"""
on_update sets the function to be called when there's a map update.
"""
self.function = function
def _handle(self, message: str):
"""
_handle calls the attached function when a frame is received.
"""
self.function(message)