[chirp_devel] [boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
Robert Jennings
Wed Sep 5 13:18:01 PDT 2018
Attached is the radio image for testing.
Regards,
Robert Jennings
On 09/05/2018 03:16 PM, Craigslist Reply wrote:
> # HG changeset patch
> # User Robert C Jennings <rcj4747 at gmail.com>
> # Date 1535818178 18000
> # Sat Sep 01 11:09:38 2018 -0500
> # Node ID 0a65ceee60f5ff019c954536d2fb1162428945e0
> # Parent 4873d5437a583c3a1b169808c3d29a53524bc5b2
> [boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
>
> Add support for this motorcycle/bicycle helmet radio. It operates in
> the EU PMR446 and FRS/GMRS frequencies for certain, but also appears
> to support 70cm band work (untested).
>
> diff --git a/chirp/drivers/boblov_x3plus.py b/chirp/drivers/boblov_x3plus.py
> new file mode 100644
> --- /dev/null
> +++ b/chirp/drivers/boblov_x3plus.py
> @@ -0,0 +1,572 @@
> +"""
> +Radio driver for the Boblov X3 Plus Motorcycle Helmet Radio
> +"""
> +# Copyright 2018 Robert C Jennings <rcj4747 at gmail.com>
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
> +
> +import logging
> +import struct
> +import time
> +
> +from datetime import datetime
> +from textwrap import dedent
> +
> +from chirp import (
> + bitwise,
> + chirp_common,
> + directory,
> + errors,
> + memmap,
> + util,
> +)
> +from chirp.settings import (
> + RadioSetting,
> + RadioSettingGroup,
> + RadioSettings,
> + RadioSettingValueBoolean,
> + RadioSettingValueInteger,
> + RadioSettingValueList,
> +)
> +
> +LOG = logging.getLogger(__name__)
> +
> +
> + at directory.register
> +class BoblovX3Plus(chirp_common.CloneModeRadio,
> + chirp_common.ExperimentalRadio):
> + """Boblov X3 Plus motorcycle/cycling helmet radio"""
> +
> + VENDOR = 'Boblov'
> + MODEL = 'X3Plus'
> + BAUD_RATE = 9600
> + CHANNELS = 16
> +
> + MEM_FORMAT = """
> + #seekto 0x0010;
> + struct {
> + lbcd rxfreq[4];
> + lbcd txfreq[4];
> + lbcd rxtone[2];
> + lbcd txtone[2];
> + u8 unknown1:1,
> + compander:1,
> + scramble:1,
> + skip:1,
> + highpower:1,
> + narrow:1,
> + unknown2:1,
> + bcl:1;
> + u8 unknown3[3];
> + } memory[16];
> + #seekto 0x03C0;
> + struct {
> + u8 unknown1:4,
> + voiceprompt:2,
> + batterysaver:1,
> + beep:1;
> + u8 squelchlevel;
> + u8 unknown2;
> + u8 timeouttimer;
> + u8 voxlevel;
> + u8 unknown3;
> + u8 unknown4;
> + u8 voxdelay;
> + } settings;
> + """
> +
> + # Radio command data
> + CMD_ACK = '\x06'
> + CMD_IDENTIFY = '\x02'
> + CMD_PROGRAM_ENTER = '.VKOGRAM'
> + CMD_PROGRAM_EXIT = '\x62' # 'b'
> + CMD_READ = 'R'
> + CMD_WRITE = 'W'
> +
> + BLOCK_SIZE = 0x08
> +
> + VOICE_LIST = ['Off', 'Chinese', 'English']
> + TIMEOUTTIMER_LIST = ['Off', '30 seconds', '60 seconds', '90 seconds',
> + '120 seconds', '150 seconds', '180 seconds',
> + '210 seconds', '240 seconds', '270 seconds',
> + '300 seconds']
> + VOXLEVEL_LIST = ['Off', '1', '2', '3', '4', '5', '6', '7', '8', '9']
> + VOXDELAY_LIST = ['1 seconds', '2 seconds',
> + '3 seconds', '4 seconds', '5 seconds']
> + X3P_POWER_LEVELS = [chirp_common.PowerLevel('Low', watts=0.5),
> + chirp_common.PowerLevel('High', watts=2.00)]
> +
> + _memsize = 0x03F0
> + _ranges = [
> + (0x0000, 0x03F0),
> + ]
> +
> + @classmethod
> + def get_prompts(cls):
> + rp = chirp_common.RadioPrompts()
> + rp.experimental = _(dedent("""\
> + The X3Plus driver is currently experimental.
> +
> + There are no known issues but you should proceed with caution.
> +
> + Please save an unedited copy of your first successful
> + download to a CHIRP Radio Images (*.img) file.
> + """))
> + return rp
> +
> + @classmethod
> + def match_model(cls, filedata, filename):
> + """Given contents of a stored file (@filedata), return True if
> + this radio driver handles the represented model"""
> +
> + if len(filedata) != cls._memsize:
> + LOG.debug('Boblov_x3plus: match_model: size mismatch')
> + return False
> +
> + LOG.debug('Boblov_x3plus: match_model: size matches')
> +
> + if 'P310' in filedata[0x03D0:0x03D8]:
> + LOG.debug('Boblov_x3plus: match_model: radio ID matches')
> + return True
> +
> + LOG.debug('Boblov_x3plus: match_model: no radio ID match')
> + return False
> +
> + def get_features(self):
> + """Return a RadioFeatures object for this radio"""
> +
> + rf = chirp_common.RadioFeatures()
> + rf.has_settings = True
> + rf.valid_modes = ['NFM', 'FM'] # 12.5 KHz, 25 kHz.
> + rf.valid_power_levels = self.X3P_POWER_LEVELS
> + rf.valid_skips = ['', 'S']
> + rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
> + rf.valid_duplexes = ['', '-', '+', 'split', 'off']
> + rf.can_odd_split = True
> + rf.has_rx_dtcs = True
> + rf.has_ctone = True
> + rf.has_cross = True
> + rf.valid_cross_modes = [
> + 'Tone->Tone',
> + 'DTCS->',
> + '->DTCS',
> + 'Tone->DTCS',
> + 'DTCS->Tone',
> + '->Tone',
> + 'DTCS->DTCS']
> + rf.has_tuning_step = False
> + rf.has_bank = False
> + rf.has_name = False
> + rf.memory_bounds = (1, self.CHANNELS)
> + rf.valid_bands = [(400000000, 470000000)]
> + return rf
> +
> + def process_mmap(self):
> + """Process a newly-loaded or downloaded memory map"""
> + self._memobj = bitwise.parse(self.MEM_FORMAT, self._mmap)
> +
> + def sync_in(self):
> + "Initiate a radio-to-PC clone operation"
> +
> + LOG.debug('Cloning from radio')
> + status = chirp_common.Status()
> + status.msg = 'Cloning from radio'
> + status.cur = 0
> + status.max = self._memsize
> + self.status_fn(status)
> +
> + self._enter_programming_mode()
> +
> + data = ''
> + for addr in range(0, self._memsize, self.BLOCK_SIZE):
> + status.cur = addr + self.BLOCK_SIZE
> + self.status_fn(status)
> +
> + block = self._read_block(addr, self.BLOCK_SIZE)
> + data += block
> +
> + LOG.debug('Address: %04x', addr)
> + LOG.debug(util.hexprint(block))
> +
> + self._exit_programming_mode()
> +
> + self._mmap = memmap.MemoryMap(data)
> + self.process_mmap()
> +
> + def sync_out(self):
> + "Initiate a PC-to-radio clone operation"
> +
> + LOG.debug('Upload to radio')
> + status = chirp_common.Status()
> + status.msg = 'Uploading to radio'
> + status.cur = 0
> + status.max = self._memsize
> + self.status_fn(status)
> +
> + self._enter_programming_mode()
> +
> + for start_addr, end_addr in self._ranges:
> + for addr in range(start_addr, end_addr, self.BLOCK_SIZE):
> + status.cur = addr + self.BLOCK_SIZE
> + self.status_fn(status)
> +
> + self._write_block(addr, self.BLOCK_SIZE)
> +
> + self._exit_programming_mode()
> +
> + def get_raw_memory(self, number):
> + """Return a raw string describing the memory at @number"""
> + return repr(self._memobj.memory[number - 1])
> +
> + @staticmethod
> + def _decode_tone(val):
> + val = int(val)
> + if val == 16665:
> + return '', None, None
> + elif val >= 12000:
> + return 'DTCS', val - 12000, 'R'
> + elif val >= 8000:
> + return 'DTCS', val - 8000, 'N'
> +
> + return 'Tone', val / 10.0, None
> +
> + @staticmethod
> + def _encode_tone(memval, mode, value, pol):
> + if mode == '':
> + memval[0].set_raw(0xFF)
> + memval[1].set_raw(0xFF)
> + elif mode == 'Tone':
> + memval.set_value(int(value * 10))
> + elif mode == 'DTCS':
> + flag = 0x80 if pol == 'N' else 0xC0
> + memval.set_value(value)
> + memval[1].set_bits(flag)
> + else:
> + raise Exception('Internal error: invalid mode `%s`' % mode)
> +
> + def get_memory(self, number):
> + """Return a Memory object for the memory at location @number"""
> + try:
> + rmem = self._memobj.memory[number - 1]
> + except KeyError:
> + raise errors.InvalidMemoryLocation('Unknown channel %s' %
> number)
> +
> + if number < 1 or number > self.CHANNELS:
> + raise errors.InvalidMemoryLocation(
> + 'Channel number must be 1 and %s' % self.CHANNELS)
> +
> + mem = chirp_common.Memory()
> + mem.number = number
> + mem.freq = int(rmem.rxfreq) * 10
> +
> + # A blank (0MHz) or 0xFFFFFFFF frequency is considered empty
> + if mem.freq == 0 and rmem.rxfreq.get_raw() == '\xFF\xFF\xFF\xFF':
> + LOG.debug('empty channel %d', number)
> + mem.freq = 0
> + mem.empty = True
> + return mem
> +
> + if rmem.txfreq.get_raw() == '\xFF\xFF\xFF\xFF':
> + mem.duplex = 'off'
> + mem.offset = 0
> + elif int(rmem.rxfreq) == int(rmem.txfreq):
> + mem.duplex = ''
> + mem.offset = 0
> + else:
> + mem.duplex = '-' if int(rmem.rxfreq) > int(rmem.txfreq)
> else '+'
> + mem.offset = abs(int(rmem.rxfreq) - int(rmem.txfreq)) * 10
> +
> + mem.mode = 'NFM' if rmem.narrow else 'FM'
> + mem.skip = 'S' if rmem.skip else ''
> + mem.power = self.X3P_POWER_LEVELS[rmem.highpower]
> +
> + txtone = self._decode_tone(rmem.txtone)
> + rxtone = self._decode_tone(rmem.rxtone)
> + chirp_common.split_tone_decode(mem, txtone, rxtone)
> +
> + mem.extra = RadioSettingGroup('Extra', 'extra')
> + mem.extra.append(RadioSetting('bcl', 'Busy Channel Lockout',
> + RadioSettingValueBoolean(
> + current=(not rmem.bcl))))
> + mem.extra.append(RadioSetting('scramble', 'Scramble',
> + RadioSettingValueBoolean(
> + current=(not rmem.scramble))))
> + mem.extra.append(RadioSetting('compander', 'Compander',
> + RadioSettingValueBoolean(
> + current=(not rmem.compander))))
> +
> + return mem
> +
> + def set_memory(self, memory):
> + """Set the memory object @memory"""
> + rmem = self._memobj.memory[memory.number - 1]
> +
> + if memory.empty:
> + rmem.set_raw('\xFF' * (rmem.size() / 8))
> + return
> +
> + rmem.rxfreq = memory.freq / 10
> +
> + set_txtone = True
> + if memory.duplex == 'off':
> + for i in range(0, 4):
> + rmem.txfreq[i].set_raw('\xFF')
> + # If recieve only then txtone value should be none
> + self._encode_tone(rmem.txtone, mode='', value=None,
> pol=None)
> + set_txtone = False
> + elif memory.duplex == 'split':
> + rmem.txfreq = memory.offset / 10
> + elif memory.duplex == '+':
> + rmem.txfreq = (memory.freq + memory.offset) / 10
> + elif memory.duplex == '-':
> + rmem.txfreq = (memory.freq - memory.offset) / 10
> + else:
> + rmem.txfreq = memory.freq / 10
> +
> + txtone, rxtone = chirp_common.split_tone_encode(memory)
> + if set_txtone:
> + self._encode_tone(rmem.txtone, *txtone)
> + self._encode_tone(rmem.rxtone, *rxtone)
> +
> + rmem.narrow = 'N' in memory.mode
> + rmem.skip = memory.skip == 'S'
> +
> + for setting in memory.extra:
> + # NOTE: Only three settings right now, all are inverted
> + setattr(rmem, setting.get_name(), not int(setting.value))
> +
> + def get_settings(self):
> + """
> + Return a RadioSettings list containing one or more
> RadioSettingGroup
> + or RadioSetting objects. These represent general settings that can
> + be adjusted on the radio.
> + """
> + cur = self._memobj.settings
> + basic = RadioSettingGroup('basic', 'Basic Settings')
> + rs = RadioSetting('squelchlevel', 'Squelch level',
> + RadioSettingValueInteger(
> + minval=0, maxval=9,
> + current=cur.squelchlevel))
> + basic.append(rs)
> + rs = RadioSetting('timeouttimer', 'Timeout timer',
> + RadioSettingValueList(
> + options=self.TIMEOUTTIMER_LIST,
> +
> current=self.TIMEOUTTIMER_LIST[cur.timeouttimer]))
> + basic.append(rs)
> + rs = RadioSetting('voiceprompt', 'Voice prompt',
> + RadioSettingValueList(
> + options=self.VOICE_LIST,
> + current=self.VOICE_LIST[cur.voiceprompt]))
> + basic.append(rs)
> + rs = RadioSetting('voxlevel', 'Vox level',
> + RadioSettingValueList(
> + options=self.VOXLEVEL_LIST,
> + current=self.VOXLEVEL_LIST[cur.voxlevel]))
> + basic.append(rs)
> + rs = RadioSetting('voxdelay', 'VOX delay',
> + RadioSettingValueList(
> + options=self.VOXDELAY_LIST,
> + current=self.VOXDELAY_LIST[cur.voxdelay]))
> + basic.append(rs)
> + basic.append(RadioSetting('batterysaver', 'Battery saver',
> + RadioSettingValueBoolean(
> + current=cur.batterysaver)))
> + basic.append(RadioSetting('beep', 'Beep',
> + RadioSettingValueBoolean(
> + current=cur.beep)))
> + return RadioSettings(basic)
> +
> + def set_settings(self, settings):
> + """
> + Accepts the top-level RadioSettingGroup returned from
> + get_settings() and adjusts the values in the radio accordingly.
> + This function expects the entire RadioSettingGroup hierarchy
> + returned from get_settings().
> + """
> + for element in settings:
> + if not isinstance(element, RadioSetting):
> + self.set_settings(element)
> + continue
> + else:
> + try:
> + if '.' in element.get_name():
> + bits = element.get_name().split('.')
> + obj = self._memobj
> + for bit in bits[:-1]:
> + obj = getattr(obj, bit)
> + setting = bits[-1]
> + else:
> + obj = self._memobj.settings
> + setting = element.get_name()
> +
> + if element.has_apply_callback():
> + LOG.debug('Using apply callback')
> + element.run_apply_callback()
> + else:
> + LOG.debug('Setting %s = %s', setting,
> element.value)
> + setattr(obj, setting, element.value)
> + except Exception:
> + LOG.debug(element.get_name())
> + raise
> +
> + def _write(self, data, timeout=3):
> + """
> + Write data to the serial port and consume the echoed response
> +
> + The radio echos the data it is sent before replying. Send the
> + data to the radio, consume the reply, and ensure that the reply
> + is the same as the data sent.
> + """
> +
> + serial = self.pipe
> + expected = len(data)
> + resp = b''
> + start = datetime.now()
> +
> + # LOG.debug('WRITE(%02d): %s', expected,
> util.hexprint(data).rstrip())
> + serial.write(data)
> + while True:
> + if not expected:
> + break
> + rbytes = serial.read(expected)
> + resp += rbytes
> + expected -= len(rbytes)
> + if (datetime.now() - start).seconds > timeout:
> + raise errors.RadioError('Timeout while reading from radio')
> + if resp != data:
> + raise errors.RadioError('Echoed response did not match sent
> data')
> +
> + def _read(self, length, timeout=3):
> + """Read data from the serial port"""
> +
> + resp = b''
> + serial = self.pipe
> + remaining = length
> + start = datetime.now()
> +
> + if not remaining:
> + return resp
> +
> + while True:
> + rbytes = serial.read(remaining)
> + resp += rbytes
> + remaining -= len(rbytes)
> + if not remaining:
> + break
> + if (datetime.now() - start).seconds > timeout:
> + raise errors.RadioError('Timeout while reading from radio')
> + time.sleep(0.1)
> +
> + # LOG.debug('READ(%02d): %s', length,
> util.hexprint(resp).rstrip())
> + return resp
> +
> + def _read_block(self, block_addr, block_size):
> +
> + LOG.debug('Reading block %04x...', block_addr)
> + cmd = struct.pack('>cHb', self.CMD_READ, block_addr,
> + block_size)
> + resp_prefix = self.CMD_WRITE + cmd[1:]
> +
> + try:
> + msg = ('Failed to write command to radio for block '
> + 'read at %04x' % block_addr)
> + self._write(cmd)
> +
> + msg = ('Failed to read response from radio for block '
> + 'read at %04x' % block_addr)
> + response = self._read(len(cmd) + block_size)
> +
> + if response[:len(cmd)] != resp_prefix:
> + raise errors.RadioError('Error reading block %04x, '
> + 'Command not returned.' %
> (block_addr))
> +
> + msg = ('Failed to write ACK to radio after block read at '
> + '%04x' % block_addr)
> + self._write(self.CMD_ACK)
> +
> + msg = ('Failed to read ACK from radio after block read at '
> + '%04x' % block_addr)
> + ack = self._read(1)
> + except Exception:
> + LOG.debug(msg, exc_info=True)
> + raise errors.RadioError(msg)
> +
> + if ack != self.CMD_ACK:
> + raise errors.RadioError('No ACK reading block '
> + '%04x.' % (block_addr))
> +
> + return response[len(cmd):]
> +
> + def _write_block(self, block_addr, block_size):
> +
> + cmd = struct.pack('>cHb', self.CMD_WRITE, block_addr, block_size)
> + data = self.get_mmap()[block_addr:block_addr + 8]
> +
> + LOG.debug('Writing Data:\n%s%s',
> + util.hexprint(cmd), util.hexprint(data))
> +
> + try:
> + self._write(cmd + data)
> + if self._read(1) != self.CMD_ACK:
> + raise Exception('No ACK')
> + except Exception:
> + msg = 'Failed to send block to radio at %04x' % block_addr
> + LOG.debug(msg, exc_info=True)
> + raise errors.RadioError(msg)
> +
> + def _enter_programming_mode(self):
> +
> + LOG.debug('Entering programming mode')
> + try:
> + msg = 'Error communicating with radio entering programming
> mode.'
> + self._write(self.CMD_PROGRAM_ENTER)
> + time.sleep(0.5)
> + ack = self._read(1)
> +
> + if not ack:
> + raise errors.RadioError('No response from radio')
> + elif ack != self.CMD_ACK:
> + raise errors.RadioError('Radio refused to enter '
> + 'programming mode')
> +
> + msg = 'Error communicating with radio during identification'
> + self._write(self.CMD_IDENTIFY)
> + ident = self._read(8)
> +
> + if not ident.startswith('SMP558'):
> + LOG.debug(util.hexprint(ident))
> + raise errors.RadioError('Radio returned unknown ID string')
> +
> + msg = ('Error communicating with radio while querying '
> + 'model identifier')
> + self._write(self.CMD_ACK)
> +
> + msg = 'Error communicating with radio on final handshake'
> + ack = self._read(1)
> +
> + if ack != self.CMD_ACK:
> + raise errors.RadioError('Radio refused to enter
> programming '
> + 'mode failed on final handshake.')
> + except Exception:
> + LOG.debug(msg, exc_info=True)
> + raise errors.RadioError(msg)
> +
> + def _exit_programming_mode(self):
> + try:
> + self._write(self.CMD_PROGRAM_EXIT)
> + except Exception:
> + msg = 'Radio refused to exit programming mode'
> + LOG.debug(msg, exc_info=True)
> + raise errors.RadioError(msg)
> + LOG.debug('Exited programming mode')
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: Boblov_X3Plus.img
Type: application/x-raw-disk-image
Size: 1008 bytes
Desc: not available
Url : http://intrepid.danplanet.com/pipermail/chirp_devel/attachments/20180905/15a72bba/attachment-0001.bin
More information about the chirp_devel
mailing list