[chirp_devel] [boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
Craigslist Reply
Wed Sep 5 13:16:03 PDT 2018
# HG changeset patch
# User Robert C Jennings <rcj4747 at gmail.com>
# Date 1535818178 18000
# Sat Sep 01 11:09:38 2018 -0500
# Node ID 0a65ceee60f5ff019c954536d2fb1162428945e0
# Parent 4873d5437a583c3a1b169808c3d29a53524bc5b2
[boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
Add support for this motorcycle/bicycle helmet radio. It operates in
the EU PMR446 and FRS/GMRS frequencies for certain, but also appears
to support 70cm band work (untested).
diff --git a/chirp/drivers/boblov_x3plus.py b/chirp/drivers/boblov_x3plus.py
new file mode 100644
--- /dev/null
+++ b/chirp/drivers/boblov_x3plus.py
@@ -0,0 +1,572 @@
+"""
+Radio driver for the Boblov X3 Plus Motorcycle Helmet Radio
+"""
+# Copyright 2018 Robert C Jennings <rcj4747 at gmail.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import struct
+import time
+
+from datetime import datetime
+from textwrap import dedent
+
+from chirp import (
+ bitwise,
+ chirp_common,
+ directory,
+ errors,
+ memmap,
+ util,
+)
+from chirp.settings import (
+ RadioSetting,
+ RadioSettingGroup,
+ RadioSettings,
+ RadioSettingValueBoolean,
+ RadioSettingValueInteger,
+ RadioSettingValueList,
+)
+
+LOG = logging.getLogger(__name__)
+
+
+ at directory.register
+class BoblovX3Plus(chirp_common.CloneModeRadio,
+ chirp_common.ExperimentalRadio):
+ """Boblov X3 Plus motorcycle/cycling helmet radio"""
+
+ VENDOR = 'Boblov'
+ MODEL = 'X3Plus'
+ BAUD_RATE = 9600
+ CHANNELS = 16
+
+ MEM_FORMAT = """
+ #seekto 0x0010;
+ struct {
+ lbcd rxfreq[4];
+ lbcd txfreq[4];
+ lbcd rxtone[2];
+ lbcd txtone[2];
+ u8 unknown1:1,
+ compander:1,
+ scramble:1,
+ skip:1,
+ highpower:1,
+ narrow:1,
+ unknown2:1,
+ bcl:1;
+ u8 unknown3[3];
+ } memory[16];
+ #seekto 0x03C0;
+ struct {
+ u8 unknown1:4,
+ voiceprompt:2,
+ batterysaver:1,
+ beep:1;
+ u8 squelchlevel;
+ u8 unknown2;
+ u8 timeouttimer;
+ u8 voxlevel;
+ u8 unknown3;
+ u8 unknown4;
+ u8 voxdelay;
+ } settings;
+ """
+
+ # Radio command data
+ CMD_ACK = '\x06'
+ CMD_IDENTIFY = '\x02'
+ CMD_PROGRAM_ENTER = '.VKOGRAM'
+ CMD_PROGRAM_EXIT = '\x62' # 'b'
+ CMD_READ = 'R'
+ CMD_WRITE = 'W'
+
+ BLOCK_SIZE = 0x08
+
+ VOICE_LIST = ['Off', 'Chinese', 'English']
+ TIMEOUTTIMER_LIST = ['Off', '30 seconds', '60 seconds', '90 seconds',
+ '120 seconds', '150 seconds', '180 seconds',
+ '210 seconds', '240 seconds', '270 seconds',
+ '300 seconds']
+ VOXLEVEL_LIST = ['Off', '1', '2', '3', '4', '5', '6', '7', '8', '9']
+ VOXDELAY_LIST = ['1 seconds', '2 seconds',
+ '3 seconds', '4 seconds', '5 seconds']
+ X3P_POWER_LEVELS = [chirp_common.PowerLevel('Low', watts=0.5),
+ chirp_common.PowerLevel('High', watts=2.00)]
+
+ _memsize = 0x03F0
+ _ranges = [
+ (0x0000, 0x03F0),
+ ]
+
+ @classmethod
+ def get_prompts(cls):
+ rp = chirp_common.RadioPrompts()
+ rp.experimental = _(dedent("""\
+ The X3Plus driver is currently experimental.
+
+ There are no known issues but you should proceed with caution.
+
+ Please save an unedited copy of your first successful
+ download to a CHIRP Radio Images (*.img) file.
+ """))
+ return rp
+
+ @classmethod
+ def match_model(cls, filedata, filename):
+ """Given contents of a stored file (@filedata), return True if
+ this radio driver handles the represented model"""
+
+ if len(filedata) != cls._memsize:
+ LOG.debug('Boblov_x3plus: match_model: size mismatch')
+ return False
+
+ LOG.debug('Boblov_x3plus: match_model: size matches')
+
+ if 'P310' in filedata[0x03D0:0x03D8]:
+ LOG.debug('Boblov_x3plus: match_model: radio ID matches')
+ return True
+
+ LOG.debug('Boblov_x3plus: match_model: no radio ID match')
+ return False
+
+ def get_features(self):
+ """Return a RadioFeatures object for this radio"""
+
+ rf = chirp_common.RadioFeatures()
+ rf.has_settings = True
+ rf.valid_modes = ['NFM', 'FM'] # 12.5 KHz, 25 kHz.
+ rf.valid_power_levels = self.X3P_POWER_LEVELS
+ rf.valid_skips = ['', 'S']
+ rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
+ rf.valid_duplexes = ['', '-', '+', 'split', 'off']
+ rf.can_odd_split = True
+ rf.has_rx_dtcs = True
+ rf.has_ctone = True
+ rf.has_cross = True
+ rf.valid_cross_modes = [
+ 'Tone->Tone',
+ 'DTCS->',
+ '->DTCS',
+ 'Tone->DTCS',
+ 'DTCS->Tone',
+ '->Tone',
+ 'DTCS->DTCS']
+ rf.has_tuning_step = False
+ rf.has_bank = False
+ rf.has_name = False
+ rf.memory_bounds = (1, self.CHANNELS)
+ rf.valid_bands = [(400000000, 470000000)]
+ return rf
+
+ def process_mmap(self):
+ """Process a newly-loaded or downloaded memory map"""
+ self._memobj = bitwise.parse(self.MEM_FORMAT, self._mmap)
+
+ def sync_in(self):
+ "Initiate a radio-to-PC clone operation"
+
+ LOG.debug('Cloning from radio')
+ status = chirp_common.Status()
+ status.msg = 'Cloning from radio'
+ status.cur = 0
+ status.max = self._memsize
+ self.status_fn(status)
+
+ self._enter_programming_mode()
+
+ data = ''
+ for addr in range(0, self._memsize, self.BLOCK_SIZE):
+ status.cur = addr + self.BLOCK_SIZE
+ self.status_fn(status)
+
+ block = self._read_block(addr, self.BLOCK_SIZE)
+ data += block
+
+ LOG.debug('Address: %04x', addr)
+ LOG.debug(util.hexprint(block))
+
+ self._exit_programming_mode()
+
+ self._mmap = memmap.MemoryMap(data)
+ self.process_mmap()
+
+ def sync_out(self):
+ "Initiate a PC-to-radio clone operation"
+
+ LOG.debug('Upload to radio')
+ status = chirp_common.Status()
+ status.msg = 'Uploading to radio'
+ status.cur = 0
+ status.max = self._memsize
+ self.status_fn(status)
+
+ self._enter_programming_mode()
+
+ for start_addr, end_addr in self._ranges:
+ for addr in range(start_addr, end_addr, self.BLOCK_SIZE):
+ status.cur = addr + self.BLOCK_SIZE
+ self.status_fn(status)
+
+ self._write_block(addr, self.BLOCK_SIZE)
+
+ self._exit_programming_mode()
+
+ def get_raw_memory(self, number):
+ """Return a raw string describing the memory at @number"""
+ return repr(self._memobj.memory[number - 1])
+
+ @staticmethod
+ def _decode_tone(val):
+ val = int(val)
+ if val == 16665:
+ return '', None, None
+ elif val >= 12000:
+ return 'DTCS', val - 12000, 'R'
+ elif val >= 8000:
+ return 'DTCS', val - 8000, 'N'
+
+ return 'Tone', val / 10.0, None
+
+ @staticmethod
+ def _encode_tone(memval, mode, value, pol):
+ if mode == '':
+ memval[0].set_raw(0xFF)
+ memval[1].set_raw(0xFF)
+ elif mode == 'Tone':
+ memval.set_value(int(value * 10))
+ elif mode == 'DTCS':
+ flag = 0x80 if pol == 'N' else 0xC0
+ memval.set_value(value)
+ memval[1].set_bits(flag)
+ else:
+ raise Exception('Internal error: invalid mode `%s`' % mode)
+
+ def get_memory(self, number):
+ """Return a Memory object for the memory at location @number"""
+ try:
+ rmem = self._memobj.memory[number - 1]
+ except KeyError:
+ raise errors.InvalidMemoryLocation('Unknown channel %s' %
number)
+
+ if number < 1 or number > self.CHANNELS:
+ raise errors.InvalidMemoryLocation(
+ 'Channel number must be 1 and %s' % self.CHANNELS)
+
+ mem = chirp_common.Memory()
+ mem.number = number
+ mem.freq = int(rmem.rxfreq) * 10
+
+ # A blank (0MHz) or 0xFFFFFFFF frequency is considered empty
+ if mem.freq == 0 and rmem.rxfreq.get_raw() == '\xFF\xFF\xFF\xFF':
+ LOG.debug('empty channel %d', number)
+ mem.freq = 0
+ mem.empty = True
+ return mem
+
+ if rmem.txfreq.get_raw() == '\xFF\xFF\xFF\xFF':
+ mem.duplex = 'off'
+ mem.offset = 0
+ elif int(rmem.rxfreq) == int(rmem.txfreq):
+ mem.duplex = ''
+ mem.offset = 0
+ else:
+ mem.duplex = '-' if int(rmem.rxfreq) > int(rmem.txfreq)
else '+'
+ mem.offset = abs(int(rmem.rxfreq) - int(rmem.txfreq)) * 10
+
+ mem.mode = 'NFM' if rmem.narrow else 'FM'
+ mem.skip = 'S' if rmem.skip else ''
+ mem.power = self.X3P_POWER_LEVELS[rmem.highpower]
+
+ txtone = self._decode_tone(rmem.txtone)
+ rxtone = self._decode_tone(rmem.rxtone)
+ chirp_common.split_tone_decode(mem, txtone, rxtone)
+
+ mem.extra = RadioSettingGroup('Extra', 'extra')
+ mem.extra.append(RadioSetting('bcl', 'Busy Channel Lockout',
+ RadioSettingValueBoolean(
+ current=(not rmem.bcl))))
+ mem.extra.append(RadioSetting('scramble', 'Scramble',
+ RadioSettingValueBoolean(
+ current=(not rmem.scramble))))
+ mem.extra.append(RadioSetting('compander', 'Compander',
+ RadioSettingValueBoolean(
+ current=(not rmem.compander))))
+
+ return mem
+
+ def set_memory(self, memory):
+ """Set the memory object @memory"""
+ rmem = self._memobj.memory[memory.number - 1]
+
+ if memory.empty:
+ rmem.set_raw('\xFF' * (rmem.size() / 8))
+ return
+
+ rmem.rxfreq = memory.freq / 10
+
+ set_txtone = True
+ if memory.duplex == 'off':
+ for i in range(0, 4):
+ rmem.txfreq[i].set_raw('\xFF')
+ # If recieve only then txtone value should be none
+ self._encode_tone(rmem.txtone, mode='', value=None,
pol=None)
+ set_txtone = False
+ elif memory.duplex == 'split':
+ rmem.txfreq = memory.offset / 10
+ elif memory.duplex == '+':
+ rmem.txfreq = (memory.freq + memory.offset) / 10
+ elif memory.duplex == '-':
+ rmem.txfreq = (memory.freq - memory.offset) / 10
+ else:
+ rmem.txfreq = memory.freq / 10
+
+ txtone, rxtone = chirp_common.split_tone_encode(memory)
+ if set_txtone:
+ self._encode_tone(rmem.txtone, *txtone)
+ self._encode_tone(rmem.rxtone, *rxtone)
+
+ rmem.narrow = 'N' in memory.mode
+ rmem.skip = memory.skip == 'S'
+
+ for setting in memory.extra:
+ # NOTE: Only three settings right now, all are inverted
+ setattr(rmem, setting.get_name(), not int(setting.value))
+
+ def get_settings(self):
+ """
+ Return a RadioSettings list containing one or more
RadioSettingGroup
+ or RadioSetting objects. These represent general settings that can
+ be adjusted on the radio.
+ """
+ cur = self._memobj.settings
+ basic = RadioSettingGroup('basic', 'Basic Settings')
+ rs = RadioSetting('squelchlevel', 'Squelch level',
+ RadioSettingValueInteger(
+ minval=0, maxval=9,
+ current=cur.squelchlevel))
+ basic.append(rs)
+ rs = RadioSetting('timeouttimer', 'Timeout timer',
+ RadioSettingValueList(
+ options=self.TIMEOUTTIMER_LIST,
+
current=self.TIMEOUTTIMER_LIST[cur.timeouttimer]))
+ basic.append(rs)
+ rs = RadioSetting('voiceprompt', 'Voice prompt',
+ RadioSettingValueList(
+ options=self.VOICE_LIST,
+ current=self.VOICE_LIST[cur.voiceprompt]))
+ basic.append(rs)
+ rs = RadioSetting('voxlevel', 'Vox level',
+ RadioSettingValueList(
+ options=self.VOXLEVEL_LIST,
+ current=self.VOXLEVEL_LIST[cur.voxlevel]))
+ basic.append(rs)
+ rs = RadioSetting('voxdelay', 'VOX delay',
+ RadioSettingValueList(
+ options=self.VOXDELAY_LIST,
+ current=self.VOXDELAY_LIST[cur.voxdelay]))
+ basic.append(rs)
+ basic.append(RadioSetting('batterysaver', 'Battery saver',
+ RadioSettingValueBoolean(
+ current=cur.batterysaver)))
+ basic.append(RadioSetting('beep', 'Beep',
+ RadioSettingValueBoolean(
+ current=cur.beep)))
+ return RadioSettings(basic)
+
+ def set_settings(self, settings):
+ """
+ Accepts the top-level RadioSettingGroup returned from
+ get_settings() and adjusts the values in the radio accordingly.
+ This function expects the entire RadioSettingGroup hierarchy
+ returned from get_settings().
+ """
+ for element in settings:
+ if not isinstance(element, RadioSetting):
+ self.set_settings(element)
+ continue
+ else:
+ try:
+ if '.' in element.get_name():
+ bits = element.get_name().split('.')
+ obj = self._memobj
+ for bit in bits[:-1]:
+ obj = getattr(obj, bit)
+ setting = bits[-1]
+ else:
+ obj = self._memobj.settings
+ setting = element.get_name()
+
+ if element.has_apply_callback():
+ LOG.debug('Using apply callback')
+ element.run_apply_callback()
+ else:
+ LOG.debug('Setting %s = %s', setting,
element.value)
+ setattr(obj, setting, element.value)
+ except Exception:
+ LOG.debug(element.get_name())
+ raise
+
+ def _write(self, data, timeout=3):
+ """
+ Write data to the serial port and consume the echoed response
+
+ The radio echos the data it is sent before replying. Send the
+ data to the radio, consume the reply, and ensure that the reply
+ is the same as the data sent.
+ """
+
+ serial = self.pipe
+ expected = len(data)
+ resp = b''
+ start = datetime.now()
+
+ # LOG.debug('WRITE(%02d): %s', expected,
util.hexprint(data).rstrip())
+ serial.write(data)
+ while True:
+ if not expected:
+ break
+ rbytes = serial.read(expected)
+ resp += rbytes
+ expected -= len(rbytes)
+ if (datetime.now() - start).seconds > timeout:
+ raise errors.RadioError('Timeout while reading from radio')
+ if resp != data:
+ raise errors.RadioError('Echoed response did not match sent
data')
+
+ def _read(self, length, timeout=3):
+ """Read data from the serial port"""
+
+ resp = b''
+ serial = self.pipe
+ remaining = length
+ start = datetime.now()
+
+ if not remaining:
+ return resp
+
+ while True:
+ rbytes = serial.read(remaining)
+ resp += rbytes
+ remaining -= len(rbytes)
+ if not remaining:
+ break
+ if (datetime.now() - start).seconds > timeout:
+ raise errors.RadioError('Timeout while reading from radio')
+ time.sleep(0.1)
+
+ # LOG.debug('READ(%02d): %s', length,
util.hexprint(resp).rstrip())
+ return resp
+
+ def _read_block(self, block_addr, block_size):
+
+ LOG.debug('Reading block %04x...', block_addr)
+ cmd = struct.pack('>cHb', self.CMD_READ, block_addr,
+ block_size)
+ resp_prefix = self.CMD_WRITE + cmd[1:]
+
+ try:
+ msg = ('Failed to write command to radio for block '
+ 'read at %04x' % block_addr)
+ self._write(cmd)
+
+ msg = ('Failed to read response from radio for block '
+ 'read at %04x' % block_addr)
+ response = self._read(len(cmd) + block_size)
+
+ if response[:len(cmd)] != resp_prefix:
+ raise errors.RadioError('Error reading block %04x, '
+ 'Command not returned.' %
(block_addr))
+
+ msg = ('Failed to write ACK to radio after block read at '
+ '%04x' % block_addr)
+ self._write(self.CMD_ACK)
+
+ msg = ('Failed to read ACK from radio after block read at '
+ '%04x' % block_addr)
+ ack = self._read(1)
+ except Exception:
+ LOG.debug(msg, exc_info=True)
+ raise errors.RadioError(msg)
+
+ if ack != self.CMD_ACK:
+ raise errors.RadioError('No ACK reading block '
+ '%04x.' % (block_addr))
+
+ return response[len(cmd):]
+
+ def _write_block(self, block_addr, block_size):
+
+ cmd = struct.pack('>cHb', self.CMD_WRITE, block_addr, block_size)
+ data = self.get_mmap()[block_addr:block_addr + 8]
+
+ LOG.debug('Writing Data:\n%s%s',
+ util.hexprint(cmd), util.hexprint(data))
+
+ try:
+ self._write(cmd + data)
+ if self._read(1) != self.CMD_ACK:
+ raise Exception('No ACK')
+ except Exception:
+ msg = 'Failed to send block to radio at %04x' % block_addr
+ LOG.debug(msg, exc_info=True)
+ raise errors.RadioError(msg)
+
+ def _enter_programming_mode(self):
+
+ LOG.debug('Entering programming mode')
+ try:
+ msg = 'Error communicating with radio entering programming
mode.'
+ self._write(self.CMD_PROGRAM_ENTER)
+ time.sleep(0.5)
+ ack = self._read(1)
+
+ if not ack:
+ raise errors.RadioError('No response from radio')
+ elif ack != self.CMD_ACK:
+ raise errors.RadioError('Radio refused to enter '
+ 'programming mode')
+
+ msg = 'Error communicating with radio during identification'
+ self._write(self.CMD_IDENTIFY)
+ ident = self._read(8)
+
+ if not ident.startswith('SMP558'):
+ LOG.debug(util.hexprint(ident))
+ raise errors.RadioError('Radio returned unknown ID string')
+
+ msg = ('Error communicating with radio while querying '
+ 'model identifier')
+ self._write(self.CMD_ACK)
+
+ msg = 'Error communicating with radio on final handshake'
+ ack = self._read(1)
+
+ if ack != self.CMD_ACK:
+ raise errors.RadioError('Radio refused to enter
programming '
+ 'mode failed on final handshake.')
+ except Exception:
+ LOG.debug(msg, exc_info=True)
+ raise errors.RadioError(msg)
+
+ def _exit_programming_mode(self):
+ try:
+ self._write(self.CMD_PROGRAM_EXIT)
+ except Exception:
+ msg = 'Radio refused to exit programming mode'
+ LOG.debug(msg, exc_info=True)
+ raise errors.RadioError(msg)
+ LOG.debug('Exited programming mode')
More information about the chirp_devel
mailing list