REXYGEN Community Forum
    • Categories
    • Recent
    • Tags
    • Popular
    • Login

    Bluetooth sensor with rexygen

    Communication (RS232, RS485, I2C, SPI, UDP, TCP, ...)
    2
    2
    6
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • H
      har
      last edited by

      Hi, I am trying to find a way to get my Bluetooth sensor (Witmotion WT9011DCL 9-axis BLE Magnetometer Gyroscope) to communicate with REXYGEN so I can stream the data into REXYGEN. I have been able to connect the sensor to my Raspberry Pi 4 with a Monarco HAT. I then tried to stream the data into REXYGEN but have been unable to do so. I have attached my program below. b20b7866-14b5-4b69-9ec5-b5498db19a0b-image.png

      # coding:UTF-8
      import asyncio
      import bleak
      from PyRexExt import REX
      
      class DeviceModel:
          """BLE connection to one sensor that forwards decoded frames to REX outputs.

          Notifications are accumulated in ``TempBytes`` and cut into 20-byte
          frames that start with the header bytes ``0x55 0x61``.  Each frame
          carries nine little-endian signed 16-bit values, which are scaled and
          written to ``REX.y0`` .. ``REX.y8``.  ``REX.y9`` is used as a status
          flag: 1 while connected, 2 after any failure or disconnect.
          """

          # GATT identifiers this class looks for on the connected device.
          SERVICE_UUID = "0000ffe5-0000-1000-8000-00805f9a34fb"
          NOTIFY_CHARACTERISTIC_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
          WRITE_CHARACTERISTIC_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

          # Framing of the notification stream.
          FRAME_HEADER = (0x55, 0x61)
          FRAME_LENGTH = 20

          # Full-scale factors applied to the three value groups of a frame.
          ACC_SCALE = 16
          GYRO_SCALE = 2000
          ANGLE_SCALE = 180

          def __init__(self, deviceName, BLEDevice, callback_method):
              """Store the connection parameters; no I/O happens here.

              Args:
                  deviceName: Label for this device (stored only).
                  BLEDevice: Device handle passed to ``bleak.BleakClient``.
                  callback_method: Stored on the instance; not invoked by
                      this class.
              """
              self.deviceName = deviceName
              self.BLEDevice = BLEDevice
              self.client = None
              self.writer_characteristic = None
              self.isOpen = False
              self.callback_method = callback_method
              self.TempBytes = []

          async def openDevice(self):
              """Connect, configure the sensor and stream notifications.

              Runs until ``isOpen`` is cleared or the link drops.  All
              exceptions are reported on stdout and via ``REX.y9`` rather
              than propagated.
              """
              # Start from a clean slate so nothing from an earlier
              # connection (partial frame, cached characteristic) leaks in.
              self.writer_characteristic = None
              self.TempBytes = []
              try:
                  async with bleak.BleakClient(self.BLEDevice, timeout=15) as client:
                      self.client = client
                      self.isOpen = True
                      print("Device connected.")
                      REX.y9.v = 1  # Connected

                      notify_characteristic = None
                      for service in client.services:
                          if service.uuid != self.SERVICE_UUID:
                              continue
                          for char in service.characteristics:
                              if char.uuid == self.NOTIFY_CHARACTERISTIC_UUID:
                                  notify_characteristic = char
                              elif char.uuid == self.WRITE_CHARACTERISTIC_UUID:
                                  self.writer_characteristic = char

                      if self.writer_characteristic:
                          await self.sendData([0xFF, 0xAA, 0x24, 0x01, 0x00])  # 6-axis mode
                          await self.sendData([0xFF, 0xAA, 0x03, 0x09, 0x00])  # 100 Hz

                      if notify_characteristic is None:
                          print("Notify characteristic not found.")
                          REX.y9.v = 2  # Failed
                          return

                      await client.start_notify(notify_characteristic.uuid, self.onDataReceived)
                      # Also watch the link itself: without this check a
                      # dropped connection would leave the loop spinning
                      # with the status output still reporting "connected".
                      while self.isOpen and client.is_connected:
                          await asyncio.sleep(1)

                      if client.is_connected:
                          await client.stop_notify(notify_characteristic.uuid)
                      else:
                          print("Device disconnected.")
                          REX.y9.v = 2  # Failed

              except Exception as e:
                  print("BLE connection failed:", e)
                  REX.y9.v = 2  # Failed
              finally:
                  # The client is unusable once the context manager exits.
                  self.isOpen = False
                  self.client = None

          def onDataReceived(self, sender, data):
              """Notification callback: buffer ``data`` and process whole frames.

              Args:
                  sender: Notification source supplied by bleak (unused).
                  data: Iterable of byte values received in this notification.
              """
              self.TempBytes.extend(data)
              for frame in self._extractFrames():
                  self.processData(frame)

          def _extractFrames(self):
              """Remove and return every complete frame held in ``TempBytes``.

              Bytes in front of a header are discarded so the parser
              re-synchronises after a misaligned or truncated notification.
              An incomplete trailing frame is kept for the next call.  The
              frame has no checksum, so a header pattern occurring inside
              payload bytes while out of sync cannot be told apart from a
              real header.

              Returns:
                  List of frames, each a list of ``FRAME_LENGTH`` byte values.
              """
              buf = self.TempBytes
              first, second = self.FRAME_HEADER
              frames = []
              pos = 0
              end = len(buf)
              while end - pos >= 2:
                  if buf[pos] != first or buf[pos + 1] != second:
                      pos += 1
                      continue
                  if end - pos < self.FRAME_LENGTH:
                      break  # header found, rest of the frame not here yet
                  frames.append(buf[pos:pos + self.FRAME_LENGTH])
                  pos += self.FRAME_LENGTH
              # A lone trailing byte is only worth keeping if it could be
              # the first half of a header.
              if end - pos == 1 and buf[pos] != first:
                  pos += 1
              self.TempBytes = buf[pos:]
              return frames

          @staticmethod
          def _decodeFrame(Bytes):
              """Decode one frame into nine scaled readings.

              Args:
                  Bytes: Sequence of at least 20 byte values; indices 2..19
                      hold nine little-endian signed 16-bit integers.

              Returns:
                  Tuple ``(acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z,
                  ang_x, ang_y, ang_z)``, each ``raw / 32768 * scale``
                  rounded to three decimals.
              """
              scales = (
                  (DeviceModel.ACC_SCALE,) * 3
                  + (DeviceModel.GYRO_SCALE,) * 3
                  + (DeviceModel.ANGLE_SCALE,) * 3
              )
              values = []
              for index, scale in enumerate(scales):
                  offset = 2 + 2 * index
                  raw = int.from_bytes(
                      bytes(Bytes[offset:offset + 2]), "little", signed=True
                  )
                  values.append(round(raw / 32768 * scale, 3))
              return tuple(values)

          def processData(self, Bytes):
              """Decode one frame and publish the readings on REX.y0 .. REX.y8.

              Args:
                  Bytes: One complete frame as a sequence of byte values.
              """
              (acc_x, acc_y, acc_z,
               gyro_x, gyro_y, gyro_z,
               ang_x, ang_y, ang_z) = self._decodeFrame(Bytes)

              # Send to REXYGEN outputs
              REX.y0.v = acc_x
              REX.y1.v = acc_y
              REX.y2.v = acc_z
              REX.y3.v = gyro_x
              REX.y4.v = gyro_y
              REX.y5.v = gyro_z
              REX.y6.v = ang_x
              REX.y7.v = ang_y
              REX.y8.v = ang_z

              print("Data:", acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z, ang_x, ang_y, ang_z)

          async def sendData(self, data):
              """Write ``data`` to the write characteristic, if one is available.

              Does nothing when there is no client or no write
              characteristic.  Write errors are printed, not raised.

              Args:
                  data: Iterable of byte values to send.
              """
              try:
                  if self.client and self.writer_characteristic:
                      await self.client.write_gatt_char(self.writer_characteristic.uuid, bytes(data))
              except Exception as e:
                  print("Write failed:", e)
      
      class BLEScanner:
          """Find one BLE device by address and hand it to a DeviceModel."""

          def __init__(self, target_mac="CC:0E:4F:3B:A4:3B"):
              """Remember the address to look for; no scanning happens here.

              Args:
                  target_mac: Address of the device to connect to.
              """
              self.target_mac = target_mac
              self.BLEDevice = None
              self.device_model = None

          def _matchesTarget(self, address):
              """Return True when ``address`` equals the target address.

              The comparison ignores letter case and surrounding whitespace,
              because the same address may be written in either case.
              """
              if not address or not self.target_mac:
                  return False
              return address.strip().casefold() == self.target_mac.strip().casefold()

          async def scan_and_connect(self):
              """Scan for the target device and stream from it until it closes.

              Sets ``REX.y9`` to 2 when the device is not found or any error
              occurs; exceptions are printed, not raised.
              """
              # Drop the result of any previous scan so that a device which
              # has since disappeared is not reused.
              self.BLEDevice = None
              try:
                  print("Scanning for BLE devices...")
                  devices = await bleak.BleakScanner.discover(timeout=10.0)
                  self.BLEDevice = next(
                      (d for d in devices if self._matchesTarget(d.address)),
                      None,
                  )

                  if self.BLEDevice is None:
                      print("Target device not found.")
                      REX.y9.v = 2
                      return

                  self.device_model = DeviceModel("MyBLE", self.BLEDevice, None)
                  await self.device_model.openDevice()

              except Exception as e:
                  print("Scan or connect error:", e)
                  REX.y9.v = 2
      
      def run_ble_task():
          """Run one scan-and-connect session on a private event loop.

          Blocks until ``BLEScanner.scan_and_connect`` returns.  The loop is
          closed and uninstalled even if the coroutine is interrupted by
          something it does not catch itself (e.g. KeyboardInterrupt), so no
          closed loop is left behind as the current event loop.
          """
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
          try:
              scanner = BLEScanner()
              loop.run_until_complete(scanner.scan_and_connect())
          finally:
              asyncio.set_event_loop(None)
              loop.close()
      
      # Main entry point for REXYGEN
      # Runs only when this file is executed as the top-level script.
      # run_ble_task() blocks until the BLE session ends, so nothing after it
      # executes while the sensor is streaming.
      # NOTE(review): REXYGEN's PYTHON block is documented as calling
      # init()/main()/exit() hooks in the script - confirm whether this guard
      # is ever executed there, and whether a blocking call is acceptable.
      if __name__ == "__main__":
          print("Starting REXYGEN BLE program")
          REX.y9.v = 2  # Default = failed
          run_ble_task()
      
      
      cechuratC 1 Reply Last reply Reply Quote 0
      • cechuratC
        cechurat @har
        last edited by

        @har Hi Har,

        I'm sorry, but we are not able to debug third-party Python code for you.

        If you made it work under plain Python, then you can send the data to REXYGEN using the REST API. Have a look at "0302-03 REST API Python Bash etc", which sends data to REXYGEN using Python.

        Kind regards,
        Tomas

        1 Reply Last reply Reply Quote 0
        • First post
          Last post

        This is a community forum for REXYGEN users and fans. Detailed information can be found at REXYGEN homepage.

        There is also an outdated REXYGEN community forum.

        Powered by NodeBB.