Bluetooth sensor with rexygen
-
Hi I am trying to find a way to get my bluetooth sensor(Witmotion WT9011DCL 9-axis BLE Magnetometer Gyroscope) to communiate with rerxygen so I can stream the data into rexygen. I have been able to connect the sensor with my raspberry Pi 4 monacrohat. I then tried to stream the data into rexygen but have been unable to do so. I have attached the my program below.
# coding:UTF-8 import asyncio import bleak from PyRexExt import REX class DeviceModel: def __init__(self, deviceName, BLEDevice, callback_method): self.deviceName = deviceName self.BLEDevice = BLEDevice self.client = None self.writer_characteristic = None self.isOpen = False self.callback_method = callback_method self.TempBytes = [] async def openDevice(self): try: async with bleak.BleakClient(self.BLEDevice, timeout=15) as client: self.client = client self.isOpen = True print("Device connected.") REX.y9.v = 1 # Connected target_service_uuid = "0000ffe5-0000-1000-8000-00805f9a34fb" target_characteristic_uuid_read = "0000ffe4-0000-1000-8000-00805f9a34fb" target_characteristic_uuid_write = "0000ffe9-0000-1000-8000-00805f9a34fb" notify_characteristic = None for service in client.services: if service.uuid == target_service_uuid: for char in service.characteristics: if char.uuid == target_characteristic_uuid_read: notify_characteristic = char if char.uuid == target_characteristic_uuid_write: self.writer_characteristic = char if self.writer_characteristic: await self.sendData([0xFF, 0xAA, 0x24, 0x01, 0x00]) # 6-axis mode await self.sendData([0xFF, 0xAA, 0x03, 0x09, 0x00]) # 100 Hz if notify_characteristic: await client.start_notify(notify_characteristic.uuid, self.onDataReceived) while self.isOpen: await asyncio.sleep(1) await client.stop_notify(notify_characteristic.uuid) else: print("Notify characteristic not found.") REX.y9.v = 2 # Failed except Exception as e: print("BLE connection failed:", e) REX.y9.v = 2 # Failed def onDataReceived(self, sender, data): bytes_list = list(data) self.TempBytes.extend(bytes_list) if len(self.TempBytes) >= 20: if self.TempBytes[0] == 0x55 and self.TempBytes[1] == 0x61: self.processData(self.TempBytes[:20]) self.TempBytes = [] def processData(self, Bytes): def get_val(low, high, scale): raw = (high << 8) | low val = raw - 65536 if raw >= 32768 else raw return round(val / 32768 * scale, 3) acc_x = get_val(Bytes[2], Bytes[3], 16) acc_y = get_val(Bytes[4], Bytes[5], 16) acc_z = get_val(Bytes[6], Bytes[7], 16) gyro_x = get_val(Bytes[8], Bytes[9], 2000) gyro_y = get_val(Bytes[10], Bytes[11], 2000) gyro_z = get_val(Bytes[12], Bytes[13], 2000) ang_x = get_val(Bytes[14], Bytes[15], 180) ang_y = get_val(Bytes[16], Bytes[17], 180) ang_z = get_val(Bytes[18], Bytes[19], 180) # Send to REXYGEN outputs REX.y0.v = acc_x REX.y1.v = acc_y REX.y2.v = acc_z REX.y3.v = gyro_x REX.y4.v = gyro_y REX.y5.v = gyro_z REX.y6.v = ang_x REX.y7.v = ang_y REX.y8.v = ang_z print("Data:", acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z, ang_x, ang_y, ang_z) async def sendData(self, data): try: if self.client and self.writer_characteristic: await self.client.write_gatt_char(self.writer_characteristic.uuid, bytes(data)) except Exception as e: print("Write failed:", e) class BLEScanner: def __init__(self, target_mac="CC:0E:4F:3B:A4:3B"): self.target_mac = target_mac self.BLEDevice = None self.device_model = None async def scan_and_connect(self): try: print("Scanning for BLE devices...") devices = await bleak.BleakScanner.discover(timeout=10.0) for d in devices: if d.address == self.target_mac: self.BLEDevice = d break if not self.BLEDevice: print("Target device not found.") REX.y9.v = 2 return self.device_model = DeviceModel("MyBLE", self.BLEDevice, None) await self.device_model.openDevice() except Exception as e: print("Scan or connect error:", e) REX.y9.v = 2 def run_ble_task(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) scanner = BLEScanner() loop.run_until_complete(scanner.scan_and_connect()) loop.close() # Main entry point for REXYGEN if __name__ == "__main__": print("Starting REXYGEN BLE program") REX.y9.v = 2 # Default = failed run_ble_task()
-
@har Hi Har,
I'm sorry, but we are not able to debug third-party Python code for you.
If you made it work under plain Python, then you can send the data to REXYGEN using the REST API. Have a look at "0302-03 REST API Python Bash etc", which sends data to REXYGEN using Python.
Kind regards,
Tomas