[chirp_devel] [PATCH] [thd74] Add a Kenwood d74 driver
Angus Ainslie
Sun Jun 14 19:43:13 PDT 2020
# HG changeset patch
# User Angus Ainslie <angus at akkea.ca>
# Date 1592188302 25200
# Sun Jun 14 19:31:42 2020 -0700
# Branch py3
# Node ID eb35a46c5f7e43518f6adc88768107efc0a3d402
# Parent b1e01577ff2505de7fcca665bf3d6af296f0f7ca
[thd74] Add a Kenwood d74 driver
Based on code from
Tom Hayward, Eric Wolak, William McKeehan
https://chirp.danplanet.com/issues/4129
This works with Python 3; it has not been tested with Python 2.
diff --git a/chirp/drivers/thd74.py b/chirp/drivers/thd74.py
new file mode 100644
--- /dev/null
+++ b/chirp/drivers/thd74.py
@@ -0,0 +1,571 @@
+import logging
+import struct
+import binascii
+
+import time
+
+from chirp import directory, bitwise, errors, chirp_common, memmap
+from chirp.settings import RadioSettingGroup, RadioSetting, RadioSettings, \
+ RadioSettingValueInteger, RadioSettingValueString, \
+ RadioSettingValueList, RadioSettingValueBoolean, \
+ InvalidValueError
+
+from . import thd72
+from chirp.util import hexprint
+
+LOG = logging.getLogger(__name__)
+
+# Save files from MCP-D74 have a 256-byte header, and possibly some oddness
+# TH-D74 memory map
+
+# 0x02000: memory flags, 4 bytes per memory
+# 0x04000: memories, each 40 bytes long
+# 0x10000: names, each 16 bytes long, null padded, ascii encoded
+
+# memory channel
+# 0 1 2 3 4 5 6 7 8 9 a b c d e f
+# [freq ] ? mode tmode/duplex rtone ctone dtcs cross_mode [offset] ?
+
+# frequency is 32-bit unsigned little-endian Hz
+
+# Default programmable-VFO limits as (low, high) pairs.
+# NOTE(review): values look like Hz, matching the memory frequency encoding
+# described above -- confirm.
+DEFAULT_PROG_VFO = (
+    (136_000_000, 174_000_000),
+    (410_000_000, 470_000_000),
+    (118_000_000, 136_000_000),
+    (136_000_000, 174_000_000),
+    (320_000_000, 400_000_000),
+    (400_000_000, 524_000_000),
+)
+
+# Some of the blocks written by MCP carry an explicit length that is shorter
+# than the default of 256 bytes (which is sent as 0x00 in the block header).
+# Maps block number -> number of bytes to write.
+BLOCK_SIZES = {
+    0x0003: 0x00B4,
+    0x0007: 0x0068,
+}
+
+# Memory layout handed to bitwise.parse().  Each memory[] entry is one
+# 256-byte block: six 40-byte channel records followed by 16 bytes of padding.
+mem_format = """
+// TODO: find lockout
+
+#seekto 0x10c0;
+struct {
+  char power_on_msg[16];
+  char modem_name[16];
+} onmsg_name;
+
+#seekto 0x1200;
+struct {
+  char callsign[8];
+} callsign;
+
+#seekto 0x02000;
+struct {
+// 4 bytes long
+  u8 disabled;
+  u8 unk;
+  u8 group;
+  u8 unk2;
+} flag[1032];
+
+#seekto 0x04000;
+// TODO: deal with the 16-byte trailers of every block
+struct {
+  struct {
+    ul32 freq;
+    ul32 offset;
+
+    u8 tuning_step:4,
+       unk:4;
+    u8 mode:4,
+       unk1:4;
+    u8 tone_mode:4,
+       duplex:4;
+    u8 rtone;
+
+    u8 ctone;
+    u8 dtcs;
+    u8 cross_mode:4,
+       digital_squelch:4;
+    char urcall[8];
+    char rpt1[8];
+    char rpt2[8];
+
+    u8 digital_squelch_code;
+
+  } mem[6];
+
+  u8 pad[16];
+} memory[1167]; // TODO: correct number of memories
+
+#seekto 0x10000;
+struct {
+  char name[16];
+} channel_name[1000];
+
+#seekto 0x14700;
+struct {
+  char name[16];
+} wx_name[10];
+
+#seekto 0x144d0;
+struct {
+  char name[16];
+} call_name[6];
+
+#seekto 0x14800;
+struct {
+  char name[16];
+} group_name[31];
+"""
+
+# Tuning steps indexed by the 4-bit tuning_step field of a channel record;
+# None marks index values with no known step.
+STEPS = [
+    5.0, 6.25, None, None,
+    10.0, 12.5, 15.0, 20.0,
+    25.0, 50.0, 100.0, 9.0,
+]
+
+# Modes indexed by the 4-bit mode field of a channel record.
+# NOTE(review): "DV" appears at both index 1 and index 7 -- confirm that the
+# second entry is really a distinct on-radio mode value.
+MODES = ["FM", "DV", "AM", "LSB", "USB", "CW", "NFM", "DV"]
+
+def hex(data):
+    """Format *data* as upper-case hex byte pairs, 16 bytes per line.
+
+    Accepts either ``bytes``-like input or a ``str`` in which every
+    character stands for one byte (the form the serial helpers in this
+    module use).  Returns an empty string for empty input.
+
+    Note: the name shadows the ``hex`` builtin inside this module; it is kept
+    because other code in the file calls it by this name.
+    """
+    if isinstance(data, str):
+        # One character per byte; anything outside 0..255 cannot be a serial
+        # byte, so replace it rather than fail inside a debug helper.
+        raw = data.encode("latin-1", "replace")
+    else:
+        raw = bytes(data)
+
+    rows = [
+        " ".join("%02X" % value for value in raw[start:start + 16])
+        for start in range(0, len(raw), 16)
+    ]
+    return "\n".join(rows)
+
+class SProxy(object):
+    """Wrap a serial-port-like object and log all traffic at debug level.
+
+    Only ``read``, ``write`` and the ``timeout`` attribute are forwarded to
+    the wrapped *delegate*.
+    """
+
+    def __init__(self, delegate):
+        self.delegate = delegate
+
+    def read(self, len):
+        """Read up to *len* bytes from the delegate, logging what arrived."""
+        r = self.delegate.read(len)
+        LOG.debug("READ\n" + hex(r))
+        return r
+
+    def write(self, data):
+        """Log *data* and forward it to the delegate's ``write``."""
+        LOG.debug("WRITE\n" + hex(data))
+        # str() on a bytes object yields its repr ("b'...'") on Python 3 and
+        # would corrupt the payload, so only coerce other types.
+        if not isinstance(data, (str, bytes)):
+            data = str(data)
+        return self.delegate.write(data)
+
+    @property
+    def timeout(self):
+        """Read timeout of the wrapped port."""
+        return self.delegate.timeout
+
+    @timeout.setter
+    def timeout(self, timeout):
+        self.delegate.timeout = timeout
+
+
+
+@directory.register
+class THD74Radio(thd72.THD72Radio):
+    """Kenwood TH-D74 clone-mode driver, derived from the TH-D72 driver."""
+
+    MODEL = "TH-D74 (clone mode)"
+    #MODEL = "TH-D74"
+    # Size in bytes of the radio image (a whole number of 256-byte blocks).
+    _memsize = 500480
+    # I think baud rate might be ignored by USB-Serial stack of the D74's
+    # on-board FTDI chip, but it doesn't seem to hurt.
+    BAUD_RATE = 115200
+
+
+ #def __init__(self, pipe):
+ # pipe = SProxy(pipe)
+ # super(THD74Radio, self).__init__(pipe)
+
+    def get_features(self):
+        """Return the parent driver's features with tuning steps enabled."""
+        features = super(THD74Radio, self).get_features()
+        features.has_tuning_step = True
+        return features
+
+    def process_mmap(self):
+        """Parse the current image with ``mem_format`` and reset dirty state."""
+        parsed = bitwise.parse(mem_format, self._mmap)
+        self._memobj = parsed
+        # Nothing has been modified since the image was (re)parsed.
+        self._dirty_blocks = list()
+
+    def sync_in(self):
+        """Download the image from the radio and parse it."""
+        # self._detect_baud()
+        image = self.download()
+        self._mmap = image
+        self.process_mmap()
+
+    def sync_out(self):
+        """Upload to the radio: only the dirty blocks if any, else everything."""
+        pending = self._dirty_blocks
+        if not len(pending):
+            self.upload()
+            return
+        self.upload(pending)
+
+    def read_block(self, block, count=256):
+        """Read *count* bytes of block number *block* from the radio.
+
+        Sends an ``R`` request, checks the 5-byte response header (which must
+        start with ``W`` and echo the block number), reads the payload and
+        then exchanges 0x06 ACK bytes with the radio.
+
+        Returns the payload as a str.  Raises ``errors.RadioError`` when the
+        header is missing or wrong, when the payload read times out, or when
+        the final ACK does not arrive.
+        """
+        # A count of 256 is sent as 0 in the 16-bit length field.
+        request = struct.pack(">cHH", b"R", block, count % 256)
+        LOG.debug("Read cmd %s", request)
+        self.pipe.write(''.join(chr(b) for b in request))
+
+        header = self.pipe.read(5)
+        if len(header) != 5:
+            raise errors.RadioError("Did not receive block response")
+
+        LOG.debug("Read input %s %i %i %i %i", header, ord(header[1]),
+                  ord(header[2]), ord(header[3]), ord(header[4]))
+
+        reply = header[0]
+        reply_block = (ord(header[1]) << 8) + ord(header[2])
+        if reply != 'W' or reply_block != block:
+            raise errors.RadioError("Invalid response: %s %i %i" %
+                                    (reply, block, reply_block))
+
+        chunks = []
+        remaining = count
+        while remaining > 0:
+            chunk = self.pipe.read(remaining)
+            if not chunk:
+                # An empty read means the port timed out; without this check
+                # the loop would spin forever on a silent radio.
+                raise errors.RadioError(
+                    "Timed out reading block %i" % block)
+            chunks.append(chunk)
+            remaining -= len(chunk)
+        data = "".join(chunks)
+
+        self.pipe.write(chr(0x06))
+        if self.pipe.read(1) != chr(0x06):
+            raise errors.RadioError("Did not receive post-block ACK!")
+
+        return data
+
+    def write_block(self, block, map, count=256):
+        """Write *count* bytes of block *block* from image *map* to the radio.
+
+        The bytes are taken from ``map[block * 256 : block * 256 + count]``.
+        If no ACK arrives on the first read, the read is retried once after
+        half a second.
+
+        Returns True when the radio answered with ACK (0x06), False otherwise.
+        """
+        # A count of 256 is sent as 0 in the 16-bit length field.
+        header = struct.pack(">cHH", b"W", block, count % 256)
+        base = block * 256
+        payload = map[base:base + count]
+        # It's crucial that these are written together. Otherwise the radio
+        # will fail to ACK under some conditions.
+        self.pipe.write(''.join(chr(b) for b in header) + payload)
+
+        ack = self.pipe.read(1)
+        if len(ack) == 0:
+            LOG.debug("read timed out block %d - trying again", block)
+            time.sleep(0.5)
+            ack = self.pipe.read(1)
+
+        if ack == chr(0x06):
+            return True
+
+        if ack:
+            LOG.error("Block %d write failed %d", block, ord(ack))
+        else:
+            # ord() of an empty read would raise TypeError; report it instead.
+            LOG.error("Block %d write failed: no response", block)
+        return False
+
+    def _unlock(self):
+        """Voodoo sequence of operations to get the radio to accept our programming.
+
+        Reads the first 6 bytes of block 0, then sends two fixed write
+        sequences, each of which must be answered with ACK (0x06).
+
+        Returns None.  Raises ``errors.RadioError`` if an ACK is missing or a
+        different byte is received.
+        """
+
+        def expect_ack():
+            # Read one byte and insist that it is the ACK character.
+            ack = self.pipe.read(1)
+            if not ack:
+                # Timed out: ord() of an empty read would raise TypeError.
+                raise errors.RadioError("Expected ack but got no response")
+            if ack != chr(0x06):
+                raise errors.RadioError(
+                    "Expected ack but got {} ({})".format(ord(ack), type(ack)))
+
+        # The result is not needed, but the read is part of the sequence.
+        self.read_block(0, 6)
+
+        unlock = ("\x57\x00\x00\x00\x30\xff\x01\xff\x00\x00\xff\xff\xff\xff\xff\x01" +
+                  "\x00\x00\x00\x03\x01\x00\x00\x00\x00\x02\x00\x30\x30\x30\x00\xff" +
+                  "\xff\xff\xff\xff\xff\xff\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff" +
+                  "\xff\xff\xff\xff\xff")
+        self.pipe.write(unlock)
+        expect_ack()
+
+        c = struct.pack(">cHH", b"W", 0, 0x38C8)
+        self.pipe.write(''.join(chr(b) for b in c))
+        # magic unlock sequence
+        unlock = "".join(chr(x) for x in [0xFF] * 8 + [0] * 160 + [0xFF] * 32)
+        self.pipe.write(unlock)
+
+        time.sleep(0.01)
+        expect_ack()
+
+    def download(self, raw=False, blocks=None):
+        """Read the radio image, or a subset of its 256-byte blocks.
+
+        blocks: optional iterable of block numbers to read; blocks that are
+            not requested are filled with 0xFF bytes.  None reads everything.
+        raw: when true return the image as a str, otherwise wrap it in a
+            ``memmap.MemoryMap``.
+
+        Raises ``errors.RadioError`` if the radio does not enter PROGRAM mode.
+        """
+        nblocks = self._memsize // 256
+        if blocks is None:
+            blocks = list(range(nblocks))
+        else:
+            blocks = [b for b in blocks if b < nblocks]
+
+        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
+            raise errors.RadioError("Radio didn't go into PROGRAM mode")
+
+        self.pipe.baudrate = 57600
+        try:
+            self.pipe.setRTS()
+        except AttributeError:
+            self.pipe.rts = True
+        self.pipe.read(1)
+
+        if blocks:
+            # Guarded: an empty selection has no first/last block to report.
+            LOG.debug("reading blocks %d..%d", blocks[0], blocks[-1])
+
+        wanted = set(blocks)  # O(1) membership test per block
+        total = len(blocks)
+        count = 0
+        chunks = []
+        for i in range(nblocks):
+            if i not in wanted:
+                chunks.append(256 * '\xff')
+                continue
+            chunks.append(self.read_block(i))
+            count += 1
+            if self.status_fn:
+                s = chirp_common.Status()
+                s.msg = "Cloning from radio"
+                s.max = total
+                s.cur = count
+                self.status_fn(s)
+
+        self.pipe.write("E")
+        data = "".join(chunks)
+        if raw:
+            return data
+        return memmap.MemoryMap(data)
+
+    def upload(self, blocks=None):
+        """Write the image, or a subset of its 256-byte blocks, to the radio.
+
+        blocks: optional iterable of block numbers to write.  None writes
+            every block except the last two.  Blocks 0 and 1279 are always
+            skipped.
+
+        Raises ``errors.RadioError`` if the radio does not enter PROGRAM mode,
+        if unlocking fails, or if a block is not acknowledged.
+        """
+        # MCP-D74 sets DTR, so we should too
+        try:
+            self.pipe.setDTR()
+        except AttributeError:
+            self.pipe.dtr = True
+
+        nblocks = self._memsize // 256
+        if blocks is None:
+            blocks = list(range(nblocks - 2))
+        else:
+            blocks = [b for b in blocks if b < nblocks]
+
+        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
+            raise errors.RadioError("Radio didn't go into PROGRAM mode")
+
+        if self._unlock():
+            raise errors.RadioError("Unlock failed")
+
+        # This block definitely isn't written conventionally, so we let _unlock
+        # handle it and skip.
+        if 0 in blocks:
+            blocks.remove(0)
+
+        # For some reason MCP-D74 skips this block. If we don't, we'll get a NACK
+        # on the next one. There is also a more than 500 ms delay for the ACK.
+        if 1279 in blocks:
+            blocks.remove(1279)
+
+        if blocks:
+            # Guarded: the selection may be empty once the skipped blocks are
+            # removed, and the lock/finish sequence below must still be sent.
+            LOG.debug("writing blocks %d..%d", blocks[0], blocks[-1])
+
+        total = len(blocks)
+        for count, i in enumerate(blocks, 1):
+            time.sleep(0.001)
+            if not self.write_block(i, self._mmap, BLOCK_SIZES.get(i, 256)):
+                raise errors.RadioError("write of block %i failed" % i)
+            if self.status_fn:
+                s = chirp_common.Status()
+                s.msg = "Cloning to radio"
+                s.max = total
+                s.cur = count
+                self.status_fn(s)
+
+        lock = ("\x57\x00\x00\x00\x06\x02\x01\xff\x00\x00\xff")
+        self.pipe.write(lock)
+
+        self.pipe.write("F")
+        # clear out blocks we uploaded from the dirty blocks list
+        written = set(blocks)
+        self._dirty_blocks = [b for b in self._dirty_blocks
+                              if b not in written]
+
+    def command(self, cmd, response_length, timeout=0.5):
+        """Send *cmd* followed by a carriage return and read the reply.
+
+        Reads ``response_length + 1`` bytes using *timeout* as the port's
+        timeout for this read only; the previous timeout is restored
+        afterwards even if the read fails.
+
+        Returns the reply with surrounding whitespace stripped.
+        """
+        LOG.debug("PC->D72: %s", cmd)
+        default_timeout = self.pipe.timeout
+        self.pipe.write(cmd + "\r")
+        self.pipe.timeout = timeout
+        try:
+            data = self.pipe.read(response_length + 1)
+            LOG.debug("D72->PC: %s", data.strip())
+        finally:
+            self.pipe.timeout = default_timeout
+        return data.strip()
+
+    def get_raw_memory(self, number):
+        """Return the repr of channel *number*'s record followed by its flag."""
+        # Channel records are stored six to a memory[] entry.
+        bank, idx = divmod(number, 6)
+        record = self._memobj.memory[bank].mem[idx]
+        flag = self._memobj.flag[number]
+        return repr(record) + repr(flag)
+
+    def get_id(self):
+        """Send the ``ID`` command and return the second word of the reply.
+
+        Raises ``errors.RadioError`` if the reply does not start with "ID ".
+        """
+        reply = self.command("ID", 9)
+        if not reply.startswith("ID "):
+            raise errors.RadioError("No response to ID command")
+        fields = reply.split(" ")
+        return fields[1]
+
+    def set_channel_name(self, number, name):
+        """Store *name* (truncated/NUL-padded to 16 chars) for *number*.
+
+        Numbers below 999 go to ``channel_name``; 1020..1029 go to
+        ``wx_name``.  Any other number is ignored.
+        """
+        padded = name[:16].ljust(16, '\x00')
+        if number < 999:
+            slot = self._memobj.channel_name[number]
+            slot.name = padded
+            self.add_dirty_block(slot)
+        elif 1020 <= number < 1030:
+            number -= 1020
+            slot = self._memobj.wx_name[number]
+            slot.name = padded
+            self.add_dirty_block(slot)
+
+    def get_memory(self, number):
+        """Build a chirp memory object for channel *number*.
+
+        *number* may be an int or the name of a special channel from
+        ``thd72.THD72_SPECIAL``.  Returns a ``DVMemory`` when the stored mode
+        decodes to "DV", otherwise a plain ``Memory``; a channel whose flag
+        byte is 0xFF is returned with ``empty`` set and nothing else decoded.
+
+        Raises ``errors.InvalidMemoryLocation`` for an unknown name or an
+        out-of-range number.
+        """
+        if isinstance(number, str):
+            try:
+                number = thd72.THD72_SPECIAL[number]
+            except KeyError:
+                raise errors.InvalidMemoryLocation("Unknown channel %s" %
+                                                   number)
+
+        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
+            raise errors.InvalidMemoryLocation(
+                "Number must be between 0 and 999")
+
+        # Channel records are stored six to a memory[] entry.
+        bank, idx = divmod(number, 6)
+        _mem = self._memobj.memory[bank].mem[idx]
+        flag = self._memobj.flag[number]
+
+        is_dv = _mem.mode < len( MODES ) and MODES[_mem.mode] == "DV"
+        mem = chirp_common.DVMemory() if is_dv else chirp_common.Memory()
+        mem.number = number
+
+        if number > 999:
+            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
+        if flag.disabled == 0xFF:
+            mem.empty = True
+            return mem
+
+        mem.name = self.get_channel_name(number)
+        mem.freq = int(_mem.freq)
+        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
+        mem.rtone = chirp_common.TONES[_mem.rtone]
+        mem.ctone = chirp_common.TONES[_mem.ctone]
+        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs]
+        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
+        mem.offset = _mem.offset
+        mem.mode = MODES[int(_mem.mode)]
+        mem.tuning_step = STEPS[_mem.tuning_step]
+
+        if mem.mode == "DV":
+            mem.dv_urcall = _mem.urcall
+            mem.dv_rpt1call = _mem.rpt1
+            mem.dv_rpt2call = _mem.rpt2
+            mem.dv_code = _mem.digital_squelch_code
+
+        if number < 999:
+            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
+            mem.cross_mode = chirp_common.CROSS_MODES[_mem.cross_mode]
+        elif number > 999:
+            mem.cross_mode = chirp_common.CROSS_MODES[0]
+            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
+            if 1020 <= number < 1030:
+                mem.immutable += ["freq", "offset", "tone", "mode",
+                                  "tmode", "ctone", "skip"]  # FIXME: ALL
+            else:
+                mem.immutable += ["name"]
+
+        return mem
+
+    def set_memory(self, mem):
+        """Store chirp memory *mem* into the image and mark its blocks dirty.
+
+        Channels 1020..1029 only have their name updated.  An empty *mem*
+        marks the channel's flag byte as unused (0xFF, the value get_memory
+        tests for) and changes nothing else.
+
+        Raises ``errors.InvalidMemoryLocation`` for an out-of-range number.
+        """
+        LOG.debug("set_memory(%d)" % mem.number)
+        if mem.number < 0 or mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
+            raise errors.InvalidMemoryLocation(
+                "Number must be between 0 and 999")
+
+        # weather channels can only change name, nothing else
+        if mem.number >= 1020 and mem.number < 1030:
+            self.set_channel_name(mem.number, mem.name)
+            return
+
+        flag = self._memobj.flag[mem.number]
+        self.add_dirty_block(self._memobj.flag[mem.number])
+
+        # only delete non-WX channels
+        # The empty marker must match the 0xFF that get_memory checks,
+        # otherwise a deleted channel is still reported as in use.
+        was_empty = flag.disabled == 0xFF
+        if mem.empty:
+            flag.disabled = 0xFF
+            return
+        flag.disabled = 0
+
+        bank = mem.number // 6
+        idx = mem.number % 6
+
+        LOG.debug("setting memory #%d bank %d entry %d", mem.number, bank, idx)
+        _mem = self._memobj.memory[bank].mem[idx]
+        self.add_dirty_block(_mem)
+        if was_empty:
+            self.initialize(_mem)
+
+        _mem.freq = mem.freq
+
+        if mem.number < 999:
+            self.set_channel_name(mem.number, mem.name)
+
+        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
+        _mem.rtone = chirp_common.TONES.index(mem.rtone)
+        _mem.ctone = chirp_common.TONES.index(mem.ctone)
+        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
+        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
+        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
+        _mem.offset = mem.offset
+        # Encode with the same table get_memory decodes with.
+        _mem.mode = MODES.index(mem.mode)
+
+        # get_features advertises tuning steps and get_memory decodes them,
+        # so store the step as well; unknown steps leave the field untouched.
+        if mem.tuning_step in STEPS:
+            _mem.tuning_step = STEPS.index(mem.tuning_step)
+        else:
+            LOG.warning("Unsupported tuning step %s for memory %d",
+                        mem.tuning_step, mem.number)
+
+        # NOTE(review): the result is unused, but the helper lives in the
+        # TH-D72 driver and may validate the frequency -- kept as is.
+        prog_vfo = thd72.get_prog_vfo(mem.freq)
+        #flag.prog_vfo = prog_vfo
+
+        #if mem.number < 999:
+        #    flag.skip = chirp_common.SKIP_VALUES.index(mem.skip)
+
+
+    @staticmethod
+    def _add_00_pad(val, length):
+        """Return *val* NUL-padded on the right, then cut to *length* chars."""
+        padded = val.ljust(length, "\x00")
+        return padded[:length]
+
+
+    @classmethod
+    def apply_callsign(cls, setting, obj):
+        """Store the setting's value, upper-cased and NUL-padded to 8 chars,
+        in ``obj.callsign``."""
+        text = setting.value.get_value()
+        callsign = text.upper()
+        obj.callsign = cls._add_00_pad(callsign, 8)
+
+
+    @classmethod
+    def apply_power_on_msg(cls, setting, obj):
+        """Store the setting's value, NUL-padded to 16 chars, in
+        ``obj.power_on_msg``."""
+        text = setting.value.get_value()
+        obj.power_on_msg = cls._add_00_pad(text, 16)
+
+
+    def _get_general_settings(self):
+        """Build the "General" settings group: callsign and power-on message."""
+        group = RadioSettingGroup("general", "General")
+
+        # NOTE(review): the editor allows 6 characters while the stored field
+        # and apply_callsign use 8 -- confirm which limit is intended.
+        cs = self._memobj.callsign
+        callsign_text = str(cs.callsign).rstrip("\x00")
+        callsign_setting = RadioSetting(
+            "cs.callsign", "Callsign",
+            RadioSettingValueString(0, 6, callsign_text))
+        callsign_setting.set_apply_callback(self.apply_callsign, cs)
+        group.append(callsign_setting)
+
+        msg = self._memobj.onmsg_name
+        message_text = str(msg.power_on_msg).rstrip("\x00")
+        message_setting = RadioSetting(
+            "msg.power_on_msg", "Power on message",
+            RadioSettingValueString(0, 16, message_text))
+        message_setting.set_apply_callback(self.apply_power_on_msg, msg)
+        group.append(message_setting)
+
+        return group
+
+
+    def _get_settings(self):
+        """Return the settings tree, which holds only the general group."""
+        general = self._get_general_settings()
+        return RadioSettings(general)
+
+
+    def get_settings(self):
+        """Return the settings tree, or None if it could not be built.
+
+        Failures are logged with their traceback rather than raised.
+        """
+        try:
+            return self._get_settings()
+        except Exception:
+            # Not a bare except: KeyboardInterrupt and SystemExit must still
+            # propagate.
+            import traceback
+            LOG.error("Failed to parse settings: %s", traceback.format_exc())
+            return None
+
More information about the chirp_devel
mailing list