[chirp_devel] [PATCH] Adds support for a raw-mode Icom driver. This is required for the IC-2730A (part

Rhett Robinson
Sun Jan 14 15:56:35 PST 2018


# HG changeset patch
# User Rhett Robinson <rrhett at gmail.com>
# Date 1515970948 28800
#      Sun Jan 14 15:02:28 2018 -0800
# Node ID ebcb6aeda1b85ef516247e69b21c24d33cd1d423
# Parent  16dc67ee13d18c5645c0bdbe0d20d021da702492
Adds support for a raw-mode Icom driver. This is required for the IC-2730A (part
of #2745).

diff -r 16dc67ee13d1 -r ebcb6aeda1b8 chirp/drivers/icf.py
--- a/chirp/drivers/icf.py	Wed Jan 03 11:06:05 2018 -0500
+++ b/chirp/drivers/icf.py	Sun Jan 14 15:02:28 2018 -0800
@@ -85,6 +85,7 @@
 
     def _process_frames(self):
         if not self.data.startswith("\xFE\xFE"):
+            LOG.error("Out of sync with radio:\n%s" % util.hexprint(self.data))
             raise errors.InvalidDataError("Out of sync with radio")
         elif len(self.data) < 5:
             return []  # Not enough data for a full frame
@@ -134,11 +135,11 @@
         return self._process_frames()
 
 
-def get_model_data(pipe, mdata="\x00\x00\x00\x00"):
-    """Query the radio connected to @pipe for its model data"""
-    send_clone_frame(pipe, 0xe0, mdata, raw=True)
+def get_model_data(radio, mdata="\x00\x00\x00\x00"):
+    """Query the @radio for its model data"""
+    send_clone_frame(radio, 0xe0, mdata)
 
-    stream = RadioStream(pipe)
+    stream = RadioStream(radio.pipe)
     frames = stream.get_frames()
 
     if len(frames) != 1:
@@ -164,27 +165,11 @@
     return resp
 
 
-def send_clone_frame(pipe, cmd, data, raw=False, checksum=False):
-    """Send a clone frame with @cmd and @data to the radio attached
-    to @pipe"""
-    cs = 0
+def send_clone_frame(radio, cmd, data, checksum=False):
+    """Send a clone frame with @cmd and @data to the @radio"""
+    payload = radio.get_payload(data, checksum)
 
-    if raw:
-        hed = data
-    else:
-        hed = ""
-        for byte in data:
-            val = ord(byte)
-            hed += "%02X" % val
-            cs += val
-
-    if checksum:
-        cs = ((cs ^ 0xFFFF) + 1) & 0xFF
-        cs = "%02X" % cs
-    else:
-        cs = ""
-
-    frame = "\xfe\xfe\xee\xef%s%s%s\xfd" % (chr(cmd), hed, cs)
+    frame = "\xfe\xfe\xee\xef%s%s\xfd" % (chr(cmd), payload)
 
     if SAVE_PIPE:
         LOG.debug("Saving data...")
@@ -197,30 +182,14 @@
         # return frame
         pass
 
-    pipe.write(frame)
+    radio.pipe.write(frame)
 
     return frame
 
 
-def process_bcd(bcddata):
-    """Convert BCD-encoded data to raw"""
-    data = ""
-    i = 0
-    while i < range(len(bcddata)) and i+1 < len(bcddata):
-        try:
-            val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
-            i += 2
-            data += struct.pack("B", val)
-        except ValueError, e:
-            LOG.error("Failed to parse byte: %s" % e)
-            break
-
-    return data
-
-
-def process_data_frame(frame, _mmap):
+def process_data_frame(radio, frame, _mmap):
     """Process a data frame, adding the payload to @_mmap"""
-    _data = process_bcd(frame.payload)
+    _data = radio.process_frame_payload(frame.payload)
     if len(_mmap) >= 0x10000:
         saddr, = struct.unpack(">I", _data[0:4])
         length, = struct.unpack("B", _data[4])
@@ -264,7 +233,7 @@
 
 
 def _clone_from_radio(radio):
-    md = get_model_data(radio.pipe)
+    md = get_model_data(radio)
 
     if md[0:4] != radio.get_model():
         LOG.info("This model: %s" % util.hexprint(md[0:4]))
@@ -274,8 +243,7 @@
     if radio.is_hispeed():
         start_hispeed_clone(radio, CMD_CLONE_OUT)
     else:
-        send_clone_frame(radio.pipe, CMD_CLONE_OUT,
-                         radio.get_model(), raw=True)
+        send_clone_frame(radio, CMD_CLONE_OUT, radio.get_model())
 
     LOG.debug("Sent clone frame")
 
@@ -291,7 +259,7 @@
 
         for frame in frames:
             if frame.cmd == CMD_CLONE_DAT:
-                src, dst = process_data_frame(frame, _mmap)
+                src, dst = process_data_frame(radio, frame, _mmap)
                 if last_size != (dst - src):
                     LOG.debug("ICF Size change from %i to %i at %04x" %
                               (last_size, dst - src, src))
@@ -342,7 +310,7 @@
             chunk = struct.pack(">HB", i, size)
         chunk += _mmap[i:i+size]
 
-        send_clone_frame(radio.pipe,
+        send_clone_frame(radio,
                          CMD_CLONE_DAT,
                          chunk,
                          checksum=True)
@@ -360,22 +328,22 @@
     # Uncomment to save out a capture of what we actually write to the radio
     # SAVE_PIPE = file("pipe_capture.log", "w", 0)
 
-    md = get_model_data(radio.pipe)
+    md = get_model_data(radio)
 
     if md[0:4] != radio.get_model():
         raise errors.RadioError("I can't talk to this model")
 
     # This mimics what the Icom software does, but isn't required and just
     # takes longer
-    # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00")
-    # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00")
+    # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
+    # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
 
     stream = RadioStream(radio.pipe)
 
     if radio.is_hispeed():
         start_hispeed_clone(radio, CMD_CLONE_IN)
     else:
-        send_clone_frame(radio.pipe, CMD_CLONE_IN, radio.get_model(), raw=True)
+        send_clone_frame(radio, CMD_CLONE_IN, radio.get_model())
 
     frames = []
 
@@ -384,7 +352,7 @@
             break
         frames += stream.get_frames()
 
-    send_clone_frame(radio.pipe, CMD_CLONE_END, radio.get_endframe(), raw=True)
+    send_clone_frame(radio, CMD_CLONE_END, radio.get_endframe())
 
     if SAVE_PIPE:
         SAVE_PIPE.close()
@@ -409,6 +377,7 @@
     try:
         return _clone_to_radio(radio)
     except Exception, e:
+        logging.exception("Failed to communicate with the radio")
         raise errors.RadioError("Failed to communicate with the radio: %s" % e)
 
 
@@ -579,6 +548,13 @@
         raise errors.RadioError("Out of slots in this bank")
 
 
+def compute_checksum(data):
+    cs = 0
+    for byte in data:
+        cs += ord(byte)
+    return ((cs ^ 0xFFFF) + 1) & 0xFF
+
+
 class IcomCloneModeRadio(chirp_common.CloneModeRadio):
     """Base class for Icom clone-mode radios"""
     VENDOR = "Icom"
@@ -612,6 +588,31 @@
         """Returns the ranges this radio likes to have in a clone"""
         return cls._ranges
 
+    def process_frame_payload(self, payload):
+        """Convert BCD-encoded data to raw"""
+        bcddata = payload
+        data = ""
+        i = 0
+        while i+1 < len(bcddata):
+            try:
+                val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
+                i += 2
+                data += struct.pack("B", val)
+            except ValueError, e:
+                LOG.error("Failed to parse byte: %s" % e)
+                break
+
+        return data
+
+    def get_payload(self, data, checksum):
+        """Returns the data with optional checksum BCD-encoded for the radio."""
+        payload = ""
+        for byte in data:
+            payload += "%02X" % ord(byte)
+        if checksum:
+            payload += "%02X" % compute_checksum(data)
+        return payload
+
     def sync_in(self):
         self._mmap = clone_from_radio(self)
         self.process_mmap()
@@ -646,6 +647,67 @@
         return honor_speed_switch_setting(self, settings)
 
 
+def flip_high_order_bit(data):
+    return [chr(ord(d) ^ 0x80) for d in list(data)]
+
+
+def escape_raw_byte(byte):
+    """Escapes a raw byte for sending to the radio"""
+    # Certain bytes are used as control characters to the radio, so if one of
+    # these bytes is present in the stream to the radio, it gets escaped as
+    # 0xff followed by (byte & 0x0f)
+    if ord(byte) > 0xf9:
+        return "\xff%s" % (chr(ord(byte) & 0xf))
+    return byte
+
+
+def unescape_raw_bytes(escaped_data):
+    """Unescapes raw bytes from the radio."""
+    data = ""
+    i = 0
+    while i < len(escaped_data):
+        byte = escaped_data[i]
+        if byte == '\xff':
+            if i + 1 >= len(escaped_data):
+                raise errors.InvalidDataError(
+                    "Unexpected escape character at end of data")
+            i += 1
+            byte = chr(0xf0 | ord(escaped_data[i]))
+        data += byte
+        i += 1
+    return data
+
+
+class IcomRawCloneModeRadio(IcomCloneModeRadio):
+    """Subclass for Icom clone-mode radios using the raw data protocol."""
+
+    def process_frame_payload(self, payload):
+        """Payloads from a raw-clone-mode radio are already in raw format."""
+        return unescape_raw_bytes(payload)
+
+    def get_payload(self, data, checksum):
+        """Returns the data with optional checksum in raw format."""
+        if checksum:
+            cs = chr(compute_checksum(data))
+        else:
+            cs = ""
+        payload = "%s%s" % (data, cs)
+        # Escape control characters.
+        escaped_payload = [escape_raw_byte(b) for b in payload]
+        return "".join(escaped_payload)
+
+    def sync_in(self):
+        # The radio returns all the bytes with the high-order bit flipped.
+        _mmap = clone_from_radio(self)
+        _mmap = flip_high_order_bit(_mmap.get_packed())
+        self._mmap = memmap.MemoryMap(_mmap)
+        self.process_mmap()
+
+    def get_mmap(self):
+        _data = flip_high_order_bit(self._mmap.get_packed())
+        return memmap.MemoryMap(_data)
+
+
 class IcomLiveRadio(chirp_common.LiveRadio):
     """Base class for an Icom Live-mode radio"""
     VENDOR = "Icom"



More information about the chirp_devel mailing list