[chirp_devel] [PATCH] [thd74] Add a Kenwood d74 driver

Angus Ainslie
Sun Jun 14 19:43:13 PDT 2020


# HG changeset patch
# User Angus Ainslie <angus at akkea.ca>
# Date 1592188302 25200
#      Sun Jun 14 19:31:42 2020 -0700
# Branch py3
# Node ID eb35a46c5f7e43518f6adc88768107efc0a3d402
# Parent  b1e01577ff2505de7fcca665bf3d6af296f0f7ca
[thd74] Add a Kenwood d74 driver

Based on code from

Tom Hayward, Eric Wolak, William McKeehan

https://chirp.danplanet.com/issues/4129

This works with Python 3; I have not tested it with Python 2.

diff --git a/chirp/drivers/thd74.py b/chirp/drivers/thd74.py
new file mode 100644
--- /dev/null
+++ b/chirp/drivers/thd74.py
@@ -0,0 +1,571 @@
+import logging
+import struct
+import binascii
+
+import time
+
+from chirp import directory, bitwise, errors, chirp_common, memmap
+from chirp.settings import RadioSettingGroup, RadioSetting, RadioSettings, \
+            RadioSettingValueInteger, RadioSettingValueString, \
+            RadioSettingValueList, RadioSettingValueBoolean, \
+            InvalidValueError
+
+from . import thd72
+from chirp.util import hexprint
+
+LOG = logging.getLogger(__name__)
+
+# Save files from MCP-D74 have a 256-byte header, and possibly some oddness
+# TH-D74 memory map
+
+# 0x02000: memory flags, 4 bytes per memory
+# 0x04000: memories, each 40 bytes long
+# 0x10000: names, each 16 bytes long, null padded, ascii encoded
+
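+# memory channel (40 bytes; byte offsets as laid out by the struct in
+# mem_format below)
+#  0-3   freq
+#  4-7   offset
+#  8     tuning_step / unk
+#  9     mode / unk1
+# 10     tone_mode / duplex
+# 11     rtone
+# 12     ctone
+# 13     dtcs
+# 14     cross_mode / digital_squelch
+# 15-22  urcall
+# 23-30  rpt1
+# 31-38  rpt2
+# 39     digital_squelch_code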
+
+# frequency is 32-bit unsigned little-endian Hz
+
+# Six (low, high) frequency pairs in Hz.
+# NOTE(review): presumably the default programmable-VFO band limits; the
+# code shown here never reads this tuple -- confirm before relying on it.
+DEFAULT_PROG_VFO = (
+    (136000000, 174000000),
+    (410000000, 470000000),
+    (118000000, 136000000),
+    (136000000, 174000000),
+    (320000000, 400000000),
+    (400000000, 524000000),
+)
+
+# Some of the blocks written by MCP have a length set of less than 0x00/256
+# (write_block() sends the length as count % 256, so a full 256-byte block
+# goes out as 0x00).  Maps block number -> number of bytes to write;
+# upload() falls back to 256 for any block not listed here.
+BLOCK_SIZES = {
+    0x0003: 0x00B4,
+    0x0007: 0x0068,
+}
+
+# Memory layout description handed to bitwise.parse() in process_mmap().
+# Everything between the triple quotes is parsed at runtime, so notes about
+# the layout use the "//" comment form inside the string.
+# Each inner "mem" record is 40 bytes; six of them plus the 16-byte pad make
+# up one 256-byte block.
+mem_format = """
+// TODO: find lockout
+
+#seekto 0x10c0;
+struct {
+  char power_on_msg[16];
+  char modem_name[16];
+} onmsg_name;
+
+#seekto 0x1200;
+struct {
+  char callsign[8];
+} callsign;
+
+#seekto 0x02000;
+struct {
+// 4 bytes long
+  u8   disabled;
+  u8   unk;
+  u8   group;
+  u8   unk2;
+} flag[1032];
+
+#seekto 0x04000;
+// TODO: deal with the 16-byte trailers of every block
+struct {
+    struct {
+      ul32 freq;
+      ul32 offset;
+      
+      u8   tuning_step:4,
+           unk:4;
+      u8   mode:4,
+           unk1:4;
+      u8   tone_mode:4,
+           duplex:4;
+      u8   rtone;
+      
+      u8   ctone;
+      u8   dtcs;
+      u8   cross_mode:4,
+           digital_squelch:4;
+      char urcall[8];
+      char rpt1[8];
+      char rpt2[8];
+      
+      u8   digital_squelch_code;
+      
+    } mem[6];
+    
+    u8 pad[16];
+} memory[1167]; // TODO: correct number of memories
+
+#seekto 0x10000;
+struct {
+  char name[16];
+} channel_name[1000];
+
+#seekto 0x14700;
+struct {
+  char name[16];
+} wx_name[10];
+
+#seekto 0x144d0;
+struct {
+  char name[16];
+} call_name[6];
+
+#seekto 0x14800;
+struct {
+  char name[16];
+} group_name[31];
+"""
+
+# Tuning steps, indexed by the 4-bit tuning_step field of a memory record
+# (see get_memory()).  None marks index values with no known step.
+STEPS = [5.0, 6.25, None, None, 10.0, 12.5, 15.0, 20.0, 25.0, 50.0, 100.0, 9.0]
+# Operating modes, indexed by the 4-bit mode field of a memory record.
+# "DV" appears at both index 1 and index 7, so MODES.index("DV") yields 1.
+MODES = [
+    "FM",
+    "DV",
+    "AM",
+    "LSB",
+    "USB",
+    "CW",
+    "NFM",
+    "DV"
+]
+
+def hex(data):
+    """Format data as upper-case hex octets for debug logging.
+
+    Octets are separated by single spaces, 16 per line.
+
+    :param data: bytes, bytearray, or a str carrying one byte per character
+    :returns: the formatted dump as a str ("" for empty input)
+    """
+    if isinstance(data, str) and not isinstance(data, bytes):
+        # The driver builds its strings with chr(), one character per byte,
+        # so map code points 1:1 onto byte values rather than UTF-8
+        # encoding them.
+        data = data.encode("latin-1", "replace")
+    raw = bytearray(data)
+    lines = []
+    for start in range(0, len(raw), 16):
+        row = raw[start:start + 16]
+        lines.append(" ".join("%02X" % octet for octet in row))
+    return "\n".join(lines)
+
+class SProxy(object):
+    """Debugging wrapper around a serial-like object.
+
+    Forwards read(), write() and the timeout attribute to the wrapped
+    delegate and logs a hex dump of everything read or written.
+    """
+
+    def __init__(self, delegate):
+        """Wrap delegate, the object that performs the real I/O."""
+        self.delegate = delegate
+
+    def read(self, len):
+        """Read up to len bytes from the delegate, log them, return them."""
+        r = self.delegate.read(len)
+        LOG.debug("READ\n" + hex(r))
+        return r
+
+    def write(self, data):
+        """Log data and pass it to the delegate's write().
+
+        bytes, bytearray and str payloads are forwarded unchanged; calling
+        str() on a bytes object under Python 3 would send its repr
+        ("b'...'") instead of the payload.  Any other object is still
+        converted with str() first.
+        """
+        LOG.debug("WRITE\n" + hex(data))
+        if isinstance(data, (bytes, bytearray, str)):
+            payload = data
+        else:
+            payload = str(data)
+        return self.delegate.write(payload)
+
+    @property
+    def timeout(self):
+        """Timeout of the wrapped delegate."""
+        return self.delegate.timeout
+
+    @timeout.setter
+    def timeout(self, timeout):
+        self.delegate.timeout = timeout
+
+
+
+@directory.register
+class THD74Radio(thd72.THD72Radio):
+    MODEL = "TH-D74 (clone mode)"
+    #MODEL = "TH-D74"
+    _memsize = 500480
+    # I think baud rate might be ignored by USB-Serial stack of the D74's
+    # on-board FTDI chip, but it doesn't seem to hurt.
+    BAUD_RATE = 115200
+
+
+    #def __init__(self, pipe):
+    #    pipe = SProxy(pipe)
+    #    super(THD74Radio, self).__init__(pipe)
+
+    def get_features(self):
+        """Return the parent class's features with tuning steps enabled."""
+        features = super(THD74Radio, self).get_features()
+        features.has_tuning_step = True
+        return features
+
+    def process_mmap(self):
+        """Parse self._mmap with mem_format and clear the dirty-block list."""
+        parsed = bitwise.parse(mem_format, self._mmap)
+        self._memobj = parsed
+        self._dirty_blocks = []
+
+    def sync_in(self):
+        """Download the image from the radio and parse it."""
+        # self._detect_baud()
+        image = self.download()
+        self._mmap = image
+        self.process_mmap()
+
+    def sync_out(self):
+        """Upload the dirty blocks if any are recorded, else the full image."""
+        pending = self._dirty_blocks
+        if len(pending) == 0:
+            self.upload()
+        else:
+            self.upload(pending)
+
+    def read_block(self, block, count=256):
+        """Read one block from the radio.
+
+        Sends an "R" request, checks that the 5-byte reply header starts
+        with "W" and echoes the block number, reads count payload bytes and
+        then exchanges an ACK (0x06) with the radio.
+
+        :param block: block number to read
+        :param count: payload length in bytes (sent as count % 256)
+        :returns: the payload as a str
+        :raises errors.RadioError: on a short or unexpected header, when the
+            radio stops sending payload data, or when the final ACK is
+            missing
+        """
+        # Consecutive empty reads tolerated before giving up on the payload.
+        max_empty_reads = 10
+
+        cmd = struct.pack(">cHH", b"R", block, count % 256)
+        LOG.debug("Read cmd %s" % cmd)
+        self.pipe.write(''.join(chr(b) for b in cmd))
+
+        r = self.pipe.read(5)
+        if len(r) != 5:
+            raise errors.RadioError("Did not receive block response")
+
+        LOG.debug("Read input %s %i %i %i %i" %
+                  (r, ord(r[1]), ord(r[2]), ord(r[3]), ord(r[4])))
+
+        cmd = r[0]
+        _block = (ord(r[1]) << 8) + ord(r[2])
+        if cmd != 'W' or _block != block:
+            raise errors.RadioError(
+                "Invalid response: %s %i %i" % (cmd, block, _block))
+
+        data = ""
+        empty_reads = 0
+        while len(data) < count:
+            chunk = self.pipe.read(count - len(data))
+            if chunk:
+                empty_reads = 0
+                data += chunk
+                continue
+            # An empty read means nothing arrived before the pipe returned;
+            # without this bound a silent radio would hang the loop forever.
+            empty_reads += 1
+            if empty_reads >= max_empty_reads:
+                raise errors.RadioError(
+                    "Radio stopped responding while reading block %i "
+                    "(%i of %i bytes received)" % (block, len(data), count))
+
+        self.pipe.write(chr(0x06))
+        if self.pipe.read(1) != chr(0x06):
+            raise errors.RadioError("Did not receive post-block ACK!")
+
+        return data
+
+    def write_block(self, block, map, count=256):
+        """Write one block of the image to the radio.
+
+        :param block: block number; the data is taken from map starting at
+            byte offset block * 256
+        :param map: the memory image to slice the payload from
+        :param count: number of bytes to send (sent as count % 256)
+        :returns: True if the radio answered with ACK (0x06), else False
+        """
+        c = struct.pack(">cHH", b"W", block, count % 256)
+        base = block * 256
+        data = map[base:base + count]
+        # It's crucial that these are written together. Otherwise the radio
+        # will fail to ACK under some conditions.
+        c_d = ''.join(chr(b) for b in c) + data
+        self.pipe.write(c_d)
+
+        ack = self.pipe.read(1)
+
+        if len(ack) == 0:
+            LOG.debug("read timed out block %d - trying again" % block)
+            time.sleep(0.5)
+            ack = self.pipe.read(1)
+
+        if ack == chr(0x06):
+            return True
+
+        # ack may still be empty after the retry; ord() on an empty read
+        # would raise TypeError and hide the actual failure.
+        got = ord(ack) if len(ack) == 1 else "no response"
+        LOG.error("Block %d write failed %s" % (block, got))
+        return False
+
+    def _unlock(self):
+        """Voodoo sequence of operations to get the radio to accept our programming.
+
+        Reads 6 bytes of block 0, writes a fixed 53-byte frame and waits for
+        an ACK, then writes a "W" header for block 0 followed by the magic
+        200-byte payload and waits for a second ACK.
+
+        :returns: None
+        :raises errors.RadioError: if either ACK (0x06) is not received
+        """
+
+        def expect_ack():
+            ack = self.pipe.read(1)
+            if ack != chr(0x06):
+                # ord() only works on a single character; show the repr
+                # when the read timed out and returned nothing.
+                got = ord(ack) if len(ack) == 1 else repr(ack)
+                raise errors.RadioError(
+                    "Expected ack but got {} ({})".format(got, type(ack)))
+
+        # The returned data is not used; only the read exchange matters here.
+        self.read_block(0, 6)
+
+        unlock = ("\x57\x00\x00\x00\x30\xff\x01\xff\x00\x00\xff\xff\xff\xff\xff\x01" +
+            "\x00\x00\x00\x03\x01\x00\x00\x00\x00\x02\x00\x30\x30\x30\x00\xff" +
+            "\xff\xff\xff\xff\xff\xff\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff" +
+            "\xff\xff\xff\xff\xff")
+
+        self.pipe.write(unlock)
+        expect_ack()
+
+        c = struct.pack(">cHH", b"W", 0, 0x38C8)
+        self.pipe.write(''.join(chr(b) for b in c))
+        # magic unlock sequence
+        unlock = [0xFF] * 8 + [0] * 160 + [0xFF] * 32
+        unlock = "".join([chr(x) for x in unlock])
+        self.pipe.write(unlock)
+
+        time.sleep(0.01)
+        expect_ack()
+
+    def download(self, raw=False, blocks=None):
+        """Read the memory image from the radio.
+
+        Puts the radio into PROGRAM mode, reads the requested 256-byte
+        blocks and fills every block that was not requested with 0xff.
+
+        :param raw: if True return the image as a str, otherwise wrap it in
+            memmap.MemoryMap
+        :param blocks: block numbers to read; None reads every block.
+            Numbers at or beyond the end of the image are ignored.
+        :returns: the image (str or memmap.MemoryMap, see raw)
+        :raises errors.RadioError: if the radio does not enter PROGRAM mode
+        """
+        nblocks = self._memsize // 256
+        if blocks is None:
+            blocks = list(range(nblocks))
+        else:
+            blocks = [b for b in blocks if b < nblocks]
+
+        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
+            raise errors.RadioError("Radio didn't go into PROGRAM mode")
+
+        self.pipe.baudrate = 57600
+        try:
+            self.pipe.setRTS()
+        except AttributeError:
+            self.pipe.rts = True
+        self.pipe.read(1)
+
+        # Guarded: an empty request must not fail on blocks[0].
+        if blocks:
+            LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
+
+        wanted = set(blocks)
+        total = len(blocks)
+        count = 0
+        chunks = []
+        for i in range(nblocks):
+            if i not in wanted:
+                chunks.append(256 * '\xff')
+                continue
+            chunks.append(self.read_block(i))
+            count += 1
+            if self.status_fn:
+                s = chirp_common.Status()
+                s.msg = "Cloning from radio"
+                s.max = total
+                s.cur = count
+                self.status_fn(s)
+
+        self.pipe.write("E")
+        data = "".join(chunks)
+        if raw:
+            return data
+        return memmap.MemoryMap(data)
+
+    def upload(self, blocks=None):
+        """Write blocks of self._mmap to the radio.
+
+        :param blocks: block numbers to write; None selects every block
+            except the last two.  Blocks 0 and 1279 are never written here.
+        :raises errors.RadioError: if the radio does not enter PROGRAM mode,
+            the unlock fails, or a block write is not acknowledged
+        """
+        # MCP-D74 sets DTR, so we should too
+        try:
+            self.pipe.setDTR()
+        except AttributeError:
+            self.pipe.dtr = True
+
+        nblocks = self._memsize // 256
+        if blocks is None:
+            blocks = list(range(nblocks - 2))
+        else:
+            blocks = [b for b in blocks if b < nblocks]
+
+        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
+            raise errors.RadioError("Radio didn't go into PROGRAM mode")
+
+        # _unlock() as defined in this class reports failure by raising and
+        # returns None, so this check only fires for an override that
+        # returns a true value.
+        if self._unlock():
+            raise errors.RadioError("Unlock failed")
+
+        # Block 0 definitely isn't written conventionally, so we let _unlock
+        # handle it and skip.
+        # For some reason MCP-D74 skips block 1279. If we don't, we'll get a
+        # NACK on the next one. There is also a more than 500 ms delay for
+        # the ACK.
+        blocks = [b for b in blocks if b not in (0, 1279)]
+
+        # Guarded: nothing may be left to write, e.g. when only block 0 was
+        # requested.
+        if blocks:
+            LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
+        total = len(blocks)
+        for count, i in enumerate(blocks, 1):
+            time.sleep(0.001)
+            r = self.write_block(i, self._mmap, BLOCK_SIZES.get(i, 256))
+            if not r:
+                raise errors.RadioError("write of block %i failed" % i)
+            if self.status_fn:
+                s = chirp_common.Status()
+                s.msg = "Cloning to radio"
+                s.max = total
+                s.cur = count
+                self.status_fn(s)
+
+        lock = ("\x57\x00\x00\x00\x06\x02\x01\xff\x00\x00\xff")
+        self.pipe.write(lock)
+
+        self.pipe.write("F")
+        # clear out blocks we uploaded from the dirty blocks list
+        written = set(blocks)
+        self._dirty_blocks = [b for b in self._dirty_blocks
+                              if b not in written]
+
+    def command(self, cmd, response_length, timeout=0.5):
+        """Send a text command and return the stripped reply.
+
+        Writes cmd followed by a carriage return, then reads
+        response_length + 1 characters with the pipe timeout temporarily
+        set to timeout.  The previous pipe timeout is restored afterwards,
+        even if the read raises.
+
+        :param cmd: command text without the trailing "\\r"
+        :param response_length: expected reply length, excluding terminator
+        :param timeout: read timeout in seconds for this command only
+        :returns: the reply with surrounding whitespace stripped
+        """
+        LOG.debug("PC->D72: %s" % cmd)
+        default_timeout = self.pipe.timeout
+        self.pipe.write(cmd + "\r")
+        self.pipe.timeout = timeout
+        try:
+            reply = self.pipe.read(response_length + 1).strip()
+            LOG.debug("D72->PC: %s" % reply)
+        finally:
+            self.pipe.timeout = default_timeout
+        return reply
+
+    def get_raw_memory(self, number):
+        """Return repr() of the memory record followed by repr() of its flag."""
+        # Records are stored six to a bank.
+        bank, idx = divmod(number, 6)
+        record = self._memobj.memory[bank].mem[idx]
+        flag = self._memobj.flag[number]
+        return repr(record) + repr(flag)
+
+    def get_id(self):
+        """Send the "ID" command and return the word after "ID " in the reply.
+
+        :raises errors.RadioError: if the reply does not start with "ID "
+        """
+        reply = self.command("ID", 9)
+        if not reply.startswith("ID "):
+            raise errors.RadioError("No response to ID command")
+        return reply.split(" ")[1]
+
+    def set_channel_name(self, number, name):
+        """Store a channel name, truncated or NUL-padded to 16 characters.
+
+        Numbers up to 999 are written to channel_name; 1020..1029 are
+        written to wx_name (index number - 1020).  Any other number is
+        ignored.  The touched entry is passed to add_dirty_block().
+
+        :param number: memory number
+        :param name: new name
+        """
+        padded = (name + '\x00' * 16)[:16]
+        # channel_name holds 1000 entries, so channel 999 is a regular
+        # named channel as well.
+        if number <= 999:
+            self._memobj.channel_name[number].name = padded
+            self.add_dirty_block(self._memobj.channel_name[number])
+        elif 1020 <= number < 1030:
+            wx_index = number - 1020
+            self._memobj.wx_name[wx_index].name = padded
+            self.add_dirty_block(self._memobj.wx_name[wx_index])
+
+    def get_memory(self, number):
+        """Build a chirp_common memory object for one channel.
+
+        :param number: memory number, or a special-channel name that is
+            looked up in thd72.THD72_SPECIAL
+        :returns: chirp_common.DVMemory when the stored mode is "DV",
+            otherwise chirp_common.Memory.  Only number (and extd_number for
+            numbers above 999) is filled in when the channel's flag marks it
+            empty (disabled == 0xFF).
+        :raises errors.InvalidMemoryLocation: for an unknown name or a
+            number outside the accepted range
+        """
+        if isinstance(number, str):
+            try:
+                number = thd72.THD72_SPECIAL[number]
+            except KeyError:
+                raise errors.InvalidMemoryLocation("Unknown channel %s" %
+                                                   number)
+
+        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
+            raise errors.InvalidMemoryLocation(
+                "Number must be between 0 and 999")
+
+        # Records are stored six to a bank.
+        bank, idx = divmod(number, 6)
+
+        _mem = self._memobj.memory[bank].mem[idx]
+        flag = self._memobj.flag[number]
+
+        if _mem.mode < len(MODES) and MODES[_mem.mode] == "DV":
+            mem = chirp_common.DVMemory()
+        else:
+            mem = chirp_common.Memory()
+
+        mem.number = number
+
+        if number > 999:
+            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
+        if flag.disabled == 0xFF:
+            mem.empty = True
+            return mem
+
+        mem.name = self.get_channel_name(number)
+        mem.freq = int(_mem.freq)
+        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
+        mem.rtone = chirp_common.TONES[_mem.rtone]
+        mem.ctone = chirp_common.TONES[_mem.ctone]
+        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs]
+        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
+        # Copy the value out, as is done for freq, instead of handing the
+        # caller a live reference into the memory image.
+        mem.offset = int(_mem.offset)
+        mem.mode = MODES[int(_mem.mode)]
+        # NOTE(review): STEPS has 12 entries (two of them None) while the
+        # field is 4 bits wide, so stored values 12..15 would raise here.
+        mem.tuning_step = STEPS[_mem.tuning_step]
+
+        if mem.mode == "DV":
+            # str()/int() detach the values from the memory image.
+            mem.dv_urcall = str(_mem.urcall)
+            mem.dv_rpt1call = str(_mem.rpt1)
+            mem.dv_rpt2call = str(_mem.rpt2)
+            mem.dv_code = int(_mem.digital_squelch_code)
+
+        # Channel 999 is still a regular channel; only numbers above 999
+        # are specials.
+        if number <= 999:
+            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
+            mem.cross_mode = chirp_common.CROSS_MODES[_mem.cross_mode]
+        else:
+            mem.cross_mode = chirp_common.CROSS_MODES[0]
+            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
+            if number >= 1020 and number < 1030:
+                mem.immutable += ["freq", "offset", "tone", "mode",
+                                  "tmode", "ctone", "skip"]  # FIXME: ALL
+            else:
+                mem.immutable += ["name"]
+
+        return mem
+
+    def set_memory(self, mem):
+        """Write a chirp_common memory object into the memory image.
+
+        Channels 1020..1029 only have their name updated.  An empty mem
+        marks the channel's flag as empty and writes nothing else.
+
+        :param mem: the memory to store; mem.number selects the channel
+        :raises errors.InvalidMemoryLocation: if mem.number is outside the
+            accepted range
+        """
+        LOG.debug("set_memory(%d)" % mem.number)
+        if mem.number < 0 or mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
+            raise errors.InvalidMemoryLocation(
+                "Number must be between 0 and 999")
+
+        # weather channels can only change name, nothing else
+        if mem.number >= 1020 and mem.number < 1030:
+            self.set_channel_name(mem.number, mem.name)
+            return
+
+        flag = self._memobj.flag[mem.number]
+        self.add_dirty_block(self._memobj.flag[mem.number])
+
+        # only delete non-WX channels
+        if mem.empty:
+            # get_memory() treats disabled == 0xFF as empty, so deleting
+            # must write the same marker.
+            flag.disabled = 0xFF
+            return
+        flag.disabled = 0
+
+        # Records are stored six to a bank.
+        bank, idx = divmod(mem.number, 6)
+
+        LOG.debug("setting memory #%d bank %d entry %d" %
+                  (mem.number, bank, idx))
+        _mem = self._memobj.memory[bank].mem[idx]
+        self.add_dirty_block(_mem)
+        # NOTE(review): the original called the inherited
+        # self.initialize(_mem) only when the flag had been 0x0f, which is
+        # not the 0xFF empty marker read by get_memory().  That call is not
+        # enabled here; whether the inherited template fits this 40-byte
+        # record needs confirming first.
+
+        _mem.freq = mem.freq
+
+        # Channel 999 is still a regular, named channel.
+        if mem.number <= 999:
+            self.set_channel_name(mem.number, mem.name)
+
+        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
+        _mem.rtone = chirp_common.TONES.index(mem.rtone)
+        _mem.ctone = chirp_common.TONES.index(mem.ctone)
+        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
+        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
+        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
+        _mem.offset = mem.offset
+        # get_memory() decodes the mode with this module's MODES table, so
+        # encode with the same table.
+        _mem.mode = MODES.index(mem.mode)
+        # get_features() advertises tuning-step support; store the step
+        # when it has an index in STEPS and leave the field alone otherwise.
+        if mem.tuning_step in STEPS:
+            _mem.tuning_step = STEPS.index(mem.tuning_step)
+
+        # The result is unused (flag.prog_vfo is not mapped yet); the call
+        # is kept in case the helper also validates the frequency.
+        thd72.get_prog_vfo(mem.freq)
+        #flag.prog_vfo = prog_vfo
+
+        #if mem.number < 999:
+        #    flag.skip = chirp_common.SKIP_VALUES.index(mem.skip)
+
+
+    @staticmethod
+    def _add_00_pad(val, length):
+        """Return val truncated or NUL-padded to exactly length characters."""
+        padded = val + "\x00" * length
+        return padded[:length]
+
+
+    @classmethod
+    def apply_callsign(cls, setting, obj):
+        """Set obj.callsign to the setting's value, upper-cased, 8 chars."""
+        upper = setting.value.get_value().upper()
+        obj.callsign = cls._add_00_pad(upper, 8)
+
+
+    @classmethod
+    def apply_power_on_msg(cls, setting, obj):
+        """Set obj.power_on_msg to the setting's value, padded to 16 chars."""
+        text = setting.value.get_value()
+        obj.power_on_msg = cls._add_00_pad(text, 16)
+
+
+    def _get_general_settings(self):
+        """Build the "General" settings group: callsign and power-on message."""
+        group = RadioSettingGroup("general", "General")
+
+        callsign_obj = self._memobj.callsign
+        callsign_text = str(callsign_obj.callsign).rstrip("\x00")
+        callsign_setting = RadioSetting(
+            "cs.callsign", "Callsign",
+            RadioSettingValueString(0, 6, callsign_text))
+        callsign_setting.set_apply_callback(self.apply_callsign, callsign_obj)
+        group.append(callsign_setting)
+
+        message_obj = self._memobj.onmsg_name
+        message_text = str(message_obj.power_on_msg).rstrip("\x00")
+        message_setting = RadioSetting(
+            "msg.power_on_msg", "Power on message",
+            RadioSettingValueString(0, 16, message_text))
+        message_setting.set_apply_callback(self.apply_power_on_msg,
+                                           message_obj)
+        group.append(message_setting)
+
+        return group
+
+
+    def _get_settings(self):
+        """Return a RadioSettings tree holding the general settings group."""
+        general = self._get_general_settings()
+        return RadioSettings(general)
+
+
+    def get_settings(self):
+        """Return the settings tree, or None if it cannot be built.
+
+        Any Exception raised while building the tree is logged with its
+        traceback.  KeyboardInterrupt and SystemExit are deliberately not
+        caught.
+        """
+        try:
+            return self._get_settings()
+        except Exception:
+            import traceback
+            LOG.error("Failed to parse settings: %s", traceback.format_exc())
+            return None
+



More information about the chirp_devel mailing list