Skip to content

Example: Event Registration and Handler Setup

Additional to the basic operations above, AQPXLIB also supports subscribing to events from the events delivered by the Protocol Exerciser.

Example code and setups

Here is an example of it.

run_i3c_event_example.py
import logging
import threading

from aqpxlib import AqProtocolExerciser
from aqpxlib.i3c import (
    ACUTE_DEFAULT_PID,
    PxI3CBus,
    PxI3CBaseController,
    PxI3C32BitRegisterTarget,
)
from aqpxlib.event import PxEventMsg
from aqpxlib.exceptions import PxAPIError

logger = logging.getLogger(__name__)

def exec_i3c_event_example():
    """Basic event registration and handler setup example for I3C.

    Steps will be performed in the following order:

    1. Create and open a connection to Acute's Protocol Exerciser.
    2. Setting proper configuration of voltage levels and pull-up resistors on the bus.
    3. Create an I3C Bus Handle Interface.
    4. Attach an I3C controller and a target device to the bus.
    5. Subscribe to the event of the target device.
    6. Perform Dynamic Address Assignment procedure (This is where we receive the assigned address event from)
    """

    with AqProtocolExerciser.connect(port=60600) as px_session:
        logger.info("Created an Exerciser session")

        # Configure an I3C Bus on SCL pin 0 and SDA pin 1
        i3c_bus = PxI3CBus(px_session, scl=0, sda=1)
        logger.info(f"Created {i3c_bus}")

        # Setting the voltage level to 3.3V and pull-up resistance to 1.5kΩ
        i3c_bus.voltage = 3300
        i3c_bus.pullup_resistance = 1500

        # Create a Controller device
        i3c_controller = PxI3CBaseController(px_session, name="i3c_controller")
        try:
            i3c_controller.attach_to_bus(i3c_bus)
        except PxAPIError:
            logger.warning("Multiple controller is not supported right now, using the existing controller")
            i3c_controller = i3c_bus.get_current_controller()
        logger.info(f"Added a I3C controller with name {i3c_controller.name}")

        # Create a Target Device
        bcr = 0x00  # Bus Characteristic Register
        dcr = 0x00  # Device Characteristic Register
        pid = ACUTE_DEFAULT_PID | (0x01 << 32) | 0 # Provisioned ID

        i3c_target = PxI3C32BitRegisterTarget(
            px_session,
            name="i3c_target",
            static_addr=0x12,
            sub_address_type=PxI3C32BitRegisterTarget.SubAddressType.NONE,
            bcr=bcr,
            dcr=dcr,
            pid=pid,
        )
        i3c_target.attach_to_bus(i3c_bus)
        logger.info(f"Added a I3C 32-bit register target with name {i3c_target.name}")

        # Subscribe to the assigned address event of the target device
        event = threading.Event()
        @i3c_target.on_event("i3_c_target_assigned_address")
        def i3c_target_assigned_address_handler(event_msg: PxEventMsg):
            """Handler for the assigned address event of the target device."""

            assigned_address = event_msg.device_event.i3_c_target_assigned_address.assigned_address
            logger.info(f"I3C target assigned address: 0x{assigned_address:02x}")
            event.set()

        # Here we directly perform DAA, which will assign the dynamic address to the target device, 
        # so our subscribed hander should be called.
        i3c_controller.do_daa()
        logger.info("Controller performed DAA")

        # to make sure we received our event
        assert event.wait(timeout=3), "Event has never been called"

        # Detach the devices from the bus
        i3c_target.detach_from_bus()
        i3c_controller.detach_from_bus()
        logger.info(f"Detached the controller and target from the bus")


if __name__ == "__main__":
    logging.basicConfig(format='[%(levelname)s] %(message)s')
    logger.setLevel(logging.INFO)
    exec_i3c_event_example()

Run the code by executing the following command in the terminal:

$ python run_i3c_event_example.py

Event Registration Decorator

The setup of the bus and devices are identical to the previous example, so we won't repeat it here. The main difference is that we are now subscribing to the assigned address event of the target device.

We will like to introduce how to use the decorator to register the event.

# Subscribe to the assigned address event of the target device
event = threading.Event()
@i3c_target.on_event("i3_c_target_assigned_address")
def i3c_target_assigned_address_handler(event_msg: pxmsg.PxEventMsg):
    """Handler for the assigned address event of the target device."""

    assigned_address = event_msg.device_event.i3_c_target_assigned_address.assigned_address
    logger.info(f"I3C target assigned address: 0x{assigned_address:02x}")
    event.set()

# Here we directly perform DAA, which will assign the dynamic address to the target device, 
# so our subscribed hander should be called.
i3c_controller.do_daa()
logger.info("Controller performed DAA")

# to make sure we received our event
assert event.wait(timeout=3), "Event has never been called"

When we perform DAA, the target device will receive an address assigned by the controller. At this point, the Protocol Exerciser will immediately notify the subscriber that the address is assigned.

The above example written in a decorator style, but you can also write it in a more declarative way by using the add_event_handler method from the target class.

The final output of the example should look like the following:

[INFO] Created an Exerciser session
[INFO] Created I3C Bus (SCL: 0, SDA: 1)
[INFO] Added a I3C controller with name i3c_controller
[INFO] Added a I3C 32-bit register target with name i3c_target
[INFO] Controller performed DAA
[INFO] I3C target assigned address: 0x08
[INFO] Detached the controller and target from the bus

Here we see that the target device has been assigned to address 0x08.