[chirp_devel] [PATCH] [ic2730a] Add support for Icom IC-2730A. Fixes #2745

Rhett Robinson
Sun Jan 14 15:02:33 PST 2018


Oh, I sent this a minute too early. The description on the patch is
inaccurate, and should read something like:
Adds support for a raw-mode Icom driver. This is required for the IC-2730A
(part of #2745).

On Sun, Jan 14, 2018 at 2:59 PM, Rhett Robinson <rrhett at gmail.com> wrote:

> # HG changeset patch
> # User Rhett Robinson <rrhett at gmail.com>
> # Date 1515970485 28800
> #      Sun Jan 14 14:54:45 2018 -0800
> # Node ID 6ffdeb62bf3efde8a94ddd24109b9948185f832b
> # Parent  16dc67ee13d18c5645c0bdbe0d20d021da702492
> [ic2730a] Add support for Icom IC-2730A. Fixes #2745
>
> diff -r 16dc67ee13d1 -r 6ffdeb62bf3e chirp/drivers/icf.py
> --- a/chirp/drivers/icf.py      Wed Jan 03 11:06:05 2018 -0500
> +++ b/chirp/drivers/icf.py      Sun Jan 14 14:54:45 2018 -0800
> @@ -85,6 +85,7 @@
>
>      def _process_frames(self):
>          if not self.data.startswith("\xFE\xFE"):
> +            LOG.error("Out of sync with radio:\n%s" %
> util.hexprint(self.data))
>              raise errors.InvalidDataError("Out of sync with radio")
>          elif len(self.data) < 5:
>              return []  # Not enough data for a full frame
> @@ -134,11 +135,11 @@
>          return self._process_frames()
>
>
> -def get_model_data(pipe, mdata="\x00\x00\x00\x00"):
> -    """Query the radio connected to @pipe for its model data"""
> -    send_clone_frame(pipe, 0xe0, mdata, raw=True)
> +def get_model_data(radio, mdata="\x00\x00\x00\x00"):
> +    """Query the @radio for its model data"""
> +    send_clone_frame(radio, 0xe0, mdata)
>
> -    stream = RadioStream(pipe)
> +    stream = RadioStream(radio.pipe)
>      frames = stream.get_frames()
>
>      if len(frames) != 1:
> @@ -164,27 +165,11 @@
>      return resp
>
>
> -def send_clone_frame(pipe, cmd, data, raw=False, checksum=False):
> -    """Send a clone frame with @cmd and @data to the radio attached
> -    to @pipe"""
> -    cs = 0
> +def send_clone_frame(radio, cmd, data, checksum=False):
> +    """Send a clone frame with @cmd and @data to the @radio"""
> +    payload = radio.get_payload(data, checksum)
>
> -    if raw:
> -        hed = data
> -    else:
> -        hed = ""
> -        for byte in data:
> -            val = ord(byte)
> -            hed += "%02X" % val
> -            cs += val
> -
> -    if checksum:
> -        cs = ((cs ^ 0xFFFF) + 1) & 0xFF
> -        cs = "%02X" % cs
> -    else:
> -        cs = ""
> -
> -    frame = "\xfe\xfe\xee\xef%s%s%s\xfd" % (chr(cmd), hed, cs)
> +    frame = "\xfe\xfe\xee\xef%s%s\xfd" % (chr(cmd), payload)
>
>      if SAVE_PIPE:
>          LOG.debug("Saving data...")
> @@ -197,30 +182,14 @@
>          # return frame
>          pass
>
> -    pipe.write(frame)
> +    radio.pipe.write(frame)
>
>      return frame
>
>
> -def process_bcd(bcddata):
> -    """Convert BCD-encoded data to raw"""
> -    data = ""
> -    i = 0
> -    while i < range(len(bcddata)) and i+1 < len(bcddata):
> -        try:
> -            val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
> -            i += 2
> -            data += struct.pack("B", val)
> -        except ValueError, e:
> -            LOG.error("Failed to parse byte: %s" % e)
> -            break
> -
> -    return data
> -
> -
> -def process_data_frame(frame, _mmap):
> +def process_data_frame(radio, frame, _mmap):
>      """Process a data frame, adding the payload to @_mmap"""
> -    _data = process_bcd(frame.payload)
> +    _data = radio.process_frame_payload(frame.payload)
>      if len(_mmap) >= 0x10000:
>          saddr, = struct.unpack(">I", _data[0:4])
>          length, = struct.unpack("B", _data[4])
> @@ -264,7 +233,7 @@
>
>
>  def _clone_from_radio(radio):
> -    md = get_model_data(radio.pipe)
> +    md = get_model_data(radio)
>
>      if md[0:4] != radio.get_model():
>          LOG.info("This model: %s" % util.hexprint(md[0:4]))
> @@ -274,8 +243,7 @@
>      if radio.is_hispeed():
>          start_hispeed_clone(radio, CMD_CLONE_OUT)
>      else:
> -        send_clone_frame(radio.pipe, CMD_CLONE_OUT,
> -                         radio.get_model(), raw=True)
> +        send_clone_frame(radio, CMD_CLONE_OUT, radio.get_model())
>
>      LOG.debug("Sent clone frame")
>
> @@ -291,7 +259,7 @@
>
>          for frame in frames:
>              if frame.cmd == CMD_CLONE_DAT:
> -                src, dst = process_data_frame(frame, _mmap)
> +                src, dst = process_data_frame(radio, frame, _mmap)
>                  if last_size != (dst - src):
>                      LOG.debug("ICF Size change from %i to %i at %04x" %
>                                (last_size, dst - src, src))
> @@ -342,7 +310,7 @@
>              chunk = struct.pack(">HB", i, size)
>          chunk += _mmap[i:i+size]
>
> -        send_clone_frame(radio.pipe,
> +        send_clone_frame(radio,
>                           CMD_CLONE_DAT,
>                           chunk,
>                           checksum=True)
> @@ -360,22 +328,22 @@
>      # Uncomment to save out a capture of what we actually write to the
> radio
>      # SAVE_PIPE = file("pipe_capture.log", "w", 0)
>
> -    md = get_model_data(radio.pipe)
> +    md = get_model_data(radio)
>
>      if md[0:4] != radio.get_model():
>          raise errors.RadioError("I can't talk to this model")
>
>      # This mimics what the Icom software does, but isn't required and just
>      # takes longer
> -    # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00")
> -    # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00")
> +    # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
> +    # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
>
>      stream = RadioStream(radio.pipe)
>
>      if radio.is_hispeed():
>          start_hispeed_clone(radio, CMD_CLONE_IN)
>      else:
> -        send_clone_frame(radio.pipe, CMD_CLONE_IN, radio.get_model(),
> raw=True)
> +        send_clone_frame(radio, CMD_CLONE_IN, radio.get_model())
>
>      frames = []
>
> @@ -384,7 +352,7 @@
>              break
>          frames += stream.get_frames()
>
> -    send_clone_frame(radio.pipe, CMD_CLONE_END, radio.get_endframe(),
> raw=True)
> +    send_clone_frame(radio, CMD_CLONE_END, radio.get_endframe())
>
>      if SAVE_PIPE:
>          SAVE_PIPE.close()
> @@ -409,6 +377,7 @@
>      try:
>          return _clone_to_radio(radio)
>      except Exception, e:
> +        logging.exception("Failed to communicate with the radio")
>          raise errors.RadioError("Failed to communicate with the radio:
> %s" % e)
>
>
> @@ -579,6 +548,13 @@
>          raise errors.RadioError("Out of slots in this bank")
>
>
> +def compute_checksum(data):
> +    cs = 0
> +    for byte in data:
> +        cs += ord(byte)
> +    return ((cs ^ 0xFFFF) + 1) & 0xFF
> +
> +
>  class IcomCloneModeRadio(chirp_common.CloneModeRadio):
>      """Base class for Icom clone-mode radios"""
>      VENDOR = "Icom"
> @@ -612,6 +588,31 @@
>          """Returns the ranges this radio likes to have in a clone"""
>          return cls._ranges
>
> +    def process_frame_payload(self, payload):
> +        """Convert BCD-encoded data to raw"""
> +        bcddata = payload
> +        data = ""
> +        i = 0
> +        while i+1 < len(bcddata):
> +            try:
> +                val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
> +                i += 2
> +                data += struct.pack("B", val)
> +            except ValueError, e:
> +                LOG.error("Failed to parse byte: %s" % e)
> +                break
> +
> +        return data
> +
> +    def get_payload(self, data, checksum):
> +        """Returns the data with optional checksum BCD-encoded for the
> radio."""
> +        payload = ""
> +        for byte in data:
> +            payload += "%02X" % ord(byte)
> +        if checksum:
> +            payload += "%02X" % compute_checksum(data)
> +        return payload
> +
>      def sync_in(self):
>          self._mmap = clone_from_radio(self)
>          self.process_mmap()
> @@ -646,6 +647,67 @@
>          return honor_speed_switch_setting(self, settings)
>
>
> +def flip_high_order_bit(data):
> +    return [chr(ord(d) ^ 0x80) for d in list(data)]
> +
> +
> +def escape_raw_byte(byte):
> +    """Escapes a raw byte for sending to the radio"""
> +    # Certain bytes are used as control characters to the radio, so if
> one of
> +    # these bytes is present in the stream to the radio, it gets escaped
> as
> +    # 0xff followed by (byte & 0x0f)
> +    if ord(byte) > 0xf9:
> +        return "\xff%s" % (chr(ord(byte) & 0xf))
> +    return byte
> +
> +
> +def unescape_raw_bytes(escaped_data):
> +    """Unescapes raw bytes from the radio."""
> +    data = ""
> +    i = 0
> +    while i < len(escaped_data):
> +        byte = escaped_data[i]
> +        if byte == '\xff':
> +            if i + 1 >= len(escaped_data):
> +                raise errors.InvalidDataError(
> +                    "Unexpected escape character at end of data")
> +            i += 1
> +            byte = chr(0xf0 | ord(escaped_data[i]))
> +        data += byte
> +        i += 1
> +    return data
> +
> +
> +class IcomRawCloneModeRadio(IcomCloneModeRadio):
> +    """Subclass for Icom clone-mode radios using the raw data protocol."""
> +
> +    def process_frame_payload(self, payload):
> +        """Payloads from a raw-clone-mode radio are already in raw
> format."""
> +        return unescape_raw_bytes(payload)
> +
> +    def get_payload(self, data, checksum):
> +        """Returns the data with optional checksum in raw format."""
> +        if checksum:
> +            cs = chr(compute_checksum(data))
> +        else:
> +            cs = ""
> +        payload = "%s%s" % (data, cs)
> +        # Escape control characters.
> +        escaped_payload = [escape_raw_byte(b) for b in payload]
> +        return "".join(escaped_payload)
> +
> +    def sync_in(self):
> +        # The radio returns all the bytes with the high-order bit flipped.
> +        _mmap = clone_from_radio(self)
> +        _mmap = flip_high_order_bit(_mmap.get_packed())
> +        self._mmap = memmap.MemoryMap(_mmap)
> +        self.process_mmap()
> +
> +    def get_mmap(self):
> +        _data = flip_high_order_bit(self._mmap.get_packed())
> +        return memmap.MemoryMap(_data)
> +
> +
>  class IcomLiveRadio(chirp_common.LiveRadio):
>      """Base class for an Icom Live-mode radio"""
>      VENDOR = "Icom"
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://intrepid.danplanet.com/pipermail/chirp_devel/attachments/20180114/c635db23/attachment-0001.html 


More information about the chirp_devel mailing list