Hi, I am trying to find a way to get my Bluetooth sensor (WitMotion WT9011DCL 9-axis BLE Magnetometer Gyroscope) to communicate with REXYGEN so I can stream its data into REXYGEN. I have been able to connect the sensor to my Raspberry Pi 4 with a Monarco HAT. I then tried to stream the data into REXYGEN but have been unable to do so. I have attached my program below. b20b7866-14b5-4b69-9ec5-b5498db19a0b-image.png
# coding:UTF-8
import asyncio
import bleak
from PyRexExt import REX
class DeviceModel:
    """One BLE session with the sensor, publishing decoded samples to REXYGEN.

    The session connects with ``bleak``, optionally writes two configuration
    commands, subscribes to notifications and, for every complete data frame
    received, writes nine decoded values to ``REX.y0`` .. ``REX.y8``.
    ``REX.y9`` carries the connection status (1 = connected, 2 = failed).

    Attributes:
        deviceName: Label given by the caller; not used by this class.
        BLEDevice: Device object (or address) handed to ``bleak.BleakClient``.
        client: The active ``BleakClient`` while connected, else ``None``.
        writer_characteristic: Characteristic used by ``sendData`` once found.
        isOpen: ``True`` while the session should keep running; set it to
            ``False`` to make ``openDevice`` stop and return.
        callback_method: Stored for the caller; this class never invokes it.
        TempBytes: Receive buffer holding bytes not yet consumed as a frame.
    """

    # A data frame is 2 header bytes followed by nine little-endian signed
    # 16-bit values, in the order acc xyz, gyro xyz, angle xyz.
    FRAME_HEADER = b"\x55\x61"
    FRAME_LENGTH = 20

    def __init__(self, deviceName, BLEDevice, callback_method):
        self.deviceName = deviceName
        self.BLEDevice = BLEDevice
        self.client = None
        self.writer_characteristic = None
        self.isOpen = False
        self.callback_method = callback_method
        # bytearray instead of list: cheap front deletion and header search.
        self.TempBytes = bytearray()

    async def openDevice(self):
        """Connect, configure the sensor and stream notifications.

        Runs until ``isOpen`` is cleared or the link drops. Any exception is
        reported on stdout and via ``REX.y9.v = 2``; nothing is raised to the
        caller. Session state (``isOpen``, ``client``,
        ``writer_characteristic``) is always reset on exit.
        """
        target_service_uuid = "0000ffe5-0000-1000-8000-00805f9a34fb"
        target_characteristic_uuid_read = "0000ffe4-0000-1000-8000-00805f9a34fb"
        target_characteristic_uuid_write = "0000ffe9-0000-1000-8000-00805f9a34fb"
        try:
            async with bleak.BleakClient(self.BLEDevice, timeout=15) as client:
                self.client = client
                self.isOpen = True
                print("Device connected.")
                REX.y9.v = 1  # Connected

                notify_characteristic = None
                for service in client.services:
                    if service.uuid != target_service_uuid:
                        continue
                    for char in service.characteristics:
                        if char.uuid == target_characteristic_uuid_read:
                            notify_characteristic = char
                        if char.uuid == target_characteristic_uuid_write:
                            self.writer_characteristic = char

                if self.writer_characteristic:
                    await self.sendData([0xFF, 0xAA, 0x24, 0x01, 0x00])  # 6-axis mode
                    await self.sendData([0xFF, 0xAA, 0x03, 0x09, 0x00])  # 100 Hz

                if notify_characteristic is None:
                    print("Notify characteristic not found.")
                    REX.y9.v = 2  # Failed
                    return

                # Start every session with an empty buffer so bytes left over
                # from an earlier connection cannot corrupt the first frame.
                self.TempBytes = bytearray()
                await client.start_notify(notify_characteristic.uuid, self.onDataReceived)
                # Also stop when the link drops; otherwise this loop would
                # sleep forever while the status output still says "connected".
                while self.isOpen and client.is_connected:
                    await asyncio.sleep(1)

                if client.is_connected:
                    await client.stop_notify(notify_characteristic.uuid)
                else:
                    print("Device disconnected.")
                    REX.y9.v = 2  # Failed
        except Exception as e:
            print("BLE connection failed:", e)
            REX.y9.v = 2  # Failed
        finally:
            # The client is closed once the ``async with`` block is left, so
            # do not leave stale handles behind for ``sendData``.
            self.isOpen = False
            self.client = None
            self.writer_characteristic = None

    def _extract_frames(self, data):
        """Append ``data`` to the buffer and return all complete frames.

        Bytes in front of a frame header are discarded so the parser
        resynchronises after garbage or a dropped packet. An incomplete
        trailing frame stays in ``TempBytes`` for the next notification.

        Args:
            data: Bytes-like payload of one notification.

        Returns:
            List of ``bytes`` objects, each exactly ``FRAME_LENGTH`` long and
            starting with ``FRAME_HEADER``, in arrival order.
        """
        buf = self.TempBytes
        buf.extend(data)
        frames = []
        while True:
            start = buf.find(self.FRAME_HEADER)
            if start < 0:
                # No header in the buffer. Keep a trailing first header byte:
                # the second half may arrive with the next notification.
                tail = bytes(buf[-1:]) if buf.endswith(self.FRAME_HEADER[:1]) else b""
                del buf[:]
                buf.extend(tail)
                break
            if start:
                del buf[:start]
            if len(buf) < self.FRAME_LENGTH:
                break
            frames.append(bytes(buf[:self.FRAME_LENGTH]))
            del buf[:self.FRAME_LENGTH]
        return frames

    def onDataReceived(self, sender, data):
        """Notification callback: buffer ``data`` and process every full frame.

        Args:
            sender: Notification source supplied by ``bleak``; unused.
            data: Raw notification payload.
        """
        for frame in self._extract_frames(data):
            self.processData(frame)

    @staticmethod
    def _decode_frame(Bytes):
        """Decode one frame into nine scaled values.

        Args:
            Bytes: Indexable sequence of at least 20 ints (0-255); index 0-1
                are the header and are not checked here.

        Returns:
            Tuple ``(acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z, ang_x,
            ang_y, ang_z)``, each rounded to 3 decimals. Full scale is 16 for
            acceleration, 2000 for angular rate and 180 for angle
            (presumably g, deg/s and deg - confirm against the sensor manual).
        """
        def get_val(low, high, scale):
            raw = (high << 8) | low
            # Interpret the 16-bit word as two's-complement signed.
            val = raw - 65536 if raw >= 32768 else raw
            return round(val / 32768 * scale, 3)

        scales = (16, 16, 16, 2000, 2000, 2000, 180, 180, 180)
        return tuple(
            get_val(Bytes[2 + 2 * i], Bytes[3 + 2 * i], scale)
            for i, scale in enumerate(scales)
        )

    def processData(self, Bytes):
        """Decode one 20-byte frame and publish it on ``REX.y0`` .. ``REX.y8``.

        Args:
            Bytes: One complete frame (see ``_decode_frame``).
        """
        (acc_x, acc_y, acc_z,
         gyro_x, gyro_y, gyro_z,
         ang_x, ang_y, ang_z) = self._decode_frame(Bytes)
        # Send to REXYGEN outputs
        REX.y0.v = acc_x
        REX.y1.v = acc_y
        REX.y2.v = acc_z
        REX.y3.v = gyro_x
        REX.y4.v = gyro_y
        REX.y5.v = gyro_z
        REX.y6.v = ang_x
        REX.y7.v = ang_y
        REX.y8.v = ang_z
        print("Data:", acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z, ang_x, ang_y, ang_z)

    async def sendData(self, data):
        """Write ``data`` to the write characteristic, if one is available.

        Best effort: does nothing when not connected, and a failed write is
        only reported on stdout.

        Args:
            data: Iterable of ints (0-255) forming the command.
        """
        try:
            if self.client and self.writer_characteristic:
                await self.client.write_gatt_char(self.writer_characteristic.uuid, bytes(data))
        except Exception as e:
            print("Write failed:", e)
class BLEScanner:
    """Find one BLE device by address and run a ``DeviceModel`` session on it.

    Attributes:
        target_mac: Address of the device to look for.
        BLEDevice: Device object from the most recent scan, or ``None``.
        device_model: ``DeviceModel`` created for the last connection attempt.
    """

    def __init__(self, target_mac="CC:0E:4F:3B:A4:3B"):
        self.target_mac = target_mac
        self.BLEDevice = None
        self.device_model = None

    def _is_target(self, address):
        """Return True if ``address`` equals ``target_mac``, ignoring case.

        Hex digits in an address may be reported in either case, so an exact
        string comparison can miss the device.
        """
        wanted = str(self.target_mac or "").strip().upper()
        found = str(address or "").strip().upper()
        return bool(wanted) and wanted == found

    async def scan_and_connect(self):
        """Scan for 10 s, then open the target device if it was seen.

        Returns once the device session ends or when the device is not found.
        Failures are reported on stdout and via ``REX.y9.v = 2``; nothing is
        raised to the caller.
        """
        try:
            print("Scanning for BLE devices...")
            devices = await bleak.BleakScanner.discover(timeout=10.0)
            # Assign unconditionally so a device cached by an earlier scan is
            # not reused when this scan did not see it.
            self.BLEDevice = next(
                (d for d in devices if self._is_target(d.address)), None
            )
            if not self.BLEDevice:
                print("Target device not found.")
                REX.y9.v = 2
                return
            self.device_model = DeviceModel("MyBLE", self.BLEDevice, None)
            await self.device_model.openDevice()
        except Exception as e:
            print("Scan or connect error:", e)
            REX.y9.v = 2
def run_ble_task():
    """Run one scan-and-connect session on a private event loop.

    Blocks the calling thread until ``BLEScanner.scan_and_connect`` returns.
    The loop is closed and unregistered even when the run is interrupted, so
    a later call starts from a clean state.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        scanner = BLEScanner()
        loop.run_until_complete(scanner.scan_and_connect())
    finally:
        # Do not leave a closed loop registered as this thread's current loop.
        asyncio.set_event_loop(None)
        loop.close()
# Main entry point for REXYGEN
# NOTE(review): this guard only fires when the file is executed directly as a
# script. If the code is loaded by a REXYGEN PYTHON block, the block presumably
# drives the script through its own init()/main()/exit() hooks and never runs
# this branch - confirm against the REXYGEN function block reference.
# NOTE(review): run_ble_task() blocks until the scan/BLE session is over, so it
# is presumably unsuitable for a hook that is called once per task period -
# verify before moving the call there.
if __name__ == "__main__":
    print("Starting REXYGEN BLE program")
    # Report "failed" until a connection is actually established.
    REX.y9.v = 2 # Default = failed
    run_ble_task()